[GitHub] flink pull request: [FLINK-2634] [gelly] [WIP] Vertex Centric Tria...

2016-01-18 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/1291#issuecomment-172458727
  
Hi @vasia, 

This PR contained a bugfix that could have been validated, but I don't mind 
closing it! 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2634] [gelly] [WIP] Vertex Centric Tria...

2016-01-18 Thread andralungu
Github user andralungu closed the pull request at:

https://github.com/apache/flink/pull/1291




[GitHub] flink pull request: [FLINK-2879] [docs] Fixed broken links on the ...

2015-11-11 Thread andralungu
GitHub user andralungu opened a pull request:

https://github.com/apache/flink/pull/1348

[FLINK-2879] [docs] Fixed broken links on the architecture page

Fixed the links on the general architecture page. They were pointing to 
non-existent HTML pages.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/andralungu/flink brokenLinks

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1348.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1348


commit 04058f78c3d161f28cd3c90f3c10ad31b495490f
Author: andralungu <lungu.an...@gmail.com>
Date:   2015-11-11T20:00:36Z

[FLINK-2879] [docs] Fixed broken links on the architecture page






[GitHub] flink pull request: [FLINK-2634] [gelly] [WIP] Vertex Centric Tria...

2015-10-22 Thread andralungu
GitHub user andralungu opened a pull request:

https://github.com/apache/flink/pull/1291

[FLINK-2634] [gelly] [WIP] Vertex Centric Triangle Count

This PR builds on the code presented in #1105. 
Basically, the reduceOn* calls are replaced with groupReduceOn* calls. 

As discussed back then, I made the lib method accept any kind of keys. 
While doing so, I found a bit of a bug (which is why I marked this as WIP). The 
groupReduceOnNeighbors function has a version that takes a type argument. The 
problem is that coGroup tries to build a type before the call to 
`returns()`, which means it cannot infer the type from the information it has. 

I'll explain this a bit better in a Jira.  
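The inference problem described above boils down to Java's type erasure. A self-contained plain-Java snippet (an illustration of the general JVM limitation, not the actual Gelly/coGroup code) shows why a framework cannot recover a generic key type on its own before an explicit type hint is supplied:

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration (not Flink code): Java erases generic type
// arguments at runtime, so a framework that inspects a user function or
// data set *before* an explicit returns(...) hint is given has no way to
// recover the key type K by itself.
public class ErasureDemo {
    public static void main(String[] args) {
        List<Long> longIds = new ArrayList<>();
        List<String> stringIds = new ArrayList<>();
        // Both lists have exactly the same runtime class: the Long/String
        // type argument is gone, which is why an explicit hint is needed.
        System.out.println(longIds.getClass() == stringIds.getClass()); // prints "true"
    }
}
```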

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/andralungu/flink trianglecount-vertexcentric

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1291.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1291


commit 1fbc09ca35e80f61f31cf7a6166f27f2a6f142c1
Author: andralungu <lungu.an...@gmail.com>
Date:   2015-10-22T06:18:43Z

[FLINK-2634] [gelly] Vertex Centric Triangle Count

[FLINK-2634] [gelly] Fixed Type Erasure






[GitHub] flink pull request: [FLINK-2634] [gelly] Added a vertex-centric Tr...

2015-09-23 Thread andralungu
Github user andralungu closed the pull request at:

https://github.com/apache/flink/pull/1105




[GitHub] flink pull request: [FLINK-1520] [gelly] Create a Graph from CSV f...

2015-09-21 Thread andralungu
Github user andralungu commented on a diff in the pull request:

https://github.com/apache/flink/pull/1149#discussion_r39947900
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
 ---
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+import com.google.common.base.Preconditions;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.io.CsvReader;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * A class to build a Graph using path(s) provided to CSV file(s) with 
optional vertex and edge data.
+ * The class also configures the CSV readers used to read edge and vertex 
data such as the field types,
+ * the delimiters (row and field), the fields that should be included or 
skipped, and other flags,
+ * such as whether to skip the initial line as the header.
+ * The configuration is done using the functions provided in the {@link 
org.apache.flink.api.java.io.CsvReader} class.
+ */
+
+public class GraphCsvReader {
+
+   @SuppressWarnings("unused")
+   private final Path vertexPath, edgePath;
+   private final ExecutionEnvironment executionContext;
+   protected CsvReader edgeReader;
+   protected CsvReader vertexReader;
+   protected MapFunction mapper;
+   protected Class vertexKey;
+   protected Class vertexValue;
+   protected Class edgeValue;
+

+//
+   public GraphCsvReader(Path vertexPath, Path edgePath, 
ExecutionEnvironment context) {
+   this.vertexPath = vertexPath;
+   this.edgePath = edgePath;
+   this.vertexReader = new CsvReader(vertexPath, context);
+   this.edgeReader = new CsvReader(edgePath, context);
+   this.mapper = null;
+   this.executionContext = context;
+   }
+
+   public GraphCsvReader(Path edgePath, ExecutionEnvironment context) {
+   this.vertexPath = null;
+   this.edgePath = edgePath;
+   this.edgeReader = new CsvReader(edgePath, context);
+   this.vertexReader = null;
+   this.mapper = null;
+   this.executionContext = context;
+   }
+
+   public <K, VV> GraphCsvReader(Path edgePath, final MapFunction<K, VV> 
mapper, ExecutionEnvironment context) {
+   this.vertexPath = null;
+   this.edgePath = edgePath;
+   this.edgeReader = new CsvReader(edgePath, context);
+   this.vertexReader = null;
+   this.mapper = mapper;
+   this.executionContext = context;
+   }
+
+   public GraphCsvReader (String edgePath, ExecutionEnvironment context) {
+   this(new Path(Preconditions.checkNotNull(edgePath, "The file 
path may not be null.")), context);
+
+   }
+
+   public GraphCsvReader(String vertexPath, String edgePath, 
ExecutionEnvironment context) {
+   this(new Path(Preconditions.checkNotNull(vertexPath, "The file 
path may not be null.")),
+   new Path(Preconditions.checkNotNull(edgePath, 
"The file path may not be null.")), context);
+   }
+
+
+   public <K, VV> GraphCsvReader(String edgePath, final MapFunction<K, VV> 
mapper, ExecutionEnvironment context) {
+   this(new Path(Preconditions.checkNotNull(edgePath, "The 
file path may not be null.")), mapper, context);
+   }
+
+   /**
+* Creates a Graph from CSV input with vertex values and edge values.
+*

[GitHub] flink pull request: [FLINK-1520] [gelly] Create a Graph from CSV f...

2015-09-21 Thread andralungu
Github user andralungu commented on a diff in the pull request:

https://github.com/apache/flink/pull/1149#discussion_r39948297
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
 ---
@@ -110,24 +105,20 @@ public static void main(String [] args) throws 
Exception {
// Emit results
if(fileOutput) {
resultedVertices.writeAsCsv(outputPath, "\n", 
",");
-
-   // since file sinks are lazy, we trigger the 
execution explicitly
-   env.execute("Incremental SSSP Example");
} else {
resultedVertices.print();
}
 
+   env.execute("Incremental SSSP Example");
--- End diff --

I'm not sure whether I am missing something... Why do you add 
`env.execute()` after `print()`? 
It's no longer needed. Have a look here:

https://github.com/apache/flink/blob/master/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java
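
The lazy-sink vs. eager-print distinction behind this comment can be sketched in plain Java (a toy mock, not the Flink API; the class and method names here are made up for illustration): a file sink only registers work and needs an explicit trigger, while an eager print triggers execution itself.

```java
import java.util.ArrayList;
import java.util.List;

// Toy mock (not Flink): writeAsCsv only *registers* a sink and needs an
// explicit execute(), while print() triggers execution on its own.
class MiniEnv {
    private final List<Runnable> pendingSinks = new ArrayList<>();
    int executed = 0; // how many times the plan has run

    void writeAsCsv(String path) { pendingSinks.add(() -> {}); } // lazy: just registered
    void print()                 { execute(); }                  // eager: runs the plan
    void execute()               { pendingSinks.forEach(Runnable::run); executed++; }
}

public class LazySinkDemo {
    public static void main(String[] args) {
        MiniEnv env = new MiniEnv();
        env.writeAsCsv("/tmp/out.csv"); // nothing happens yet
        System.out.println(env.executed); // prints "0"
        env.execute();                    // file-sink path: explicit trigger required
        System.out.println(env.executed); // prints "1"

        MiniEnv env2 = new MiniEnv();
        env2.print();                     // print path: no extra execute() needed
        System.out.println(env2.executed); // prints "1"
    }
}
```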
 




[GitHub] flink pull request: [FLINK-1520] [gelly] Create a Graph from CSV f...

2015-09-21 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/1149#issuecomment-141905104
  
Hi @vasia,

As you said, I already reviewed this :P. I left a couple of comments 
inline. Please re-verify the forwarded fields annotations. If you put them there 
for one mapper, add them for the others too. 

Apart from that, it's good to merge.  




[GitHub] flink pull request: [FLINK-2634] [gelly] Added a vertex-centric Tr...

2015-09-20 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/1105#issuecomment-141756735
  
I've created the two JIRA issues: FLINK-2714 and FLINK-2715.

@vasia, if no other objections, can you have another look at this and merge 
it? 




[GitHub] flink pull request: [FLINK-2634] [gelly] Added a vertex-centric Tr...

2015-09-14 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/1105#issuecomment-140133538
  
Hey you two :)

I suggest that we add this one. 
Let the JIRA that @vasia opened handle the keys.
Then add the DataSet example (another JIRA) and a JIRA for benchmarks.

I think that would more or less make everyone happy. 




[GitHub] flink pull request: [FLINK-2661] Added Node Splitting Methods

2015-09-14 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/1124#issuecomment-140131099
  
Hi @vasia ,

There was no clear rejection in the discussion, so I thought I'd try my 
luck :)

This technique could be enabled by using a flag only if we introduce a node 
split version of all the library methods. Otherwise, for newly written code, 
users would still have to pass a combine function according to their algorithm. 
They'd also have to pass the threshold that differentiates high-degree vertices 
from low-degree vertices. What is more, they still need to split and aggregate 
before and after each step.

These are aspects that cannot be guessed and that highly depend on the 
algorithm. 

Also, what I had in mind regarding this PR was that we would discuss and  
try to improve the code a bit, rather than immediately discarding it :(




[GitHub] flink pull request: [FLINK-2661] Added Node Splitting Methods

2015-09-14 Thread andralungu
Github user andralungu closed the pull request at:

https://github.com/apache/flink/pull/1124




[GitHub] flink pull request: [FLINK-2661] Added Node Splitting Methods

2015-09-12 Thread andralungu
GitHub user andralungu opened a pull request:

https://github.com/apache/flink/pull/1124

[FLINK-2661] Added Node Splitting Methods

Social media graphs, citation networks and even protein networks have a 
common property: their degree distribution follows a power-law curve. This 
structure raises challenges for both the vertex-centric and the GSA/GAS model 
because they process vertices uniformly, regardless of their degree. This 
leads to large execution-time stalls: in the synchronous setting, vertices 
wait for skewed nodes to finish computation. 

This PR aims to diminish the impact of high-degree nodes by proposing four 
main functions: `determineSkewedVertices`, `treeDeAggregate` (splits a node 
into subnodes, recursively, in levels), `propagateValuesToSplitVertices` 
(useful when the algorithm performs more than one superstep) and `treeAggregate` 
(brings the graph back to its initial state). 

These functions modify a graph at a high-level, making its degree 
distribution more uniform. The method does not need any special partitioning or 
runtime modification and (for skewed networks and computationally intensive 
algorithms) can speed up the run time by a factor of two. 

I added an example, NodeSplittingJaccardSimilarityMeasure, for which I 
needed to split the overall sequence of operations into two functions in order 
to test the result. Calling the entire main method would have resulted in the 
"Too few memory segments" exception - too many operations called within one 
test, in other words. 

For more info, please consult the additional entry in the documentation. 

If we reach a common point and this PR gets merged, I will also follow 
@fhueske 's suggestion from the mailing list - adding a Split version of each 
of the library methods to allow users to verify whether their run time 
improves. 
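
The split/aggregate idea above can be sketched in plain Java (a hypothetical toy version; the method names follow the PR description, not the actual Gelly signatures): a skewed vertex's neighbor list is cut into bounded sub-groups, each group is processed independently, and the partial results are combined afterwards.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the node-splitting idea (names mirror the PR
// description, not real Gelly API).
public class NodeSplitSketch {

    // "treeDeAggregate"-like step: split the neighbors of a skewed vertex
    // into chunks of at most maxDegree, yielding one sub-vertex per chunk.
    static List<List<Long>> deAggregate(List<Long> neighbors, int maxDegree) {
        List<List<Long>> subNodes = new ArrayList<>();
        for (int i = 0; i < neighbors.size(); i += maxDegree) {
            subNodes.add(neighbors.subList(i, Math.min(i + maxDegree, neighbors.size())));
        }
        return subNodes;
    }

    // "treeAggregate"-like step: combine per-subnode partial results
    // (here simply counts) back into one value for the original vertex.
    static long aggregate(List<Long> partials) {
        return partials.stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        List<Long> neighbors = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L);
        List<List<Long>> subNodes = deAggregate(neighbors, 3); // threshold = 3
        List<Long> partials = new ArrayList<>();
        for (List<Long> group : subNodes) partials.add((long) group.size());
        System.out.println(subNodes.size());     // prints "3" sub-vertices
        System.out.println(aggregate(partials)); // prints "7", the original degree
    }
}
```

Both the split threshold and the combine function depend on the algorithm, which is why (as argued later in this thread) they cannot simply be hidden behind a flag.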

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/andralungu/flink splitJaccardFlink

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1124.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1124


commit b02d0917edcd5f3c8846fe01044afd7444a58c08
Author: Andra Lungu <lungu.an...@gmail.com>
Date:   2015-09-12T10:25:20Z

[FLINK-2661] Added Node Splitting Methods

[FLINK-2661] Minor modifications in the docs

[FLINK-2661] pdf to png






[GitHub] flink pull request: [FLINK-2634] [gelly] Added a vertex-centric Tr...

2015-09-12 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/1105#issuecomment-139820404
  
If it's okay with you, I'd like to see what @vasia has to say about adding 
the Triangle Count example from the DataSet API +  a reduce as a library 
method. IMO, it's a better addition, but for some reason, we preferred the 
Pregel/GSA implementations at a certain point (it was because in my thesis, I 
take Pregel as a baseline). 

Also the generic K instead of Long makes perfect sense. However, if we 
decide to change it, I'll have to open a JIRA to revise the entire set of 
library methods because apart from PageRank, I think they all restrict 
themselves to one common key type. I would be kind of scared of type erasure in 
the K implements Key case :-S 




[GitHub] flink pull request: [FLINK-2310] Add an Adamic Adar Similarity exa...

2015-09-12 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/892#issuecomment-139822550
  
I think it's safe to open a fresh PR... fixing this one would be overkill. 
You can also update and rebase #923 so that we can review them. 




[GitHub] flink pull request: [FLINK-2634] [gelly] Added a vertex-centric Tr...

2015-09-11 Thread andralungu
Github user andralungu commented on a diff in the pull request:

https://github.com/apache/flink/pull/1105#discussion_r39276971
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleCount.java
 ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.NeighborsFunctionWithVertexValue;
+import org.apache.flink.graph.utils.VertexToTuple2Map;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+import java.util.Iterator;
+import java.util.TreeMap;
+
+/**
+ * Triangle Count Algorithm.
+ *
+ * This algorithm operates in three phases. First, vertices select 
neighbors with id greater than theirs
+ * and send messages to them. Each received message is then propagated to 
neighbors with higher id.
+ * Finally, if a node encounters the target id in the list of received 
messages, it increments the number
+ * of triangles found.
+ *
+ * For skewed graphs, we recommend calling the GSATriangleCount library 
method as it uses the more restrictive
+ * `reduceOnNeighbors` function which internally makes use of combiners to 
speed up computation.
+ *
+ * This implementation is non-iterative.
+ *
+ * The algorithm takes an undirected, unweighted graph as input and 
outputs a DataSet of
+ * Tuple1 which contains a single integer representing the number of 
triangles.
+ */
+public class TriangleCount implements
+   GraphAlgorithm<Long, NullValue, NullValue, 
DataSet<Tuple1>> {
--- End diff --

For the Tuple1 comment: I wanted to be consistent with the other examples. 
If you run it on a cluster, you'd like the result printed in a file. You could 
print it with regular Java functions, but `writeAsCsv` and `print` were more 
appealing (to me at least) :)  

For the key type: At some point, you check that your id is lower than the 
neighbors' ids... How would that comparison be made if you don't know the type? 
If, let's say, you received a String, you'd need to do `Long.parseLong`. If you 
know a way, I'd gladly change it, but I don't think this is the only library 
method that has the key type problem... Even ConnectedComponents uses Long. 
Maybe we can open a JIRA for that...
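
The three-phase scheme described in the Javadoc above (forward ids to higher-id neighbors, propagate, count matches) can be condensed into a plain-Java sketch (hypothetical code, independent of the Gelly API):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical condensed sketch of the id-ordered triangle count: each
// vertex only looks "upward" to neighbors with larger ids, so every
// triangle is counted exactly once.
public class TriangleCountSketch {

    static int countTriangles(Map<Integer, Set<Integer>> adj) {
        int triangles = 0;
        for (int u : adj.keySet()) {
            for (int v : adj.get(u)) {
                if (v <= u) continue; // phase 1: only send to higher ids
                for (int w : adj.get(v)) {
                    // phases 2+3: propagate upward and check for the closing edge
                    if (w > v && adj.get(u).contains(w)) triangles++;
                }
            }
        }
        return triangles;
    }

    public static void main(String[] args) {
        Map<Integer, Set<Integer>> adj = new HashMap<>();
        // undirected triangle 1-2-3 plus a pendant edge 3-4
        int[][] edges = {{1, 2}, {2, 3}, {1, 3}, {3, 4}};
        for (int[] e : edges) {
            adj.computeIfAbsent(e[0], k -> new HashSet<>()).add(e[1]);
            adj.computeIfAbsent(e[1], k -> new HashSet<>()).add(e[0]);
        }
        System.out.println(countTriangles(adj)); // prints "1"
    }
}
```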




[GitHub] flink pull request: [FLINK-2634] [gelly] Added a vertex-centric Tr...

2015-09-11 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/1105#issuecomment-139564327
  
The only argument I have for this particular code (as opposed to the one in 
the DataSet API) is that this one simulates what Pregel is doing... But since 
it's a library method, it should be usable as is, without caring about the model. 
IMO, the discussion in #1054 asked for this particular version, but if there 
is a consensus I can change it to use the more efficient one... or even close 
the PR and point people to the DataSet API...




[GitHub] flink pull request: [FLINK-2570] [gelly] Added a Triangle Count Li...

2015-09-08 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/1054#issuecomment-138497145
  
Since the bugfix version was released days ago, I guess this is safe to 
merge...




[GitHub] flink pull request: [FLINK-2634] [gelly] Added a vertex-centric Tr...

2015-09-08 Thread andralungu
GitHub user andralungu opened a pull request:

https://github.com/apache/flink/pull/1105

[FLINK-2634] [gelly] Added a vertex-centric Triangle Count Lib Method

This PR adds a vertex centric version of the TriangleCount algorithm as 
discussed in #1054. 

The main difference is that the reduceOn* calls have been replaced with 
groupReduceOn* calls.

Depending on the input graph, one may need to use the GAS version (skewed 
networks) or the vertex-centric version.  

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/andralungu/flink tcVertexCentric

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1105.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1105


commit 6f24c6918d7de4cd5c8fdced7248836a4704ceb3
Author: Andra Lungu <lungu.an...@gmail.com>
Date:   2015-09-08T12:15:23Z

[FLINK-2634] [gelly] Added a vertex-centric Triangle Count Library Method






[GitHub] flink pull request: [FLINK-2152] [bugfix] Used a concurrent list i...

2015-09-08 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/1058#issuecomment-138692441
  
It seems the issue was already fixed via #1075 




[GitHub] flink pull request: [FLINK-2152] [bugfix] Used a concurrent list i...

2015-09-08 Thread andralungu
Github user andralungu closed the pull request at:

https://github.com/apache/flink/pull/1058




[GitHub] flink pull request: [FLINK-2152] [bugfix] Used a concurrent list i...

2015-08-27 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/1058#issuecomment-135407258
  
so you mean something like

`List<Tuple2<Integer, Long>> offsets = 
getRuntimeContext().getBroadcastVariableWithInitializer(counts, new 
ArrayList<>());`?






[GitHub] flink pull request: [FLINK-2152] [bugfix] Used a concurrent list i...

2015-08-27 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/1058#issuecomment-135396805
  
Yup, that's what I thought as well. 
However, I only tested `zipWithUniqueId` on a cluster. 
A student came to me and said that `zipWithIndex` did not work in a cluster 
environment (I believed him), and he claimed some days later, via the Jira issue, 
that this was the fix that worked for him. 

That's all the input I have. 




[GitHub] flink pull request: [FLINK-2570] [gelly] Added a Triangle Count Li...

2015-08-26 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/1054#issuecomment-134847386
  
okay...




[GitHub] flink pull request: [FLINK-2512] [bugfix] Used a concurrent list i...

2015-08-26 Thread andralungu
GitHub user andralungu opened a pull request:

https://github.com/apache/flink/pull/1058

[FLINK-2512] [bugfix] Used a concurrent list in the open method

This PR fixes the concurrency issue raised by the elements in the broadcast 
set. I instantiated the list to be a CopyOnWriteArrayList. Hope that fixes the 
issue! 
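
The fix can be sketched as follows (illustrative plain Java, not the actual Flink source; the class and method names are made up): the open() method copies the broadcast list into a CopyOnWriteArrayList so parallel task instances can iterate it safely.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Illustrative sketch of the fix (names are hypothetical, not Flink source):
// rather than holding the broadcast list directly, open() wraps it in a
// CopyOnWriteArrayList, whose iterators work on a snapshot and therefore
// never throw ConcurrentModificationException.
public class OpenMethodSketch {
    private List<Long> counts;

    // stand-in for a RichFunction's open(Configuration) method
    public void open(List<Long> broadcastCounts) {
        this.counts = new CopyOnWriteArrayList<>(broadcastCounts);
    }

    public long total() {
        long sum = 0;
        for (long c : counts) sum += c; // safe even under concurrent mutation
        return sum;
    }

    public static void main(String[] args) {
        OpenMethodSketch fn = new OpenMethodSketch();
        fn.open(Arrays.asList(1L, 2L, 3L));
        System.out.println(fn.total()); // prints "6"
    }
}
```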

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/andralungu/flink zipWithIndexBugFix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1058.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1058


commit 8e81256d6d10140d180762c118ef4bd575300dd3
Author: Andra Lungu lungu.an...@gmail.com
Date:   2015-08-26T11:26:32Z

[FLINK-2512] [bugfix] Used a concurrent list in the open method






[GitHub] flink pull request: [FLINK-2152] [bugfix] Used a concurrent list i...

2015-08-26 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/1058#issuecomment-135042510
  
I guess because more parallel instances are trying to access that broadcast 
variable. 




[GitHub] flink pull request: [FLINK-2512] [bugfix] Used a concurrent list i...

2015-08-26 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/1058#issuecomment-135027923
  
Excerpt from the JIRA: 
"This can be fixed by wrapping a concurrent list around the counts variable, 
e.g. CopyOnWriteArrayList from java.util.concurrent (in the open method)."

It stood there open for a while, so I thought I'd fix it. I don't really know 
what test should be written. The use case is the same; the environment just 
goes from local to cluster. 





[GitHub] flink pull request: [FLINK-2152] [bugfix] Used a concurrent list i...

2015-08-26 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/1058#issuecomment-135035150
  
Yes, I am sorry about that. It was a propagated typo. Problem solved! 




[GitHub] flink pull request: [FLINK-2570] [gelly] Added a Triangle Count Li...

2015-08-25 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/1054#issuecomment-134590137
  
You're right @tillrohrmann. I updated the JIRA to also contain a small 
description :)




[GitHub] flink pull request: [FLINK-2563] [gelly] changed Graph's run() to ...

2015-08-25 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/1042#issuecomment-134516708
  
Merging... 




[GitHub] flink pull request: [FLINK-2570] [gelly] Added a Triangle Count Li...

2015-08-25 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/1054#issuecomment-134702560
  
Hi @vasia, 

I clarified the type of input expected. The graph should be undirected. 
Without the distinct, you get duplicate edges there (and an erroneous number of 
triangles). The second bullet point is again not an issue because the graph is 
undirected. 
The result should be fine. For the SNAP data sets, I got a number equal to 
theirs on a cluster.

Concerning the runtime, you are right: it's only true for some cases 
(generally faster by a factor of two), but it highly depends on the data set. 
So, once this gets merged, I'll go ahead and propose the vertex-centric version 
as well. That way, the user can choose. 

Hope I clarified everything!
Let me know if you still have questions :)
 




[GitHub] flink pull request: [FLINK-2570] [gelly] Added a Triangle Count Li...

2015-08-25 Thread andralungu
GitHub user andralungu opened a pull request:

https://github.com/apache/flink/pull/1054

[FLINK-2570] [gelly] Added a Triangle Count Library Method

This PR adds a Triangle Count Library Method to Gelly. 

I decided to add the GAS version because it is faster than vertex-centric 
for most graph data sets.

This implementation has been extensively tested on a 30-node cluster with 
16GB of RAM per machine.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/andralungu/flink triangleCount

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1054.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1054


commit 99ef9720f5c70ab2029b21017d6b9d3100c827e8
Author: Andra Lungu lungu.an...@gmail.com
Date:   2015-08-25T12:07:18Z

[FLINK-2570] [gelly] Added a Triangle Count Library Method






[GitHub] flink pull request: [FLINK-2563] [gelly] changed Graph's run() to ...

2015-08-23 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/1042#issuecomment-133881425
  
Exactly what I had in mind. +1 to merge




[GitHub] flink pull request: [FLINK-2451] [gelly] examples and library clea...

2015-08-11 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/1000#issuecomment-130002982
  
Apart from the minor comment, this looks good to merge.




[GitHub] flink pull request: [FLINK-2451] [gelly] examples and library clea...

2015-08-11 Thread andralungu
Github user andralungu commented on a diff in the pull request:

https://github.com/apache/flink/pull/1000#discussion_r36779494
  
--- Diff: 
flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
 ---
@@ -86,26 +72,35 @@ public void testConnectedComponents() throws Exception {
 
@Test
public void testSingleSourceShortestPaths() throws Exception {
-   GSASingleSourceShortestPaths.main(new String[]{"1", edgesPath, resultPath, "16"});
-   expectedResult = "1 0.0\n" +
-           "2 12.0\n" +
-           "3 13.0\n" +
-           "4 47.0\n" +
-           "5 48.0\n" +
-           "6 Infinity\n" +
-           "7 Infinity\n";
+   final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+   Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+           SingleSourceShortestPathsData.getDefaultEdgeDataSet(env),
+           new InitMapperSSSP(), env);
+
+   List<Vertex<Long, Double>> result = inputGraph.run(new GSASingleSourceShortestPaths<Long>(1l, 16))
+           .getVertices().collect();
--- End diff --

Is there a specific reason for which we decided to use `collect()` in the 
tests? It does not seem to be a consistency issue. The other tests (in flink) 
are still using  `compareResultsByLinesInMemory()`. Do we gain anything?
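
For context, the trade-off can be sketched outside Flink (the class and methods below are hypothetical stand-ins, not Gelly's actual test utilities): `collect()` lets a test assert on typed, in-memory values, while the line-based helper normalizes row order before comparing, which matters for parallel execution.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ResultComparison {

    // Roughly what a line-based comparison does: sort both sides first,
    // so the row order produced by parallel execution does not matter.
    static boolean sameResultsByLines(List<String> expected, List<String> actual) {
        List<String> e = new ArrayList<>(expected);
        List<String> a = new ArrayList<>(actual);
        Collections.sort(e);
        Collections.sort(a);
        return e.equals(a);
    }

    public static void main(String[] args) {
        // A collect()-style test asserts on in-memory values instead of files,
        // but must stay order-insensitive for parallel runs.
        List<String> expected = Arrays.asList("1 0.0", "2 12.0", "3 13.0");
        List<String> collected = Arrays.asList("3 13.0", "1 0.0", "2 12.0");
        System.out.println(sameResultsByLines(expected, collected)); // prints true
    }
}
```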




[GitHub] flink pull request: [Flink-Gelly] [example] added missing assumpti...

2015-08-11 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/883#issuecomment-130003492
  
I guess the only request that was not fulfilled for this PR was to squash 
the commits. Then it should be fine. 




[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

2015-08-11 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/847#issuecomment-130014300
  
Saw this. I will also update the documentation afterwards... Sorry!




[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

2015-08-11 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/847#issuecomment-130013958
  
Hi @vasia, 

Not sure whether this comment was issued for me... Nevertheless, I left some 
suggestions inline. All in all, it covers the problems discussed in the 73(!) 
comments here. You forgot to properly document the `edgeTypes(K, EV)`, etc. 
methods. 




[GitHub] flink pull request: Hits

2015-08-11 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/765#issuecomment-130016029
  
@vasia , I guess the guys already finished the IMPRO3 course. 

What is the status of this PR @mfahimazizi? Do you have time to address the 
comments? Are you still interested in contributing? 

We would be happy to answer your questions :)




[GitHub] flink pull request: [FLINK-2452] [Gelly] adds a playcount threshol...

2015-08-06 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/968#issuecomment-128456801
  
Nope, no comment. 




[GitHub] flink pull request: [FLINK-2375] Add Approximate Adamic Adar Simil...

2015-07-20 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/923#issuecomment-122868758
  
Could you also do a `git commit --amend` to reference the correct JIRA issue in 
the commit message?




[GitHub] flink pull request: [FLINK-2310] Add an Adamic Adar Similarity exa...

2015-07-20 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/892#issuecomment-123153943
  
Hi @shghatge ,

Did you also run the two examples on the cluster to make sure that the 
approximate version is faster?
Then you could also add some numbers to the two PRs.




[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

2015-07-14 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/847#issuecomment-121215203
  
Hmmm :-? but can you pass NullValue to types... it expects Something.class. 
Can it be overridden without type erasure getting in the way? 

Anyway... I will let @shghatge take over from here :) 




[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

2015-07-13 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/847#issuecomment-120856171
  
Hi,

I just had a closer look at this PR and it made me seriously question the 
utility of a `Graph.fromCSV` method. Why? First of all, because it's more 
limited than the regular `env.readCsvFile()` in the sense that it does not allow 
POJOs, and it would be a bit tedious to support that. There would be a need for 
methods with 2 to n fields, according to the number of attributes present in 
the POJO. 

Second, because, and I am speaking strictly as a user here, I would rather 
write:
private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {

        if(fileOutput) {
                return env.readCsvFile(edgeInputPath)
                                .ignoreComments("#")
                                .fieldDelimiter("\t")
                                .lineDelimiter("\n")
                                .types(Long.class, Long.class, Double.class)
                                .map(new Tuple3ToEdgeMap<Long, Double>());
        } else {
                return CommunityDetectionData.getDefaultEdgeDataSet(env);
        }
}

than...

private static Graph<Long, Long, Double> getGraph(ExecutionEnvironment env) {
        Graph<Long, Long, Double> graph;
        if(!fileOutput) {
                DataSet<Edge<Long, Double>> edges =
                                CommunityDetectionData.getDefaultEdgeDataSet(env);
                graph = Graph.fromDataSet(edges,
                                new MapFunction<Long, Long>() {

                                        public Long map(Long label) {
                                                return label;
                                        }
                                }, env);
        } else {
                graph = Graph.fromCsvReader(edgeInputPath, new MapFunction<Long, Long>() {
                                        public Long map(Long label) {
                                                return label;
                                        }
                                }, env).ignoreCommentsEdges("#")
                                .fieldDelimiterEdges("\t")
                                .lineDelimiterEdges("\n")
                                .typesEdges(Long.class, Double.class)
                                .typesVertices(Long.class, Long.class);
        }
        return graph;
}

Maybe it's just a preference thing... but I believe it's at least worth a 
discussion. On the other hand, the utility of such a method should have been 
questioned from its early Jira days, so I guess that's my mistake.

I would like to hear your thoughts on this. 
Thanks!




[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

2015-07-13 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/847#issuecomment-121032514
  
Hi @vasia, 

I also saw the types issue, but I had a feeling that this is the way it was 
decided in the previous comment. I would rather have different names for 2 and 
3 than to force a call to `typeVertices` if it's not needed.  




[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

2015-07-13 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/847#issuecomment-121039562
  
Yes, but then you would have the following methods: `types`, 
`typesNoEdgeValue`, `typesNoVertexValue` and again `types`. So, even if it's 
not 100% needed I'd try to keep it consistent. We could also make it more 
graph-oriented (the name `types` was generic). The following is just an example:
1). keyType(K) 
2). keyAndVertexTypes(K, VV)
3). keyAndEdgeTypes(K, EV)
4). keyVertexAndEdgeTypes(K, VV, EV)

With a nice documentation, I think I'd understand what these are for :) 
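
A self-contained sketch of how the proposed naming could look. The classes below are toy stand-ins, not the Gelly classes; only the method names and arities mirror the suggestion above.

```java
// Toy stand-in for Gelly's Graph: just records which types were declared.
class Graph<K, VV, EV> {
    final Class<K> keyType;
    final Class<VV> vertexValueType;
    final Class<EV> edgeValueType;

    Graph(Class<K> k, Class<VV> vv, Class<EV> ev) {
        keyType = k;
        vertexValueType = vv;
        edgeValueType = ev;
    }
}

public class GraphCsvReaderSketch {

    // 1) only vertex IDs, no vertex or edge values
    public static <K> Graph<K, Void, Void> keyType(Class<K> k) {
        return new Graph<>(k, Void.class, Void.class);
    }

    // 2) vertex values, but no edge values
    public static <K, VV> Graph<K, VV, Void> keyAndVertexTypes(Class<K> k, Class<VV> vv) {
        return new Graph<>(k, vv, Void.class);
    }

    // 3) edge values, but no vertex values
    public static <K, EV> Graph<K, Void, EV> keyAndEdgeTypes(Class<K> k, Class<EV> ev) {
        return new Graph<>(k, Void.class, ev);
    }

    // 4) both vertex and edge values
    public static <K, VV, EV> Graph<K, VV, EV> keyVertexAndEdgeTypes(Class<K> k, Class<VV> vv, Class<EV> ev) {
        return new Graph<>(k, vv, ev);
    }

    public static void main(String[] args) {
        Graph<Long, Void, Double> g = keyAndEdgeTypes(Long.class, Double.class);
        System.out.println(g.edgeValueType.getSimpleName()); // prints Double
    }
}
```

Each name states exactly which value types the CSV carries, so no caller is forced into an unneeded `typesVertices`-style call.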




[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

2015-07-13 Thread andralungu
Github user andralungu commented on a diff in the pull request:

https://github.com/apache/flink/pull/847#discussion_r34504887
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
 ---
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.io.CsvReader;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.types.NullValue;
+/**
+ * A class to build a Graph using path(s) provided to CSV file(s) with 
edge (vertices) data
+ * The class also configures the CSV readers used to read edges(vertices) 
data such as the field types,
+ * the delimiters (row and field),  the fields that should be included or 
skipped, and other flags
+ * such as whether to skip the initial line as the header.
+ * The configuration is done using the functions provided in The {@link 
org.apache.flink.api.java.io.CsvReader} class.
+ */
+@SuppressWarnings({unused , unchecked})
+public class GraphCsvReaderK,VV,EV {
+
+   private final Path vertexPath,edgePath;
+   private final ExecutionEnvironment executionContext;
+   protected CsvReader EdgeReader;
+   protected CsvReader VertexReader;
+   protected MapFunctionK, VV mapper;
+   protected ClassK vertexKey;
+   protected ClassVV vertexValue;
+   protected ClassEV edgeValue;
+

+//
+   public GraphCsvReader(Path vertexPath,Path edgePath, 
ExecutionEnvironment context) {
+   this.vertexPath = vertexPath;
+   this.edgePath = edgePath;
+   this.VertexReader = new CsvReader(vertexPath,context);
+   this.EdgeReader = new CsvReader(edgePath,context);
+   this.mapper=null;
+   this.executionContext=context;
+   }
+
+   public GraphCsvReader(Path edgePath, ExecutionEnvironment context) {
+   this.vertexPath = null;
+   this.edgePath = edgePath;
+   this.EdgeReader = new CsvReader(edgePath,context);
+   this.VertexReader = null;
+   this.mapper = null;
+   this.executionContext=context;
+   }
+
+   public GraphCsvReader(Path edgePath,final MapFunctionK, VV mapper, 
ExecutionEnvironment context) {
+   this.vertexPath = null;
+   this.edgePath = edgePath;
+   this.EdgeReader = new CsvReader(edgePath,context);
+   this.VertexReader = null;
+   this.mapper = mapper;
+   this.executionContext=context;
+   }
+
+   public GraphCsvReader (String edgePath,ExecutionEnvironment context) {
+   this(new Path(Preconditions.checkNotNull(edgePath, The file 
path may not be null.)), context);
+
+   }
+
+   public GraphCsvReader(String vertexPath, String edgePath, 
ExecutionEnvironment context) {
+   this(new Path(Preconditions.checkNotNull(vertexPath, The file 
path may not be null.)),
+   new Path(Preconditions.checkNotNull(edgePath, 
The file path may not be null.)), context);
+   }
+
+
+   public GraphCsvReader (String edgePath, final MapFunctionK, VV 
mapper, ExecutionEnvironment context) {
+   this(new Path(Preconditions.checkNotNull(edgePath, The 
file path may not be null.)),mapper, context);
+   }
+
+   
//
+   /**
+* Specifies the types for the edges fields and returns this instance 
of GraphCsvReader

[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

2015-07-13 Thread andralungu
Github user andralungu commented on a diff in the pull request:

https://github.com/apache/flink/pull/847#discussion_r34504919
  

[GitHub] flink pull request: [FLINK-2141] Allow GSA's Gather to perform thi...

2015-07-11 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/877#issuecomment-120654787
  
Merging... 




[GitHub] flink pull request: [FLINK-2310] Add an Adamic Adar Similarity exa...

2015-07-08 Thread andralungu
Github user andralungu commented on a diff in the pull request:

https://github.com/apache/flink/pull/892#discussion_r34189267
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/AdamicAdarSimilarityMeasure.java
 ---
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.ReduceNeighborsFunction;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.Triplet;
+import org.apache.flink.graph.example.utils.AdamicAdarSimilarityMeasureData;
+import java.util.HashSet;
+
+/**
+ * Given a directed, unweighted graph, return a weighted graph where the edge values are equal
+ * to the Adamic-Adar similarity coefficient, which is given as the
+ * summation of the weights of the common neighbors of the source and destination vertex.
+ * The weights are given as 1/log(nK), where nK is the degree of the vertex.
+ * See <a href="http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf">Friends and neighbors on the Web</a>
+ *
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <br>
+ * Edges are represented by pairs of srcVertexId, trgVertexId separated by tabs.
+ * Edges themselves are separated by newlines.
+ * For example: <code>1\t2\n1\t3\n</code> defines two edges 1-2 and 1-3.
+ * </p>
+ *
+ * Usage <code>AdamicAdarSimilarityMeasure &lt;edge path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.AdamicAdarSimilarityMeasureData}
+ */
+@SuppressWarnings("serial")
+public class AdamicAdarSimilarityMeasure implements ProgramDescription {
+
+
+   public static void main(String[] args) throws Exception {
+
+       if(!parseParameters(args)) {
+           return;
+       }
+
+       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+       DataSet<Edge<Long, Double>> edges = AdamicAdarSimilarityMeasure.getEdgesDataSet(env);
+
+       /* Graph is generated without vertex weights. Vertices will have a Tuple2 as value,
+       where the first field will be the weight of the vertex as 1/log(kn), kn being the degree of the vertex.
+       The second field is a HashSet whose elements are again Tuple2, of which the first field is the VertexID of the neighbor
+       and the second field is the weight of that neighbor.
+       */
+       Graph<Long, Tuple2<Double, HashSet<Tuple2<Long, Double>>>, Double> graph =
+               Graph.fromDataSet(edges, new mapVertices(), env);
+
+       DataSet<Tuple2<Long, Long>> degrees = graph.getDegrees();
+
+       //vertices are given weights in graph
+       graph = graph.joinWithVertices(degrees, new AssignWeightToVertices());
+
+       //neighbors are computed for all the vertices
+       DataSet<Tuple2<Long, Tuple2<Double, HashSet<Tuple2<Long, Double>>>>> computedNeighbors =
+               graph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL);
+
+       //graph is updated with the new vertex values
+       Graph<Long, Tuple2<Double, HashSet<Tuple2<Long, Double>>>, Double> graphWithVertexValues =
+               graph.joinWithVertices(computedNeighbors, new UpdateGraphVertices());
+
+       //edges are given the Adamic-Adar coefficient as value
+       DataSet<Edge<Long, Double>> edgesWithAdamicValues
[GitHub] flink pull request: [FLINK-2310] Add an Adamic Adar Similarity exa...

2015-07-08 Thread andralungu
Github user andralungu commented on a diff in the pull request:

https://github.com/apache/flink/pull/892#discussion_r34189351
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/AdamicAdarSimilarityMeasure.java
 ---
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.ReduceNeighborsFunction;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.Triplet;
+import 
org.apache.flink.graph.example.utils.AdamicAdarSimilarityMeasureData;
+import java.util.HashSet;
+
+/**
+ * Given a directed, unweighted graph, return a weighted graph where the edge values are equal
+ * to the Adamic-Adar similarity coefficient, which is given as the
+ * summation of the weights of the common neighbors of the source and destination vertex.
+ * The weights are given as 1/log(nK), where nK is the degree of the vertex.
+ * See <a href="http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf">Friends and neighbors on the Web</a>
+ *
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <br>
+ * Edges are represented by pairs of srcVertexId, trgVertexId separated by tabs.
+ * Edges themselves are separated by newlines.
+ * For example: <code>1\t2\n1\t3\n</code> defines two edges 1-2 and 1-3.
+ * </p>
+ *
+ * Usage <code>AdamicAdarSimilarityMeasure &lt;edge path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.AdamicAdarSimilarityMeasureData}
+ */
+@SuppressWarnings("serial")
+public class AdamicAdarSimilarityMeasure implements ProgramDescription {
+
+
+   public static void main(String[] args) throws  Exception {
+
+   if(!parseParameters(args)) {
+   return;
+   }
+
+   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+   DataSet<Edge<Long, Double>> edges = AdamicAdarSimilarityMeasure.getEdgesDataSet(env);
+
+   /* Graph is generated without vertex weights. Vertices will have
+   a Tuple2 as value, where the first field is the weight of the vertex,
+   computed as 1/log(kn), where kn is the degree of the vertex.
+   The second field is a HashSet whose elements are again Tuple2s, of
+   which the first field is the vertex ID of the neighbor
+   and the second field is the weight of that neighbor.
+   */
+   Graph<Long, Tuple2<Double, HashSet<Tuple2<Long, Double>>>, Double> graph =
+   Graph.fromDataSet(edges, new mapVertices(), env);
+
+   DataSet<Tuple2<Long, Long>> degrees = graph.getDegrees();
+
+   //vertices are given weights in graph
+   graph = graph.joinWithVertices(degrees, new AssignWeightToVertices());
+
+   //neighbors are computed for all the vertices
+   DataSet<Tuple2<Long, Tuple2<Double, HashSet<Tuple2<Long, Double>>>>> computedNeighbors =
+   graph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL);
+
+   //graph is updated with the new vertex values
+   Graph<Long, Tuple2<Double, HashSet<Tuple2<Long, Double>>>, Double> graphWithVertexValues =
+   graph.joinWithVertices(computedNeighbors, new UpdateGraphVertices());
+
+   //edges are given the Adamic-Adar coefficient as value
+   DataSet<Edge<Long, Double>> edgesWithAdamicValues
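The weighting quoted in the diff above, where every vertex contributes 1/log(degree) to the similarity of each pair of its neighbors, can be checked against a small dependency-free sketch. The class, helper names, and toy graph below are illustrative, not part of the PR:

```java
import java.util.*;

public class AdamicAdarSketch {

    // Adamic-Adar weight of a vertex: 1/log(degree), as in the javadoc above.
    static double weight(int degree) {
        return 1.0 / Math.log(degree);
    }

    // Adamic-Adar similarity of (u, v): sum of the weights of their common neighbors.
    static double similarity(Map<Long, Set<Long>> adj, long u, long v) {
        double sum = 0.0;
        for (long w : adj.get(u)) {
            if (adj.get(v).contains(w)) {
                sum += weight(adj.get(w).size());
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        // Tiny undirected graph: 1-2, 1-3, 2-3, 3-4
        Map<Long, Set<Long>> adj = new HashMap<>();
        long[][] edges = {{1, 2}, {1, 3}, {2, 3}, {3, 4}};
        for (long[] e : edges) {
            adj.computeIfAbsent(e[0], k -> new HashSet<>()).add(e[1]);
            adj.computeIfAbsent(e[1], k -> new HashSet<>()).add(e[0]);
        }
        // Vertices 1 and 2 share only neighbor 3, which has degree 3,
        // so their similarity is 1/log(3).
        System.out.println(similarity(adj, 1L, 2L)); // prints 0.9102392266268373
    }
}
```

The Gelly version in the PR distributes the same computation: the HashSet stored in each vertex value plays the role of the adjacency sets here.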

[GitHub] flink pull request: [FLINK-2310] Add an Adamic Adar Similarity exa...

2015-07-08 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/892#issuecomment-119708667
  
Code-style freak says everything else is fine


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2310] Add an Adamic Adar Similarity exa...

2015-07-08 Thread andralungu
Github user andralungu commented on a diff in the pull request:

https://github.com/apache/flink/pull/892#discussion_r34189005
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/AdamicAdarSimilarityMeasure.java
 ---
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.ReduceNeighborsFunction;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.Triplet;
+import 
org.apache.flink.graph.example.utils.AdamicAdarSimilarityMeasureData;
+import java.util.HashSet;
+
+/**
+ * Given a directed, unweighted graph, return a weighted graph where the edge values are equal
+ * to the Adamic-Adar similarity coefficient, which is given as the
+ * summation of the weights of the common neighbors of the source and destination vertex.
+ * The weights are given as 1/log(nK), where nK is the degree of the vertex.
+ * See <a href="http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf">Friends and neighbors on the Web</a>
--- End diff --

@see, not just see :) 




[GitHub] flink pull request: [FLINK-1745] [ml] [WIP] Add exact k-nearest-ne...

2015-07-06 Thread andralungu
Github user andralungu commented on a diff in the pull request:

https://github.com/apache/flink/pull/696#discussion_r33924163
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/KNN.scala
 ---
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification
+
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.DataSetUtils._
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.common._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.metrics.distances.{DistanceMetric, 
EuclideanDistanceMetric}
+import org.apache.flink.ml.pipeline.{FitOperation, 
PredictDataSetOperation, Predictor}
+import org.apache.flink.util.Collector
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+/** Implements a k-nearest neighbor join.
+  *
+  * This algorithm calculates the `k` nearest neighbor points in the training set
+  * for each point of the testing set.
+  *
+  * @example
+  * {{{
+  * val trainingDS: DataSet[Vector] = ...
+  * val testingDS: DataSet[Vector] = ...
+  *
+  * val knn = KNN()
+  *   .setK(10)
+  *   .setBlocks(5)
+  *   .setDistanceMetric(EuclideanDistanceMetric())
+  *
+  * knn.fit(trainingDS)
+  *
+  * val predictionDS: DataSet[(Vector, Array[Vector])] = 
knn.predict(testingDS)
+  * }}}
+  *
+  * =Parameters=
+  *
+  * - [[org.apache.flink.ml.classification.KNN.K]]
+  * Sets the K which is the number of selected points as neighbors. 
(Default value: '''None''')
+  *
+  * - [[org.apache.flink.ml.classification.KNN.Blocks]]
+  * Sets the number of blocks into which the input data will be split. 
This number should be set
+  * at least to the degree of parallelism. If no value is specified, then 
the parallelism of the
+  * input [[DataSet]] is used as the number of blocks. (Default value: 
'''None''')
+  *
+  * - [[org.apache.flink.ml.classification.KNN.DistanceMetric]]
+  * Sets the distance metric to calculate distance between two points. If 
no metric is specified,
+  * then [[org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric]] 
is used. (Default value:
+  * '''EuclideanDistanceMetric()''')
+  *
+  */
+class KNN extends Predictor[KNN] {
+
+  import KNN._
+
+  var trainingSet: Option[DataSet[Block[Vector]]] = None
+
+  /** Sets K
+* @param k the number of selected points as neighbors
+*/
+  def setK(k: Int): KNN = {
+require(k > 0, "K must be positive.")
+parameters.add(K, k)
+this
+  }
+
+  /** Sets the distance metric
+* @param metric the distance metric to calculate distance between two 
points
+*/
+  def setDistanceMetric(metric: DistanceMetric): KNN = {
+parameters.add(DistanceMetric, metric)
+this
+  }
+
+  /** Sets the number of data blocks/partitions
+* @param n the number of data blocks
+*/
+  def setBlocks(n: Int): KNN = {
+require(n > 0, "Number of blocks must be positive.")
+parameters.add(Blocks, n)
+this
+  }
+}
+
+object KNN {
+
+  case object K extends Parameter[Int] {
+val defaultValue: Option[Int] = None
+  }
+
+  case object DistanceMetric extends Parameter[DistanceMetric] {
+val defaultValue: Option[DistanceMetric] = 
Some(EuclideanDistanceMetric())
+  }
+
+  case object Blocks extends Parameter[Int] {
+val defaultValue: Option[Int] = None
+  }
+
+  def apply(): KNN = {
+new KNN()
+  }
+
+  /** [[FitOperation]] which trains a KNN based on the given training data 
set.
+* @tparam T Subtype of [[Vector]]
+*/
+  implicit def fitKNN[T <: Vector : TypeInformation] = new
FitOperation[KNN, T
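Independent of the blocked Scala implementation under review, the result a k-nearest-neighbor join must produce can be stated with a small brute-force reference, sketched here in plain Java rather than the PR's Scala. All names are illustrative. For each test point it sorts the training points by Euclidean distance and keeps the first k:

```java
import java.util.*;

public class BruteForceKnn {

    // Euclidean distance between two points of equal dimension.
    static double euclidean(double[] a, double[] b) {
        double s = 0.0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            s += d * d;
        }
        return Math.sqrt(s);
    }

    // Returns the k training points closest to the test point.
    static List<double[]> kNearest(List<double[]> training, double[] test, int k) {
        List<double[]> sorted = new ArrayList<>(training);
        sorted.sort(Comparator.comparingDouble(p -> euclidean(p, test)));
        return sorted.subList(0, Math.min(k, sorted.size()));
    }

    public static void main(String[] args) {
        List<double[]> training = Arrays.asList(
                new double[]{0, 0}, new double[]{1, 0}, new double[]{5, 5});
        List<double[]> nearest = kNearest(training, new double[]{0.9, 0.1}, 2);
        // (1,0) is the closest training point, then (0,0)
        System.out.println(Arrays.toString(nearest.get(0))); // prints [1.0, 0.0]
    }
}
```

The blocked implementation partitions the training set so each block can be searched in parallel, but the neighbors it selects must match this brute-force result.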

[GitHub] flink pull request: [FLINK-2141] Allow GSA's Gather to perform thi...

2015-07-06 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/877#issuecomment-118891421
  
I see the requirements have been fulfilled here. If no objections, I'd like 
to merge this by the end of the week :) 




[GitHub] flink pull request: [FLINK-2150] Added zipWithUniqueIds

2015-07-02 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/801#issuecomment-118171270
  
Merging...





[GitHub] flink pull request: My branch

2015-07-02 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/881#issuecomment-118177927
  
I am a bit out of context here, but I believe this PR should be deleted :) 




[GitHub] flink pull request: [FLINK-2141] Allow GSA's Gather to perform thi...

2015-07-02 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/877#issuecomment-118179979
  
Hihi! Shivani is updating the PR on the fly :) Reminds me of myself back in 
the day ;)

If I may tease: If these GSA parameters did not have an example, I would 
have asked for one. So my two cents: shouldn't we, instead of deleting 
completely, replace the example with a better one? IncrementalGSASSSP :)) 




[GitHub] flink pull request: [FLINK-2141] Allow GSA's Gather to perform thi...

2015-07-01 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/877#issuecomment-117540482
  
Hi @shghatge ,

This PR looks very nice; I added some minor inline comments to make it look 
even nicer :)

One major problem: you have an example and a test for it; however, you 
don't have tests for the GSAConfiguration itself; check 
https://github.com/apache/flink/blob/master/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
You need to make sure that, given no direction, the iteration works as before, and that if you give it direction IN or ALL, it behaves as it is supposed to. 

Apart from that, this looks almost spotless.

One other minor thing; when you make the PR, in order to have it uber - 
clean you should squash your commits. In this case, you have 3 commits, so:
git rebase -i HEAD~3 should do the trick
Then leave pick for the first commit; squash for the others and let the 
instructions guide you (Ctrl-x; Y, etc...). Then force push. 

The common practice afterwards (i.e. after you address my comments here) is 
to leave it as a new commit (don't squash!). So, first squash these 3, then fix 
the minor issues and commit again.

Awesome work! 
 





[GitHub] flink pull request: [FLINK-2141] Allow GSA's Gather to perform thi...

2015-07-01 Thread andralungu
Github user andralungu commented on a diff in the pull request:

https://github.com/apache/flink/pull/877#discussion_r33657016
  
--- Diff: docs/libs/gelly_guide.md ---
@@ -693,6 +693,9 @@ Currently, the following parameters can be specified:
 * <strong>Number of Vertices</strong>: Accessing the total number of vertices within the iteration. This property can be set using the `setOptNumVertices()` method.
 The number of vertices can then be accessed in the gather, sum and/or apply functions by using the `getNumberOfVertices()` method. If the option is not set in the configuration, this method will return -1.
 
+* <strong>Neighbor Direction</strong>: By Default values are gathered from the out neighbors of the Vertex. This can be modified
--- End diff --

default without capital D, set direction should be written without capital 
s and in between inverted commas (i.e. `setDirection()`)

It would be nice to have an example in the docs, showing users how to play 
with this option; check the number of vertices example for inspiration :)


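The option discussed above only changes which edges a gather follows. Its semantics can be illustrated with a dependency-free sketch; the enum and method below are illustrative stand-ins, not Gelly's API:

```java
import java.util.*;

public class NeighborDirectionSketch {

    // Stand-in for Gelly's EdgeDirection.
    enum Direction { IN, OUT, ALL }

    // Sums the values of target's neighbors, gathered along IN, OUT or ALL edges.
    static long gather(long[][] edges, Map<Long, Long> values, long target, Direction dir) {
        long sum = 0;
        for (long[] e : edges) {
            long src = e[0], trg = e[1];
            if (dir != Direction.IN && src == target) {  // OUT or ALL: follow out-edges
                sum += values.get(trg);
            }
            if (dir != Direction.OUT && trg == target) { // IN or ALL: follow in-edges
                sum += values.get(src);
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        long[][] edges = {{1, 2}, {3, 1}, {1, 4}};
        Map<Long, Long> values = Map.of(1L, 10L, 2L, 20L, 3L, 30L, 4L, 40L);
        System.out.println(gather(edges, values, 1, Direction.OUT)); // prints 60
        System.out.println(gather(edges, values, 1, Direction.IN));  // prints 30
    }
}
```

With OUT, vertex 1 sees vertices 2 and 4 (20 + 40); with IN it sees only vertex 3 (30); ALL would sum all three.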


[GitHub] flink pull request: [FLINK-2141] Allow GSA's Gather to perform thi...

2015-07-01 Thread andralungu
Github user andralungu commented on a diff in the pull request:

https://github.com/apache/flink/pull/877#discussion_r33657118
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAExistenceOfPaths.java
 ---
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.PathExistenceData;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.gsa.GSAConfiguration;
+
+
+import java.util.HashSet;
+
+/**
+ * This example implements a program in which we find out the vertices for 
which there exists a path from given vertex
+ *
+ * The edges input file is expected to contain one edge per line, with 
long IDs and long values
+ * The vertices input file is expected to contain one vertex per line with 
long IDs and no value
+ * If no arguments are provided, the example runs with default data for 
this example
+ */
+public class GSAExistenceOfPaths implements ProgramDescription {
+
+   @SuppressWarnings("serial")
+   public static void main(String[] args) throws Exception {
+
+   if(!parseParameters(args)) {
+   return;
+   }
+
+   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+   DataSet<Tuple3<Long, Long, Long>> edges = getEdgesDataSet(env);
+   DataSet<Tuple2<Long, HashSet<Long>>> vertices = getVerticesDataSet(env);
+
+   Graph<Long, HashSet<Long>, Long> graph = Graph.fromTupleDataSet(vertices, edges, env);
+
+
+   GSAConfiguration parameters = new GSAConfiguration();
+   parameters.setDirection(EdgeDirection.IN);
+   // Execute the GSA iteration
+   Graph<Long, HashSet<Long>, Long> result = graph.runGatherSumApplyIteration(new GetReachableVertices(),
+   
new FindAllReachableVertices(),
--- End diff --

leave just one blank line here


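The GSA example above propagates reachability information along incoming edges; the set it converges to is the same one a plain breadth-first search from the source vertex computes. A minimal reference sketch, with an illustrative graph and names not taken from the PR:

```java
import java.util.*;

public class ReachabilitySketch {

    // All vertices reachable from src by following directed edges.
    static Set<Long> reachableFrom(Map<Long, List<Long>> adj, long src) {
        Set<Long> visited = new HashSet<>();
        Deque<Long> queue = new ArrayDeque<>();
        queue.add(src);
        visited.add(src);
        while (!queue.isEmpty()) {
            long v = queue.poll();
            for (long w : adj.getOrDefault(v, Collections.emptyList())) {
                if (visited.add(w)) { // add() is false if already visited
                    queue.add(w);
                }
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        Map<Long, List<Long>> adj = new HashMap<>();
        adj.put(1L, Arrays.asList(2L, 3L));
        adj.put(3L, Arrays.asList(4L));
        adj.put(5L, Arrays.asList(1L));
        // vertices reachable from 1 are 1, 2, 3 and 4; vertex 5 is not
        System.out.println(reachableFrom(adj, 1L));
    }
}
```

The iterative GSA formulation reaches the same fixed point, one superstep per BFS frontier.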


[GitHub] flink pull request: [FLINK-2141] Allow GSA's Gather to perform thi...

2015-07-01 Thread andralungu
Github user andralungu commented on a diff in the pull request:

https://github.com/apache/flink/pull/877#discussion_r33657394
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAExistenceOfPaths.java
 ---
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.PathExistenceData;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.gsa.GSAConfiguration;
+
+
+import java.util.HashSet;
+
+/**
+ * This example implements a program in which we find out the vertices for 
which there exists a path from given vertex
+ *
+ * The edges input file is expected to contain one edge per line, with 
long IDs and long values
+ * The vertices input file is expected to contain one vertex per line with 
long IDs and no value
+ * If no arguments are provided, the example runs with default data for 
this example
+ */
+public class GSAExistenceOfPaths implements ProgramDescription {
+
+   @SuppressWarnings("serial")
--- End diff --

not sure you need this annotation... could you run the example without it 
and see whether you get a warning?




[GitHub] flink pull request: [FLINK-2141] Allow GSA's Gather to perform thi...

2015-07-01 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/877#issuecomment-117785914
  
LGTM :) @vasia ?





[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

2015-07-01 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/847#issuecomment-117787551
  
Nice and rebased. +1




[GitHub] flink pull request: [FLINK-2150] Added zipWithUniqueIds

2015-06-30 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/801#issuecomment-117112526
  
Okay, can I get some +1s to merge this :)?




[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

2015-06-28 Thread andralungu
Github user andralungu closed the pull request at:

https://github.com/apache/flink/pull/832




[GitHub] flink pull request: [FLINK-2150] Added zipWithUniqueIds

2015-06-28 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/801#issuecomment-116287334
  
Hi @thvasilo ,

Sorry for the late reply... I stated in the `zipWithIndex` PR that I am 
waiting for that to be merged before I update this one :). So, here it is! 




[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

2015-06-26 Thread andralungu
Github user andralungu commented on a diff in the pull request:

https://github.com/apache/flink/pull/847#discussion_r33354071
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---
@@ -282,6 +282,54 @@ public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
}
 
/**
+   * Creates a graph from CSV files.
+   *
+   * Vertices with value are created from a CSV file with 2 fields
+   * Edges with value are created from a CSV file with 3 fields
+   * from Tuple3.
+   *
+   * @param verticesPath path to a CSV file with the Vertices data.
+   * @param edgesPath path to a CSV file with the Edges data
+   * @param context the flink execution environment.
+   * @return An instance of {@link org.apache.flink.graph.GraphCsvReader} 
, which on calling types() method to specify types of the
--- End diff --

"on which calling", not "which on" ... or "which on calling the types method specifies" (not "to specify")




[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

2015-06-26 Thread andralungu
Github user andralungu commented on a diff in the pull request:

https://github.com/apache/flink/pull/847#discussion_r33354597
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
 ---
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.io.CsvReader;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.types.NullValue;
+/**
+ * A class to build a Graph using path(s) provided to CSV file(s) with 
edge (vertices) data
+ * The class also configures the CSV readers used to read edges(vertices) 
data such as the field types,
+ * the delimiters (row and field),  the fields that should be included or 
skipped, and other flags
+ * such as whether to skip the initial line as the header.
+ * The configuration is done using the functions provided in The {@link 
org.apache.flink.api.java.io.CsvReader} class.
+ */
+public class GraphCsvReader<K, VV, EV> {
+
+   private final Path path1,path2;
+   private final ExecutionEnvironment executionContext;
+   protected CsvReader EdgeReader;
+   protected CsvReader VertexReader;
+   protected MapFunction<K, VV> mapper;
+

+//
+
+   public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment 
context)
+   {
+   this.path1 = path1;
+   this.path2 = path2;
+   this.VertexReader = new CsvReader(path1,context);
+   this.EdgeReader = new CsvReader(path2,context);
+   this.mapper=null;
+   this.executionContext=context;
+   }
+
+   public GraphCsvReader(Path path2, ExecutionEnvironment context)
+   {
--- End diff --

again the bracket issue :)


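The GraphCsvReader under review delegates the actual parsing to Flink's CsvReader. The underlying job, turning delimiter-separated lines into typed edge records, can be illustrated with a dependency-free sketch; the record layout and names are assumptions for illustration, not the PR's API:

```java
import java.util.*;

public class EdgeCsvSketch {

    // A minimal typed edge record, standing in for Gelly's Edge<K, EV>.
    static final class Edge {
        final long src, trg;
        final double value;
        Edge(long src, long trg, double value) {
            this.src = src;
            this.trg = trg;
            this.value = value;
        }
    }

    // Parses lines of the form "src,trg,value", skipping blanks and comment rows.
    static List<Edge> parseEdges(List<String> lines, String fieldDelimiter) {
        List<Edge> edges = new ArrayList<>();
        for (String line : lines) {
            if (line.trim().isEmpty() || line.startsWith("#")) {
                continue; // skip empty lines and commented-out rows
            }
            String[] fields = line.split(fieldDelimiter);
            edges.add(new Edge(Long.parseLong(fields[0].trim()),
                    Long.parseLong(fields[1].trim()),
                    Double.parseDouble(fields[2].trim())));
        }
        return edges;
    }

    public static void main(String[] args) {
        List<Edge> edges = parseEdges(
                Arrays.asList("1,2,0.5", "# header", "2,3,1.0"), ",");
        System.out.println(edges.size()); // prints 2
    }
}
```

The builder class in the PR adds exactly this kind of configuration (field and row delimiters, skipped fields, header handling) on top of the readers, for both the vertex and the edge file.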


[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

2015-06-26 Thread andralungu
Github user andralungu commented on a diff in the pull request:

https://github.com/apache/flink/pull/847#discussion_r33354535
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
 ---
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.io.CsvReader;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.types.NullValue;
+/**
+ * A class to build a Graph using path(s) provided to CSV file(s) with 
edge (vertices) data
+ * The class also configures the CSV readers used to read edges(vertices) 
data such as the field types,
+ * the delimiters (row and field),  the fields that should be included or 
skipped, and other flags
+ * such as whether to skip the initial line as the header.
+ * The configuration is done using the functions provided in the {@link org.apache.flink.api.java.io.CsvReader} class.
+ */
+public class GraphCsvReader<K, VV, EV> {
+
+   private final Path path1, path2;
+   private final ExecutionEnvironment executionContext;
+   protected CsvReader EdgeReader;
+   protected CsvReader VertexReader;
+   protected MapFunction<K, VV> mapper;
+

+//
+
+   public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment 
context)
+   {
--- End diff --

Also, @vasia was talking about some general coding style rules in one of 
the previous comments... The way we add the opening block brackets must be 
consistent. So here, after public GraphCsvReader(...) { //open the bracket on 
the same line.

Please look in the rest of the document for similar issues...  




[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

2015-06-26 Thread andralungu
Github user andralungu commented on a diff in the pull request:

https://github.com/apache/flink/pull/847#discussion_r33353966
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---
@@ -282,6 +282,54 @@ public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
}
 
/**
+   * Creates a graph from CSV files.
+   *
+   * Vertices with value are created from a CSV file with 2 fields
+   * Edges with value are created from a CSV file with 3 fields
+   * from Tuple3.
--- End diff --

there is a trailing `from Tuple3` here...




[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

2015-06-26 Thread andralungu
Github user andralungu commented on a diff in the pull request:

https://github.com/apache/flink/pull/847#discussion_r33354571
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
 ---
@@ -0,0 +1,502 @@
+public class GraphCsvReader<K, VV, EV> {
+
+   private final Path path1, path2;
+   private final ExecutionEnvironment executionContext;
+   protected CsvReader EdgeReader;
+   protected CsvReader VertexReader;
+   protected MapFunction<K, VV> mapper;
+

+//
+
+   public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment 
context)
+   {
+   this.path1 = path1;
--- End diff --

again, the path1, path2 issue




[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

2015-06-26 Thread andralungu
Github user andralungu commented on a diff in the pull request:

https://github.com/apache/flink/pull/847#discussion_r33354654
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
 ---
@@ -0,0 +1,502 @@
+public class GraphCsvReader<K, VV, EV> {
+
+   private final Path path1, path2;
+   private final ExecutionEnvironment executionContext;
+   protected CsvReader EdgeReader;
+   protected CsvReader VertexReader;
+   protected MapFunction<K, VV> mapper;
+

+//
+
+   public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment 
context)
+   {
+   this.path1 = path1;
+   this.path2 = path2;
+   this.VertexReader = new CsvReader(path1,context);
+   this.EdgeReader = new CsvReader(path2,context);
+   this.mapper=null;
+   this.executionContext=context;
+   }
+
+   public GraphCsvReader(Path path2, ExecutionEnvironment context)
+   {
+   this.path1=null;
+   this.path2 = path2;
+   this.EdgeReader = new CsvReader(path2,context);
+   this.VertexReader = null;
+   this.mapper = null;
+   this.executionContext=context;
+   }
+
+   public GraphCsvReader(Path path2, final MapFunction<K, VV> mapper, ExecutionEnvironment context)
+   {
+   this.path1=null;
--- End diff --

here it's `this.path1 = null;` for consistency with the rest.




[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

2015-06-26 Thread andralungu
Github user andralungu commented on a diff in the pull request:

https://github.com/apache/flink/pull/847#discussion_r33355251
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
 ---
@@ -150,20 +149,15 @@ private static boolean parseParameters(String[] args) 
{
}
 
@SuppressWarnings("serial")
-   private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) {
-   if (fileOutput) {
-   return env.readCsvFile(edgesInputPath)
-   .lineDelimiter("\n").fieldDelimiter("\t")
-   .types(Long.class, Long.class).map(
-   new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
-
-   public Edge<Long, NullValue> map(Tuple2<Long, Long> value) {
-   return new Edge<Long, NullValue>(value.f0, value.f1,
-   NullValue.getInstance());
-   }
-   });
-   } else {
-   return ExampleUtils.getRandomEdges(env, NUM_VERTICES);
+   private static Graph<Long, NullValue, NullValue> getGraph(ExecutionEnvironment env) {
+   if(fileOutput) {
+   return Graph.fromCsvReader(edgesInputPath, env).lineDelimiterEdges("\n").fieldDelimiterEdges("\t")
+   .types(Long.class);
+
+   }
+   else
+   {
+   return Graph.fromDataSet(ExampleUtils.getRandomEdges(env, NUM_VERTICES), env);
--- End diff --

Yup... so I like how this looks better than how the previous rewritings 
were made...




[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

2015-06-26 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/847#issuecomment-115689621
  
Hi @shghatge ,

I left my set of comments inline. They are mostly related to coding style 
issues. I guess you should revisit the previous comments here.

Also, don't forget to rebase. It seems like there are some merge conflicts 
that need to be fixed :)





[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

2015-06-26 Thread andralungu
Github user andralungu commented on a diff in the pull request:

https://github.com/apache/flink/pull/847#discussion_r33354695
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
 ---
@@ -0,0 +1,502 @@
+public class GraphCsvReader<K, VV, EV> {
+
+   private final Path path1, path2;
+   private final ExecutionEnvironment executionContext;
+   protected CsvReader EdgeReader;
+   protected CsvReader VertexReader;
+   protected MapFunction<K, VV> mapper;
+

+//
+
+   public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment 
context)
+   {
+   this.path1 = path1;
+   this.path2 = path2;
+   this.VertexReader = new CsvReader(path1,context);
+   this.EdgeReader = new CsvReader(path2,context);
+   this.mapper=null;
+   this.executionContext=context;
+   }
+
+   public GraphCsvReader(Path path2, ExecutionEnvironment context)
+   {
+   this.path1=null;
+   this.path2 = path2;
+   this.EdgeReader = new CsvReader(path2,context);
+   this.VertexReader = null;
+   this.mapper = null;
+   this.executionContext=context;
+   }
+
+   public GraphCsvReader(Path path2, final MapFunction<K, VV> mapper, ExecutionEnvironment context)
+   {
+   this.path1=null;
+   this.path2 = path2;
+   this.EdgeReader = new CsvReader(path2,context);
+   this.VertexReader = null;
+   this.mapper = mapper;
+   this.executionContext=context;
+   }
+
+   public GraphCsvReader (String path2, ExecutionEnvironment context)
+   {
+   this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context);
+
+   }
+
+   public GraphCsvReader(String path1, String path2, ExecutionEnvironment context)
+   {
+   this(new Path(Preconditions.checkNotNull(path1, "The file path may not be null.")), new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context);
+   }
+
+
+   public GraphCsvReader (String path2, final MapFunction<K, VV> mapper, ExecutionEnvironment context)
+   {
+   this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), mapper, context);
+   }
+
+   public CsvReader getEdgeReader()
+   {
+   return this.EdgeReader;
+   }
+
+   public CsvReader getVertexReader()
+   {
+   return this.VertexReader;
+   }
+   
//
+
+   /**
+* Specifies the types for the Graph fields and returns a Graph with 
those field types
+*
+* This method

[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

2015-06-26 Thread andralungu
Github user andralungu commented on a diff in the pull request:

https://github.com/apache/flink/pull/847#discussion_r33354136
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---
@@ -282,6 +282,54 @@ public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
}
 
/**
+   * Creates a graph from CSV files.
+   *
+   * Vertices with value are created from a CSV file with 2 fields
+   * Edges with value are created from a CSV file with 3 fields
+   * from Tuple3.
+   *
+   * @param verticesPath path to a CSV file with the Vertices data.
+   * @param edgesPath path to a CSV file with the Edges data
+   * @param context the flink execution environment.
+   * @return An instance of {@link org.apache.flink.graph.GraphCsvReader} 
, which on calling types() method to specify types of the
+   *Vertex ID, Vertex Value and Edge value returns a Graph
+   */
+   public static  GraphCsvReader fromCsvReader(String verticesPath, String 
edgesPath, ExecutionEnvironment context) {
+   return new GraphCsvReader(verticesPath, edgesPath, context);
+   }
+   /** Creates a graph from a CSV file for Edges., Vertices are
--- End diff --

... Edges. \n (right now it's .,)
Vertices





[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

2015-06-26 Thread andralungu
Github user andralungu commented on a diff in the pull request:

https://github.com/apache/flink/pull/847#discussion_r33355373
  
--- Diff: 
flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java
 ---
@@ -156,4 +181,17 @@ public DummyCustomType map(Long vertexId) {
return new DummyCustomType(vertexId.intValue()-1, 
false);
}
}
+
+   private FileInputSplit createTempFile(String content) throws 
IOException {
+   File tempFile = File.createTempFile("test_contents", "tmp");
+   tempFile.deleteOnExit();
--- End diff --

`deleteOnExit()`... nice!
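For context, the temp-file-per-test pattern being praised is roughly the following. This is a self-contained sketch, not the PR's actual test class; the class name and fixture content are illustrative:

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class TempFileDemo {
    // Write fixture content to a temporary file that the JVM deletes on
    // exit, so the test leaves no files behind even if assertions fail.
    static File createTempFile(String content) throws IOException {
        File tempFile = File.createTempFile("test_contents", "tmp");
        tempFile.deleteOnExit();
        Files.write(tempFile.toPath(), content.getBytes());
        return tempFile;
    }

    public static void main(String[] args) throws IOException {
        File f = createTempFile("1\t2\n");
        System.out.println(f.exists()); // true
        System.out.println(new String(Files.readAllBytes(f.toPath())).trim());
    }
}
```

The nice part is that `deleteOnExit()` registers the cleanup up front, so no finally block or tearDown method is needed.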




[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

2015-06-26 Thread andralungu
Github user andralungu commented on a diff in the pull request:

https://github.com/apache/flink/pull/847#discussion_r33354309
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
 ---
@@ -0,0 +1,502 @@
+public class GraphCsvReader<K, VV, EV> {
+
+   private final Path path1, path2;
+   private final ExecutionEnvironment executionContext;
+   protected CsvReader EdgeReader;
+   protected CsvReader VertexReader;
+   protected MapFunction<K, VV> mapper;
+

+//
+
+   public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment 
context)
--- End diff --

let's not call these path1 and path2. I suggest we use better names like 
edgePath, vertexPath... This is valid for the methods underneath too...




[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

2015-06-26 Thread andralungu
Github user andralungu commented on a diff in the pull request:

https://github.com/apache/flink/pull/847#discussion_r33355180
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
 ---
@@ -119,23 +113,32 @@ private static boolean parseParameters(String [] 
args) {
return true;
}
 
-   @SuppressWarnings("serial")
-   private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) {
-
-   if(fileOutput) {
-   return env.readCsvFile(edgeInputPath)
-   .ignoreComments("#")
-   .fieldDelimiter("\t")
-   .lineDelimiter("\n")
-   .types(Long.class, Long.class)
-   .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
-   @Override
-   public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
-   return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
+   private static Graph<Long, Long, NullValue> getGraph(ExecutionEnvironment env)
+   {
+   Graph<Long, Long, NullValue> graph;
+   if(!fileOutput)
+   {
+   DataSet<Edge<Long, NullValue>> edges = ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env);
--- End diff --

Let's also keep this consistent. In Single Source Shortest Paths you read 
fromDataSet(getDefault..., env). maybe we could do that for all the examples 




[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

2015-06-25 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/832#issuecomment-115134406
  
Hey Theo, 

Thanks a lot for finding my bug there ^^
PR updated to address the Java issues and to contain a  pimped Scala 
version of `zipWithIndex` :) 
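For readers following the thread, the contract of `zipWithIndex` is simply that every element gets a unique, consecutive long index. The toy sketch below shows only the semantics; Flink's actual `DataSetUtils.zipWithIndex` works on distributed DataSets (to my understanding, counting elements per partition first and then assigning offset-shifted indices), which this single-machine version does not attempt:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class ZipWithIndexDemo {
    // Pair every element with a consecutive long index, preserving order.
    static <T> List<Map.Entry<Long, T>> zipWithIndex(List<T> in) {
        List<Map.Entry<Long, T>> out = new ArrayList<>();
        long index = 0;
        for (T element : in) {
            out.add(new AbstractMap.SimpleEntry<>(index++, element));
        }
        return out;
    }

    public static void main(String[] args) {
        for (Map.Entry<Long, String> e : zipWithIndex(Arrays.asList("a", "b", "c"))) {
            System.out.println(e.getKey() + ":" + e.getValue());
        }
        // prints 0:a, 1:b, 2:c
    }
}
```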




[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

2015-06-24 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/832#issuecomment-114883882
  
Uhmmm... flink.api.scala is imported. That's not the issue. 




[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

2015-06-24 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/832#issuecomment-114878728
  
Actually, I get a weird compile error: it says missing Type parameter for 
the map in DataSet.scala...
I think this is because the map is overriden... and I haven't found the 
workaround just yet...
(The error is reproducible by calling `testZipWithIndex`)




[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

2015-06-23 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/832#issuecomment-114481973
  
Hey @tillrohrmann , 

Sorry for the incredibly late reply. The last weeks have been very hectic. 
Nevertheless, I'd like to properly finish and polish this issue very soon. 

For that: I have addressed the Java comments, but I still have to provide 
support for Scala. I love this task because it really takes me out of my 
comfort zone: Gelly and Java. It's no secret that Scala is not my strongest 
point. Therefore, I'd like to use this thread to ask some rather trivial 
questions: 

Before defining implicit methods and using pimp-my-lib, I need to wrap the 
Java function. Which should be easy right? Since there is a `wrap` method.
This being said, in org.apache.flink.api.scala, I created a DataSetUtils 
class. and wanted to call wrap(ju.countElements...). Apparently it does not let 
me. Can someone help me out with that?

Thanks!
 




[GitHub] flink pull request: [FLINK-1962] Add Gelly Scala API

2015-06-23 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/808#issuecomment-114420568
  
Thanks @aljoscha for reminding me... This slipped my mind for a sec. 

I agree that the methods should have Scaladoc (this should be 
straightforward to add starting from  the existing Javadoc). What would also be 
nice, IMO would be to update  the docs so that they would also contain a Scala 
version of the example code snippets, similar to the programming guide, for 
example 
(http://ci.apache.org/projects/flink/flink-docs-release-0.8/programming_guide.html).

Furthermore, the newly added methods (e.g. `addVertices`) are missing. 
However, that should be a separate Jira.

Apart from the formatting issue and the documentation issue, everything 
looks nice and clean :) 

  




[GitHub] flink pull request: [FLINK-2149][gelly] Simplified Jaccard Example

2015-06-20 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/770#issuecomment-113752297
  
Merging...




[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

2015-06-19 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/847#issuecomment-113483077
  
Mea culpa. No, the mapper test is fine.

For the examples comment, I meant to go through the classes in the example 
folder and to modify the way the graph is currently read. Right now, we fetch 
the edges via `env.fromCsv` and then use `Graph.fromDataSet` to create the 
graph. We should do it directly via Graph.fromCsv. 

The example in the docs is fine, because it explains how fromDataSet works. 
That is still available. 
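To make the suggestion concrete: examples should build the graph through the fluent reader in one step, instead of `env.readCsvFile(...)` followed by `Graph.fromDataSet(...)`. The self-contained mini reader below (a hypothetical `MiniCsvGraphReader`, not Gelly's real `GraphCsvReader`, which delegates to Flink's `CsvReader`) sketches the fluent pattern: configuration calls return `this`, and a terminal `types(...)` call produces the parsed result.

```java
import java.util.ArrayList;
import java.util.List;

public class FluentReaderDemo {
    // Hypothetical mini reader illustrating the fluent configuration style:
    // each setter returns `this`; types() is the terminal call that parses.
    static class MiniCsvGraphReader {
        private String fieldDelimiter = ",";
        private String lineDelimiter = "\n";
        private final String data;

        MiniCsvGraphReader(String data) { this.data = data; }

        MiniCsvGraphReader lineDelimiterEdges(String d) { this.lineDelimiter = d; return this; }
        MiniCsvGraphReader fieldDelimiterEdges(String d) { this.fieldDelimiter = d; return this; }

        // Parse each line into a (source, target) edge pair.
        List<long[]> types() {
            List<long[]> edges = new ArrayList<>();
            for (String line : data.split(lineDelimiter)) {
                if (line.isEmpty()) continue;
                String[] fields = line.split(fieldDelimiter);
                edges.add(new long[]{Long.parseLong(fields[0]), Long.parseLong(fields[1])});
            }
            return edges;
        }
    }

    public static void main(String[] args) {
        List<long[]> edges = new MiniCsvGraphReader("1\t2\n2\t3\n")
                .lineDelimiterEdges("\n")
                .fieldDelimiterEdges("\t")
                .types();
        System.out.println(edges.size());                              // 2
        System.out.println(edges.get(0)[0] + "->" + edges.get(0)[1]);  // 1->2
    }
}
```

In Gelly the equivalent one-liner is `Graph.fromCsvReader(edgesInputPath, env)...types(Long.class)`, as the GraphMetrics rewrite quoted earlier in the thread shows.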




[GitHub] flink pull request: [FLINK-2093][gelly] Added difference Method

2015-06-18 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/818#issuecomment-113149078
  
Hi @shghatge ,

Don't forget to remove the definition for the public 
removeVertices(DataSet) from the documentation.  

Up for discussion: should we keep the name removeVertices for the private, 
helper method or should we call it something else, like 
removeVerticesAndEdges... Names are not my strongest point, but I guess you got 
the idea :) Personally, I am fine with the current name!  




[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

2015-06-18 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/847#issuecomment-113213190
  
This looks very nice! Someone deserves a virtual :ice_cream: ! 

There are some tests missing: 
- test `fromCSV` with a Mapper
- you just test `types`, `ignoreFirstLines` and `ignoreComments`; let's at 
least add tests for the `lineDelimiter*` and the `fieldDelimiter*` methods. I'm 
sure they work, but tests are written to guarantee that the functionality will 
also be there (at the same quality) in the future (i.e. some exotic code 
addition will not break it) :)

I saw an outdated Vasia comment on an unused import; always hit mvn verify 
before pushing - it would have caught that :D 




[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

2015-06-18 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/847#issuecomment-113217602
  
Ah! And I just remembered! Maybe it makes sense to update the examples to 
use `fromCSV` when creating the Graph instead of `getEdgesDataSet`. 




[GitHub] flink pull request: [FLINK-1520] Read edges and vertices from CSV ...

2015-06-17 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/846#issuecomment-112783592
  
Hi @shghatge ,

It seems we have a bit of a mess in this PR. Nothing that cannot be fixed. 
Let's take it offline.




[GitHub] flink pull request: [FLINK-2178][gelly] Fixed groupReduceOnNeighbo...

2015-06-16 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/799#issuecomment-112373119
  
PR updated to test the same issue in ApplyCoGroupFunctionOnAllEdges and 
ApplyCoGroupFunction. 
Also added tests. 




[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

2015-06-15 Thread andralungu
Github user andralungu commented on a diff in the pull request:

https://github.com/apache/flink/pull/832#discussion_r32410253
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * This class provides simple utility methods for zipping elements in a file with an index.
+ *
+ * @param <T> The type of the DataSet, i.e., the type of the elements of the DataSet.
+ */
+public class DataSetUtils<T> {
+
+	/**
+	 * Method that goes over all the elements in each partition in order to retrieve
+	 * the total number of elements.
+	 *
+	 * @param input the DataSet received as input
+	 * @return a data set containing tuples of (subtask index, number of elements) mappings.
+	 */
+	public DataSet<Tuple2<Integer, Long>> countElements(DataSet<T> input) {
+		return input.mapPartition(new RichMapPartitionFunction<T, Tuple2<Integer, Long>>() {
+			@Override
+			public void mapPartition(Iterable<T> values, Collector<Tuple2<Integer, Long>> out) throws Exception {
+				long counter = 0;
+				for (T value : values) {
+					counter++;
+				}
+
+				out.collect(new Tuple2<Integer, Long>(getRuntimeContext().getIndexOfThisSubtask(), counter));
+			}
+		});
+	}
+
+	/**
+	 * Method that takes a set of (subtask index, total number of elements) mappings
+	 * and assigns ids to all the elements from the input data set.
+	 *
+	 * @param input the input data set
+	 * @return a data set of tuple 2 consisting of consecutive ids and initial values.
+	 */
+	public DataSet<Tuple2<Long, T>> zipWithIndex(DataSet<T> input) {
--- End diff --

Yes, I thought about making the method static. However, if we move it to 
DataSet, we would need to call it from an instance. Would you do 
DataSet.zipWithIndex()?
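
For context, the idea behind the two methods above is that the per-subtask counts from `countElements` are turned into a starting offset per subtask, so that ids come out consecutive across partitions. This is not the PR's code, just a plain-Java sketch of that offset computation with made-up counts (no Flink APIs involved):

```java
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

public class ZipWithIndexSketch {

	// Each subtask starts numbering at the sum of the element counts of all
	// lower-indexed subtasks; its ids are then offset, offset + 1, ...
	static Map<Integer, Long> computeOffsets(SortedMap<Integer, Long> countsPerSubtask) {
		Map<Integer, Long> offsets = new TreeMap<>();
		long running = 0;
		for (Map.Entry<Integer, Long> entry : countsPerSubtask.entrySet()) {
			offsets.put(entry.getKey(), running);
			running += entry.getValue();
		}
		return offsets;
	}

	public static void main(String[] args) {
		// Hypothetical (subtask index -> element count) pairs, as countElements() would emit.
		SortedMap<Integer, Long> counts = new TreeMap<>();
		counts.put(0, 3L);
		counts.put(1, 2L);
		counts.put(2, 4L);

		System.out.println(computeOffsets(counts)); // {0=0, 1=3, 2=5}
	}
}
```

With counts 3, 2 and 4, the subtasks would emit ids 0-2, 3-4 and 5-8 respectively.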




[GitHub] flink pull request: [FLINK-2093][gelly] Added difference Method

2015-06-15 Thread andralungu
Github user andralungu commented on a diff in the pull request:

https://github.com/apache/flink/pull/818#discussion_r32415140
  
--- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---
@@ -1151,6 +1151,23 @@ public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Vertex<K, VV>> vert
}
}
 
+
+	public Graph<K, VV, EV> removeVertices(DataSet<Vertex<K, VV>> verticesToBeRemoved) {
--- End diff --

Yes, this is definitely the idea, but right now you are duplicating a lot 
of code. Can we find a smarter way (i.e. that requires as little code 
duplication as possible) :)?




[GitHub] flink pull request: [FLINK-2093][gelly] Added difference Method

2015-06-15 Thread andralungu
Github user andralungu commented on a diff in the pull request:

https://github.com/apache/flink/pull/818#discussion_r32415176
  
--- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---
@@ -1151,6 +1151,23 @@ public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Vertex<K, VV>> vert
}
}
 
+
--- End diff --

always add Javadoc to new methods.




[GitHub] flink pull request: [FLINK-2093][gelly] Added difference Method

2015-06-15 Thread andralungu
Github user andralungu commented on a diff in the pull request:

https://github.com/apache/flink/pull/818#discussion_r32414981
  
--- Diff: docs/libs/gelly_guide.md ---
@@ -240,6 +240,7 @@ Graph<Long, Double, Double> networkWithWeights = network.joinWithEdgesOnSource(v
 <img alt="Union Transformation" width="50%" src="fig/gelly-union.png"/>
 </p>
 
+* <strong>Difference</strong>: Gelly's `difference()` method performs a difference on the vertex and edge sets of the input graphs. The resultant graph is formed by removing the common vertices and edges from the graph.
--- End diff --

I think @vasia also wanted you to update the description for union ^^

Now, this still looks a bit unclear. It seems that there are two input 
graphs. You should make it obvious that the current graph gets differentiated 
with an input graph. That way, you won't leave room for comments.




[GitHub] flink pull request: [FLINK-2093][gelly] Added difference Method

2015-06-15 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/818#issuecomment-112099290
  
Hi @vasia ,

In essence the `difference` method is just a fancy way of removing 
vertices, right?
When you remove a vertex, you also remove the edge for which it was a 
source/target.

Since the add/remove vertices methods work just for lists and collect is 
unsafe, we mutually agreed to overload `removeVertices` to work for data 
sets. 
This way you would duplicate the least amount of code. Otherwise, you would 
take the exact code in the DataSet removeVertices and duplicate it in 
difference. That's not very practical IMO.

Also, it may occur that a user has a DataSet of elements to remove. An 
extra removeVertices won't really hurt then, would it? 

But if you have suggestions on how to improve this, we are more than eager 
to hear about them :)

-Andra
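
To make the relationship concrete: the discussion above treats `difference` as removing the other graph's vertices together with every edge incident to them. Here is a rough plain-Java sketch of that semantics on simple sets (hypothetical encoding, edges as "src->dst" strings, not the actual Gelly implementation):

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

public class DifferenceSketch {

	// Removes the given vertices and every edge that has one of them
	// as source or target, mirroring the removeVertices contract.
	static void removeVertices(Set<String> vertices, Set<String> edges, Set<String> toRemove) {
		vertices.removeAll(toRemove);
		edges.removeIf(e -> {
			String[] endpoints = e.split("->");
			return toRemove.contains(endpoints[0]) || toRemove.contains(endpoints[1]);
		});
	}

	public static void main(String[] args) {
		Set<String> vertices = new TreeSet<>(Arrays.asList("a", "b", "c", "d"));
		Set<String> edges = new TreeSet<>(Arrays.asList("a->b", "b->c", "c->d"));

		// A "difference" with another graph boils down to removing that graph's vertices.
		Set<String> otherGraphVertices = new TreeSet<>(Arrays.asList("b"));
		removeVertices(vertices, edges, otherGraphVertices);

		System.out.println(vertices); // [a, c, d]
		System.out.println(edges);    // [c->d]
	}
}
```

Removing vertex `b` also drops `a->b` and `b->c`, which is exactly why difference can reuse the DataSet overload of `removeVertices` instead of duplicating its logic.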




[GitHub] flink pull request: [FLINK-2149][gelly] Simplified Jaccard Example

2015-06-14 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/770#issuecomment-111807874
  
PR updated.




[GitHub] flink pull request: [FLINK-2178][gelly] Fixed groupReduceOnNeighbo...

2015-06-13 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/799#issuecomment-111761198
  
Yup, exactly! The use case was: I modified something in the edge data set, 
called groupReduceOnNeighbors on the result and got an NPE, an exception that 
could have been avoided with this check, just as was the case for the Degree NPE. 

Making sure that the iterator is not null can save some people lots of 
headaches, IMO :) And it doesn't hurt anyone who has a correct data set. 




[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex

2015-06-13 Thread andralungu
GitHub user andralungu opened a pull request:

https://github.com/apache/flink/pull/832

[FLINK-2152] Added zipWithIndex 

This PR adds the zipWithIndex utility method to Flink's DataSetUtils as 
described in the mailing list discussion: 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/The-correct-location-for-zipWithIndex-and-zipWithUniqueId-td6310.html.
 

The method could, in the future, be moved to DataSet. 

@fhueske , @tillrohrmann , once we reach a conclusion for this one, I will 
also update #801 (I wouldn't like to fix unnecessary merge conflicts). 

Once zipWithUniqueIds is added, I could also explain the difference in the 
docs. 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/andralungu/flink zipWithIndex

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/832.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #832


commit fdbf0167cc10e952faddc2a7d71e73e7f1f2d03f
Author: andralungu <lungu.an...@gmail.com>
Date:   2015-06-12T18:37:27Z

[FLINK-2152] zipWithIndex implementation

[FLINK-2152] Added zipWithIndex utility method






  1   2   >