Author: aching Date: Tue Nov 15 22:52:54 2011 New Revision: 1202455 URL: http://svn.apache.org/viewvc?rev=1202455&view=rev Log: GIRAPH-88: Message count not updated properly after GIRAPH-11. (aching)
Modified: incubator/giraph/trunk/CHANGELOG incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java Modified: incubator/giraph/trunk/CHANGELOG URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1202455&r1=1202454&r2=1202455&view=diff ============================================================================== --- incubator/giraph/trunk/CHANGELOG (original) +++ incubator/giraph/trunk/CHANGELOG Tue Nov 15 22:52:54 2011 @@ -2,6 +2,8 @@ Giraph Change Log Release 0.70.0 - unreleased + GIRAPH-88: Message count not updated properly after GIRAPH-11. (aching) + GIRAPH-70: Misspellings in PseudoRandomVertexInputFormat configuration parameters. (attilacsordas via jghoman) Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java?rev=1202455&r1=1202454&r2=1202455&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java Tue Nov 15 22:52:54 2011 @@ -106,11 +106,9 @@ public interface CentralizedServiceWorke * worker level statistics after the computation. * * @param partitionStatsList All the partition stats for this worker - * @param workersSentMessages Number of messages sent on this worker * @return true if this is the last superstep, false otherwise */ - boolean finishSuperstep(List<PartitionStats> partitionStatsList, - long workersSentMessages); + boolean finishSuperstep(List<PartitionStats> partitionStatsList); /** * Get the partition that a vertex index would belong to * Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1202455&r1=1202454&r2=1202455&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Tue Nov 15 22:52:54 2011 @@ -548,7 +548,7 @@ public class BspServiceWorker< workerGraphPartitioner.finalizePartitionStats( partitionStatsList, workerPartitionMap); - finishSuperstep(partitionStatsList, 0); + finishSuperstep(partitionStatsList); } /** @@ -773,8 +773,7 @@ public class BspServiceWorker< } @Override - public boolean finishSuperstep(List<PartitionStats> partitionStatsList, - long workersSentMessages) { + public boolean finishSuperstep(List<PartitionStats> partitionStatsList) { // This barrier blocks until success (or the master signals it to // restart). // @@ -785,8 +784,9 @@ public class BspServiceWorker< // of this worker // 3. Let the master know it is finished. // 4. Then it waits for the master to say whether to stop or not. + long workerSentMessages = 0; try { - commService.flush(getContext()); + workerSentMessages = commService.flush(getContext()); } catch (IOException e) { throw new IllegalStateException( "finishSuperstep: flush failed", e); @@ -807,7 +807,7 @@ public class BspServiceWorker< workerFinishedInfoObj.put(JSONOBJ_PARTITION_STATS_KEY, Base64.encodeBytes(partitionStatsBytes)); workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGES_KEY, - workersSentMessages); + workerSentMessages); } catch (JSONException e) { throw new RuntimeException(e); } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1202455&r1=1202454&r2=1202455&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Tue Nov 15 22:52:54 2011 @@ -512,7 +512,6 @@ public class GraphMapper<I extends Writa List<PartitionStats> partitionStatsList = new ArrayList<PartitionStats>(); - long workerSentMessages = 0; do { long superstep = serviceWorker.getSuperstep(); @@ -556,7 +555,6 @@ public class GraphMapper<I extends Writa context.progress(); partitionStatsList.clear(); - workerSentMessages = 0; for (Partition<I, V, E, M> partition : serviceWorker.getPartitionMap().values()) { PartitionStats partitionStats = @@ -593,8 +591,7 @@ public class GraphMapper<I extends Writa " maxMem=" + Runtime.getRuntime().maxMemory() + " freeMem=" + Runtime.getRuntime().freeMemory()); } - } while (!serviceWorker.finishSuperstep(partitionStatsList, - workerSentMessages)); + } while (!serviceWorker.finishSuperstep(partitionStatsList)); if (LOG.isInfoEnabled()) { LOG.info("map: BSP application done " + "(global vertices marked done)"); Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java?rev=1202455&r1=1202454&r2=1202455&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java Tue Nov 15 22:52:54 2011 @@ -32,8 +32,8 @@ import org.apache.hadoop.io.WritableComp * range partitioning is more susceptible to hot spots if the keys * are not randomly distributed. Another negative is the user must implement * some of the functionality around how to split the key range. - * - * Note: This implementation is incomplete, the developer must implement the + * + * Note: This implementation is incomplete, the developer must implement the * various methods based on their index type. * * @param <I> Vertex index value @@ -46,7 +46,7 @@ public abstract class RangeWorkerPartiti V extends Writable, E extends Writable, M extends Writable> implements WorkerGraphPartitioner<I, V, E, M> { /** Mapping of the vertex ids to the {@link PartitionOwner} */ - private NavigableMap<I, RangePartitionOwner<I>> vertexRangeMap = + protected NavigableMap<I, RangePartitionOwner<I>> vertexRangeMap = new TreeMap<I, RangePartitionOwner<I>>(); @Override