[FLINK-1201] [gelly] added label propagation in library

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/de2aa578
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/de2aa578
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/de2aa578

Branch: refs/heads/master
Commit: de2aa5784af3fae4217fdca478ff8b8824b8be50
Parents: 8ff94da
Author: vasia <vasilikikala...@gmail.com>
Authored: Mon Jan 5 17:50:29 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 11 10:46:15 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/graph/GraphAlgorithm.java  |  5 ++
 .../flink/graph/library/LabelPropagation.java   | 82 ++++++++++++++++++++
 2 files changed, 87 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/de2aa578/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
index c2cbd71..2f5de95 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
@@ -3,6 +3,11 @@ package flink.graphs;
 
 import java.io.Serializable;
 
+/**
+ * @param <K> the vertex key type
+ * @param <VV> the vertex value type
+ * @param <EV> the edge value type
+ */
 public interface GraphAlgorithm<K extends Comparable<K> & Serializable, VV 
extends Serializable,
         EV extends Serializable> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/de2aa578/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
new file mode 100644
index 0000000..430ccbb
--- /dev/null
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
@@ -0,0 +1,82 @@
+package flink.graphs.library;
+
+import flink.graphs.*;
+
+import org.apache.flink.spargel.java.MessageIterator;
+import org.apache.flink.spargel.java.MessagingFunction;
+import org.apache.flink.spargel.java.VertexUpdateFunction;
+import org.apache.flink.types.NullValue;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Community detection by label propagation.
+ *
+ * <p>Every vertex value is treated as a label. Using a vertex-centric iteration, each vertex
+ * sends its current label to its out-neighbors and then adopts the label that occurs most
+ * frequently among the labels it received. Ties are broken deterministically in favor of the
+ * larger label value, so the result does not depend on hash-map iteration order.
+ *
+ * @param <K> the vertex key type
+ */
+@SuppressWarnings("serial")
+public class LabelPropagation<K extends Comparable<K> & Serializable>
+        implements GraphAlgorithm<K, Long, NullValue> {
+
+    private final int maxIterations;
+
+    /**
+     * @param maxIterations the maximum number of iterations handed to the vertex-centric iteration
+     */
+    public LabelPropagation(int maxIterations) {
+        this.maxIterations = maxIterations;
+    }
+
+    /**
+     * Runs label propagation on the given graph.
+     *
+     * @param input graph whose vertex values are the initial labels
+     * @return the graph produced by the vertex-centric iteration, with propagated labels
+     */
+    @Override
+    public Graph<K, Long, NullValue> run(Graph<K, Long, NullValue> input) {
+        // iteratively adopt the most frequent label among the neighbors of each vertex
+        return input.runVertexCentricIteration(
+                new UpdateVertexLabel<K>(),
+                new SendNewLabelToNeighbors<K>(),
+                maxIterations);
+    }
+
+    /**
+     * Updates the value of a vertex by adopting the most frequent label among the labels
+     * received from its in-neighbors.
+     *
+     * <p>The vertex's current label is the initial candidate, with frequency 1. A received
+     * label replaces the candidate if it occurs more often, or equally often and has a
+     * larger value. If no label is received, the current label is kept.
+     */
+    public static final class UpdateVertexLabel<K extends Comparable<K> & Serializable>
+            extends VertexUpdateFunction<K, Long, Long> {
+
+        @Override
+        public void updateVertex(K vertexKey, Long vertexValue, MessageIterator<Long> inMessages) {
+            // count how often each label was received (single lookup per message)
+            Map<Long, Long> labelsWithFrequencies = new HashMap<Long, Long>();
+            for (Long msg : inMessages) {
+                Long currentFreq = labelsWithFrequencies.get(msg);
+                labelsWithFrequencies.put(msg, currentFreq == null ? 1L : currentFreq + 1);
+            }
+
+            // the current label is the starting candidate with frequency 1
+            long maxFrequency = 1;
+            long mostFrequentLabel = vertexValue;
+
+            // select the most frequent label; on equal frequency prefer the larger label so
+            // the choice is independent of the HashMap's (unspecified) iteration order
+            for (Entry<Long, Long> entry : labelsWithFrequencies.entrySet()) {
+                long frequency = entry.getValue();
+                long label = entry.getKey();
+                if (frequency > maxFrequency
+                        || (frequency == maxFrequency && label > mostFrequentLabel)) {
+                    maxFrequency = frequency;
+                    mostFrequentLabel = label;
+                }
+            }
+
+            // set the new vertex value
+            setNewVertexValue(mostFrequentLabel);
+        }
+    }
+
+    /**
+     * Sends the vertex's current label to all of its out-neighbors.
+     */
+    public static final class SendNewLabelToNeighbors<K extends Comparable<K> & Serializable>
+            extends MessagingFunction<K, Long, Long, NullValue> {
+
+        @Override
+        public void sendMessages(K vertexKey, Long newLabel) {
+            sendMessageToAllNeighbors(newLabel);
+        }
+    }
+}
\ No newline at end of file

Reply via email to