Author: aching
Date: Tue Nov 15 22:52:54 2011
New Revision: 1202455

URL: http://svn.apache.org/viewvc?rev=1202455&view=rev
Log:
GIRAPH-88: Message count not updated properly after GIRAPH-11. (aching)


Modified:
    incubator/giraph/trunk/CHANGELOG
    incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java

Modified: incubator/giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1202455&r1=1202454&r2=1202455&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Tue Nov 15 22:52:54 2011
@@ -2,6 +2,8 @@ Giraph Change Log
 
 Release 0.70.0 - unreleased
 
+  GIRAPH-88: Message count not updated properly after GIRAPH-11. (aching)
+
   GIRAPH-70: Misspellings in PseudoRandomVertexInputFormat configuration
   parameters. (attilacsordas via jghoman)
 

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java?rev=1202455&r1=1202454&r2=1202455&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java Tue Nov 15 22:52:54 2011
@@ -106,11 +106,9 @@ public interface CentralizedServiceWorke
      * worker level statistics after the computation.
      *
      * @param partitionStatsList All the partition stats for this worker
-     * @param workersSentMessages Number of messages sent on this worker
      * @return true if this is the last superstep, false otherwise
      */
-    boolean finishSuperstep(List<PartitionStats> partitionStatsList,
-                            long workersSentMessages);
+    boolean finishSuperstep(List<PartitionStats> partitionStatsList);
     /**
      * Get the partition that a vertex index would belong to
      *

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1202455&r1=1202454&r2=1202455&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Tue Nov 15 22:52:54 2011
@@ -548,7 +548,7 @@ public class BspServiceWorker<
         workerGraphPartitioner.finalizePartitionStats(
             partitionStatsList, workerPartitionMap);
 
-        finishSuperstep(partitionStatsList, 0);
+        finishSuperstep(partitionStatsList);
     }
 
     /**
@@ -773,8 +773,7 @@ public class BspServiceWorker<
     }
 
     @Override
-    public boolean finishSuperstep(List<PartitionStats> partitionStatsList,
-                                   long workersSentMessages) {
+    public boolean finishSuperstep(List<PartitionStats> partitionStatsList) {
         // This barrier blocks until success (or the master signals it to
         // restart).
         //
@@ -785,8 +784,9 @@ public class BspServiceWorker<
         // of this worker
         // 3. Let the master know it is finished.
         // 4. Then it waits for the master to say whether to stop or not.
+        long workerSentMessages = 0;
         try {
-            commService.flush(getContext());
+            workerSentMessages = commService.flush(getContext());
         } catch (IOException e) {
             throw new IllegalStateException(
                 "finishSuperstep: flush failed", e);
@@ -807,7 +807,7 @@ public class BspServiceWorker<
             workerFinishedInfoObj.put(JSONOBJ_PARTITION_STATS_KEY,
                                       Base64.encodeBytes(partitionStatsBytes));
             workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGES_KEY,
-                                      workersSentMessages);
+                                      workerSentMessages);
         } catch (JSONException e) {
             throw new RuntimeException(e);
         }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1202455&r1=1202454&r2=1202455&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Tue Nov 15 22:52:54 2011
@@ -512,7 +512,6 @@ public class GraphMapper<I extends Writa
 
         List<PartitionStats> partitionStatsList =
             new ArrayList<PartitionStats>();
-        long workerSentMessages = 0;
         do {
             long superstep = serviceWorker.getSuperstep();
 
@@ -556,7 +555,6 @@ public class GraphMapper<I extends Writa
             context.progress();
 
             partitionStatsList.clear();
-            workerSentMessages = 0;
             for (Partition<I, V, E, M> partition :
                     serviceWorker.getPartitionMap().values()) {
                 PartitionStats partitionStats =
@@ -593,8 +591,7 @@ public class GraphMapper<I extends Writa
                          " maxMem=" + Runtime.getRuntime().maxMemory() +
                          " freeMem=" + Runtime.getRuntime().freeMemory());
             }
-        } while (!serviceWorker.finishSuperstep(partitionStatsList,
-                                                workerSentMessages));
+        } while (!serviceWorker.finishSuperstep(partitionStatsList));
         if (LOG.isInfoEnabled()) {
             LOG.info("map: BSP application done " +
                      "(global vertices marked done)");

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java?rev=1202455&r1=1202454&r2=1202455&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java Tue Nov 15 22:52:54 2011
@@ -32,8 +32,8 @@ import org.apache.hadoop.io.WritableComp
  * range partitioning is more susceptible to hot spots if the keys
  * are not randomly distributed.  Another negative is the user must implement
  * some of the functionality around how to split the key range.
- * 
- * Note:  This implementation is incomplete, the developer must implement the 
+ *
+ * Note:  This implementation is incomplete, the developer must implement the
  * various methods based on their index type.
  *
  * @param <I> Vertex index value
@@ -46,7 +46,7 @@ public abstract class RangeWorkerPartiti
         V extends Writable, E extends Writable, M extends Writable> implements
         WorkerGraphPartitioner<I, V, E, M> {
     /** Mapping of the vertex ids to the {@link PartitionOwner} */
-    private NavigableMap<I, RangePartitionOwner<I>> vertexRangeMap =
+    protected NavigableMap<I, RangePartitionOwner<I>> vertexRangeMap =
         new TreeMap<I, RangePartitionOwner<I>>();
 
     @Override


Reply via email to