Author: aching Date: Fri Sep 16 20:56:44 2011 New Revision: 1171776 URL: http://svn.apache.org/viewvc?rev=1171776&view=rev Log: GIRAPH-34: Failure of Vertex reflection for putVertexList from GIRAPH-27. (aching)
Modified: incubator/giraph/trunk/CHANGELOG incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexRangeBalancer.java Modified: incubator/giraph/trunk/CHANGELOG URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1171776&r1=1171775&r2=1171776&view=diff ============================================================================== --- incubator/giraph/trunk/CHANGELOG (original) +++ incubator/giraph/trunk/CHANGELOG Fri Sep 16 20:56:44 2011 @@ -2,6 +2,9 @@ Giraph Change Log Release 0.70.0 - unreleased + GIRAPH-34: Failure of Vertex reflection for putVertexList from + GIRAPH-27. (aching) + GIRAPH-35: Modifying the site to indicate that Jake Mannix and Dmitriy Ryaboy are now Giraph committers. (aching) Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1171776&r1=1171775&r2=1171776&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Fri Sep 16 20:56:44 2011 @@ -37,13 +37,13 @@ import java.util.TreeSet; import org.apache.log4j.Logger; import org.apache.giraph.bsp.CentralizedServiceWorker; -import org.apache.giraph.graph.GiraphJob; +import org.apache.giraph.graph.BasicVertex; import org.apache.giraph.graph.BspUtils; -import org.apache.giraph.graph.VertexCombiner; import org.apache.giraph.graph.Edge; -import org.apache.giraph.graph.Vertex; +import org.apache.giraph.graph.GiraphJob; import org.apache.giraph.graph.MutableVertex; -import org.apache.giraph.graph.BasicVertex; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.graph.VertexCombiner; import org.apache.giraph.graph.VertexMutations; import org.apache.giraph.graph.VertexRange; import org.apache.giraph.graph.VertexResolver; @@ -423,6 +423,15 @@ public abstract class BasicRPCCommunicat InetSocketAddress addr, int numHandlers, String jobId, J jobToken) throws IOException; + /** + * Only constructor. + * + * @param context Context for getting configuration + * @param service Service worker to get the vertex ranges + * @throws IOException + * @throws UnknownHostException + * @throws InterruptedException + */ public BasicRPCCommunications(Mapper<?, ?, ?, ?>.Context context, CentralizedServiceWorker<I, V, E, M> service) throws IOException, UnknownHostException, InterruptedException { @@ -804,8 +813,8 @@ end[HADOOP_FACEBOOK]*/ public final void sendMessageReq(I destVertex, M msg) { InetSocketAddress addr = getInetSocketAddress(destVertex); if (LOG.isDebugEnabled()) { - LOG.debug("sendMessage: Send bytes (" + msg.toString() + ") to " + - destVertex + " with address " + addr); + LOG.debug("sendMessage: Send bytes (" + msg.toString() + + ") to " + destVertex + " with address " + addr); } ++totalMsgsSentInSuperstep; Map<I, MsgList<M>> msgMap = null; @@ -825,8 +834,10 @@ end[HADOOP_FACEBOOK]*/ msgMap.put(destVertex, msgList); } msgList.add(msg); - LOG.debug("sendMessage: added msg=" + msg + ", size=" + - msgList.size()); + if (LOG.isDebugEnabled()) { + LOG.debug("sendMessage: added msg=" + msg + ", size=" + + msgList.size()); + } if (msgList.size() > maxSize) { peerThreads.get(addr).flushLargeMsgList(destVertex); } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java?rev=1171776&r1=1171775&r2=1171776&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java Fri Sep 16 20:56:44 2011 @@ -124,12 +124,13 @@ public class SimpleCheckpointVertex exte System.out.println("compute: vertex " + getVertexId() + " sending edgeValue " + edgeValue + " vertexValue " + vertexValue + - " total " + (edgeValue.get() + (float) vertexValue) + + " total " + (edgeValue.get() + + (float) vertexValue) + " to vertex " + targetVertexId + " on superstep " + getSuperstep()); edgeValue.set(edgeValue.get() + (float) vertexValue); addEdge(targetVertexId, edgeValue); - sendMsg(targetVertexId, edgeValue); + sendMsg(targetVertexId, new FloatWritable(edgeValue.get())); } } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java?rev=1171776&r1=1171775&r2=1171776&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java Fri Sep 16 20:56:44 2011 @@ -18,6 +18,8 @@ package org.apache.giraph.graph; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; @@ -37,9 +39,11 @@ import java.util.List; @SuppressWarnings("rawtypes") public abstract class BasicVertex<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> - implements AggregatorUsage, Iterable<I> { + implements AggregatorUsage, Iterable<I>, Configurable { /** Global graph state **/ private GraphState<I,V,E,M> graphState; + /** Configuration */ + private Configuration conf; /** * Optionally defined by the user to be executed once on all workers @@ -160,10 +164,12 @@ public abstract class BasicVertex<I exte public abstract int getNumOutEdges(); /** - * Send a message to a vertex id. + * Send a message to a vertex id. The message should not be mutated after + * this method returns or else undefined results could occur. * - * @param id vertex id to send the message to - * @param msg message data to send + * @param id Vertex id to send the message to + * @param msg Message data to send. Note that after the message is sent, + * the user should not modify the object. */ public void sendMsg(I id, M msg) { if (msg == null) { @@ -182,8 +188,8 @@ public abstract class BasicVertex<I exte /** * After this is called, the compute() code will no longer be called for * this vertice unless a message is sent to it. Then the compute() code - * will be called once again until this function is called. The application - * finishes only when all vertices vote to halt. + * will be called once again until this function is called. The + * application finishes only when all vertices vote to halt. */ public abstract void voteToHalt(); @@ -245,4 +251,14 @@ public abstract class BasicVertex<I exte return getGraphState().getGraphMapper().getAggregatorUsage(). useAggregator(name); } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java?rev=1171776&r1=1171775&r2=1171776&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java Fri Sep 16 20:56:44 2011 @@ -145,7 +145,7 @@ public abstract class Vertex<I extends W @Override public E removeEdge(I targetVertexId) { Edge<I, E> edge = destEdgeMap.remove(targetVertexId); - if(edge != null) { + if (edge != null) { return edge.getEdgeValue(); } else { return null; @@ -175,26 +175,23 @@ public abstract class Vertex<I extends W @Override final public void readFields(DataInput in) throws IOException { - vertexId = - BspUtils.<I>createVertexIndex(getContext().getConfiguration()); + vertexId = BspUtils.<I>createVertexIndex(getConf()); vertexId.readFields(in); boolean hasVertexValue = in.readBoolean(); if (hasVertexValue) { - vertexValue = - BspUtils.<V>createVertexValue(getContext().getConfiguration()); + vertexValue = BspUtils.<V>createVertexValue(getConf()); vertexValue.readFields(in); } long edgeMapSize = in.readLong(); for (long i = 0; i < edgeMapSize; ++i) { Edge<I, E> edge = new Edge<I, E>(); - edge.setConf(getContext().getConfiguration()); + edge.setConf(getConf()); edge.readFields(in); addEdge(edge.getDestVertexId(), edge.getEdgeValue()); } long msgListSize = in.readLong(); for (long i = 0; i < msgListSize; ++i) { - M msg = - BspUtils.<M>createMessageValue(getContext().getConfiguration()); + M msg = BspUtils.<M>createMessageValue(getConf()); msg.readFields(in); msgList.add(msg); } Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexRangeBalancer.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexRangeBalancer.java?rev=1171776&r1=1171775&r2=1171776&view=diff ============================================================================== --- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexRangeBalancer.java (original) +++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexRangeBalancer.java Fri Sep 16 20:56:44 2011 @@ -71,7 +71,7 @@ public class TestVertexRangeBalancer ext removeAndSetOutput(job, outputPath); assertTrue(job.run(true)); FileSystem hdfs = FileSystem.get(job.getConfiguration()); - final int correctLen = 118; + final int correctLen = 123; if (getJobTracker() != null) { FileStatus [] fileStatusArr = hdfs.listStatus(outputPath); int totalLen = 0;