Author: aching Date: Mon Oct 10 07:38:15 2011 New Revision: 1180805 URL: http://svn.apache.org/viewvc?rev=1180805&view=rev Log: GIRAPH-48: numFlushThreads is 0 when doing a single worker unittest. Changing the minimum to 1. (aching)
Modified: incubator/giraph/trunk/CHANGELOG incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Modified: incubator/giraph/trunk/CHANGELOG URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1180805&r1=1180804&r2=1180805&view=diff ============================================================================== --- incubator/giraph/trunk/CHANGELOG (original) +++ incubator/giraph/trunk/CHANGELOG Mon Oct 10 07:38:15 2011 @@ -2,7 +2,10 @@ Giraph Change Log Release 0.70.0 - unreleased - GIRAPH-44. Add documentation about counter limits in Hadoop 0.203+. + GIRAPH-48: numFlushThreads is 0 when doing a single worker + unittest. Changing the minimum to 1. (aching) + + GIRAPH-44: Add documentation about counter limits in Hadoop 0.203+. (mtiwari via jghoman) GIRAPH-12: Investigate communication improvements. (hyunsik) Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1180805&r1=1180804&r2=1180805&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Mon Oct 10 07:38:15 2011 @@ -93,7 +93,7 @@ public abstract class BasicRPCCommunicat /** Messages sent during the last superstep */ private long totalMsgsSentInSuperstep = 0; /** - * Map of the peer connections, mapping from remote socket address to client + * Map of the peer connections, mapping from remote socket address to client * meta data */ private final Map<InetSocketAddress, PeerConnection> peerConnections = @@ -101,7 +101,7 @@ public abstract class BasicRPCCommunicat /** * Thread pool for message flush threads */ - private final ExecutorService executor; + private final ExecutorService executor; /** * Map of outbound messages, mapping from remote server to * destination vertex index to list of messages @@ -151,9 +151,9 @@ public abstract class BasicRPCCommunicat private final J jobToken; /** maximum number of vertices sent in a single RPC */ private static final int MAX_VERTICES_PER_RPC = 1024; - + /** - * PeerConnection contains RPC client and accumulated messages + * PeerConnection contains RPC client and accumulated messages * for a specific peer. */ private class PeerConnection { @@ -162,7 +162,7 @@ public abstract class BasicRPCCommunicat * mapping from vertex range (max vertex index) to list of messages. * (Synchronized with itself). */ - private final Map<I, MsgList<M>> outMessagesPerPeer; + private final Map<I, MsgList<M>> outMessagesPerPeer; /** * Client interface: RPC proxy for remote server, this class for local */ @@ -170,16 +170,16 @@ public abstract class BasicRPCCommunicat /** Maximum size of cached message list, before sending it out */ /** Boolean, set to false when local client (self), true otherwise */ private final boolean isProxy; - + public PeerConnection(Map<I, MsgList<M>> m, CommunicationsInterface<I, V, E, M> i, boolean isProxy) { - + this.outMessagesPerPeer = m; this.peer = i; this.isProxy = isProxy; } - + public void close() { if (LOG.isDebugEnabled()) { LOG.debug("close: Done"); @@ -190,7 +190,7 @@ public abstract class BasicRPCCommunicat return peer; } } - + private class PeerFlushExecutor implements Runnable { PeerConnection peerConnection; @@ -199,7 +199,7 @@ public abstract class BasicRPCCommunicat } @Override - public void run() { + public void run() { CommunicationsInterface<I, V, E, M> proxy = peerConnection.getRPCProxy(); @@ -223,7 +223,7 @@ public abstract class BasicRPCCommunicat proxy.getName() + " putting (list) " + msgList + " to " + e.getKey() + - ", proxy = " + + ", proxy = " + peerConnection.isProxy); } proxy.putMsgList(e.getKey(), msgList); @@ -236,7 +236,7 @@ public abstract class BasicRPCCommunicat + proxy.getName() + " putting " + msg + " to " + e.getKey() + - ", proxy = " + + ", proxy = " + peerConnection.isProxy); } if (msg == null) { @@ -257,7 +257,7 @@ public abstract class BasicRPCCommunicat ": all messages flushed"); } } catch (IOException e) { - LOG.error(e); + LOG.error(e); if (peerConnection.isProxy) { RPC.stopProxy(peerConnection.peer); } @@ -265,7 +265,7 @@ public abstract class BasicRPCCommunicat } } } - + /** * LargeMessageFlushExecutor flushes all outgoing messages destined to some vertices. * This is executed when the number of messages destined to certain vertex @@ -273,7 +273,7 @@ public abstract class BasicRPCCommunicat */ private class LargeMessageFlushExecutor implements Runnable { final I destVertex; - final MsgList<M> outMessage; + final MsgList<M> outMessage; PeerConnection peerConnection; LargeMessageFlushExecutor(PeerConnection peerConnection, I destVertex) { @@ -286,11 +286,11 @@ public abstract class BasicRPCCommunicat } @Override - public void run() { + public void run() { try { - CommunicationsInterface<I, V, E, M> proxy = + CommunicationsInterface<I, V, E, M> proxy = peerConnection.getRPCProxy(); - + if (combiner != null) { M combinedMsg = combiner.combine(destVertex, outMessage); @@ -301,7 +301,7 @@ public abstract class BasicRPCCommunicat proxy.putMsgList(destVertex, outMessage); } } catch (IOException e) { - LOG.error(e); + LOG.error(e); if (peerConnection.isProxy) { RPC.stopProxy(peerConnection.peer); } @@ -311,7 +311,7 @@ public abstract class BasicRPCCommunicat } } } - + private void submitLargeMessageSend(InetSocketAddress addr, I destVertex) { PeerConnection pc = peerConnections.get(addr); executor.execute(new LargeMessageFlushExecutor(pc, destVertex)); @@ -367,20 +367,23 @@ public abstract class BasicRPCCommunicat this.server.start(); this.myName = myAddress.toString(); - + int numWorkers = conf.getInt(GiraphJob.MAX_WORKERS, numTasks); - // if the number of flush threads is unset, it is set to - // the number of max workers. - int numFlushThreads = conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS, numWorkers-1); + // If the number of flush threads is unset, it is set to + // the number of max workers - 1 or a minimum of 1. + int numFlushThreads = + Math.max(conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS, + numWorkers - 1), + 1); this.executor = Executors.newFixedThreadPool(numFlushThreads); - + if (LOG.isInfoEnabled()) { LOG.info("BasicRPCCommunications: Started RPC " + "communication server: " + myName + " with " + - numHandlers + " handlers and " + numFlushThreads + + numHandlers + " handlers and " + numFlushThreads + " flush threads"); } - + connectAllRPCProxys(this.jobId, this.jobToken); } @@ -649,7 +652,7 @@ end[HADOOP_FACEBOOK]*/ InetSocketAddress addr = getInetSocketAddress(vertexIndexMax); CommunicationsInterface<I, V, E, M> rpcProxy = peerConnections.get(addr).getRPCProxy(); - + if (LOG.isInfoEnabled()) { LOG.info("sendVertexList: Sending to " + rpcProxy.getName() + " " + addr + ", with vertex index " + vertexIndexMax + @@ -812,7 +815,7 @@ end[HADOOP_FACEBOOK]*/ Collections.shuffle(peerList); for (PeerConnection pc : peerList) { - futures.add(executor.submit(new PeerFlushExecutor(pc))); + futures.add(executor.submit(new PeerFlushExecutor(pc))); } // wait for all flushes