Author: aching
Date: Sat Mar 17 16:57:56 2012
New Revision: 1301962

URL: http://svn.apache.org/viewvc?rev=1301962&view=rev
Log:
GIRAPH-154: Worker ports are not synched properly with its peers (Zhiwei Gu via aching).

Added: incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/TryMultiRpcBindingPortsTest.java Modified: incubator/giraph/trunk/CHANGELOG incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Modified: incubator/giraph/trunk/CHANGELOG URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1301962&r1=1301961&r2=1301962&view=diff ============================================================================== --- incubator/giraph/trunk/CHANGELOG (original) +++ incubator/giraph/trunk/CHANGELOG Sat Mar 17 16:57:56 2012 @@ -2,6 +2,9 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-154: Worker ports are not synched properly with its peers + (Zhiwei Gu via aching). + GIRAPH-87: Simplify boolean expression in BspService::checkpointFrequencyMet (Eli Reisman via aching). Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1301962&r1=1301961&r2=1301962&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Sat Mar 17 16:57:56 2012 @@ -494,8 +494,20 @@ public abstract class BasicRPCCommunicat final int maxRpcPortBindAttempts = conf.getInt(GiraphJob.MAX_RPC_PORT_BIND_ATTEMPTS, GiraphJob.MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT); + final boolean failFirstPortBindingAttempt = + conf.getBoolean(GiraphJob.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT, + GiraphJob.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT_DEFAULT); while (bindAttempts < maxRpcPortBindAttempts) { this.myAddress = new 
InetSocketAddress(bindAddress, bindPort); + if (failFirstPortBindingAttempt && bindAttempts == 0) { + LOG.info("BasicRPCCommunications: Intentionally fail first " + + "binding attempt as giraph.failFirstRpcPortBindAttempt " + + "is true, port " + bindPort); + ++bindAttempts; + bindPort += portIncrementConstant; + continue; + } + try { this.server = getRPCServer( @@ -508,7 +520,7 @@ public abstract class BasicRPCCommunicat bindPort += portIncrementConstant; } } - if (bindAttempts == maxRpcPortBindAttempts) { + if (bindAttempts == maxRpcPortBindAttempts || this.server == null) { throw new IllegalStateException( "BasicRPCCommunications: Failed to start RPCServer with " + maxRpcPortBindAttempts + " attempts"); Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1301962&r1=1301961&r2=1301962&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Sat Mar 17 16:57:56 2012 @@ -138,10 +138,6 @@ public class BspServiceWorker<I extends throws IOException, InterruptedException { super(serverPortList, sessionMsecTimeout, context, graphMapper); registerBspEvent(partitionExchangeChildrenChanged); - int finalRpcPort = - getConfiguration().getInt(GiraphJob.RPC_INITIAL_PORT, - GiraphJob.RPC_INITIAL_PORT_DEFAULT) + - getTaskPartition(); maxVerticesPerPartition = getConfiguration().getInt( GiraphJob.MAX_VERTICES_PER_PARTITION, @@ -150,12 +146,13 @@ public class BspServiceWorker<I extends getConfiguration().getLong( GiraphJob.INPUT_SPLIT_MAX_VERTICES, GiraphJob.INPUT_SPLIT_MAX_VERTICES_DEFAULT); - workerInfo = - new WorkerInfo(getHostname(), getTaskPartition(), finalRpcPort); workerGraphPartitioner = 
getGraphPartitionerFactory().createWorkerGraphPartitioner(); - commService = new RPCCommunications<I, V, E, M>( - context, this, graphState); + RPCCommunications<I, V, E, M> rpcCommService = + new RPCCommunications<I, V, E, M>(context, this, graphState); + workerInfo = new WorkerInfo( + getHostname(), getTaskPartition(), rpcCommService.getPort()); + commService = rpcCommService; graphState.setWorkerCommunications(commService); this.workerContext = BspUtils.createWorkerContext(getConfiguration(), Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1301962&r1=1301961&r2=1301962&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Sat Mar 17 16:57:56 2012 @@ -160,6 +160,14 @@ public class GiraphJob extends Job { "giraph.maxRpcPortBindAttempts"; /** Default maximum bind attempts for different RPC ports */ public static final int MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT = 20; + /** + * Fail first RPC port binding attempt, simulate binding failure + * on real grid testing + */ + public static final String FAIL_FIRST_RPC_PORT_BIND_ATTEMPT = + "giraph.failFirstRpcPortBindAttempt"; + /** Default fail first RPC port binding attempt flag */ + public static final boolean FAIL_FIRST_RPC_PORT_BIND_ATTEMPT_DEFAULT = false; /** Maximum number of RPC handlers */ public static final String RPC_NUM_HANDLERS = "giraph.rpcNumHandlers"; Added: incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/TryMultiRpcBindingPortsTest.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/TryMultiRpcBindingPortsTest.java?rev=1301962&view=auto 
============================================================================== --- incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/TryMultiRpcBindingPortsTest.java (added) +++ incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/TryMultiRpcBindingPortsTest.java Sat Mar 17 16:57:56 2012 @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.examples; + +import java.util.Map; +import java.util.Set; + +import junit.framework.TestCase; + +import org.apache.giraph.graph.GiraphJob; +import org.apache.giraph.utils.InternalVertexRunner; + +import com.google.common.base.Splitter; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.collect.SetMultimap; + +/** + * Tests for {@link TryMultiRpcBindingPortsTest} + */ +public class TryMultiRpcBindingPortsTest extends TestCase { + + /** + * A local integration test on toy data + */ + public void testToyData() throws Exception { + + // a small graph with three components + String[] graph = new String[] { + "1 2 3", + "2 1 4 5", + "3 1 4", + "4 2 3 5 13", + "5 2 4 12 13", + "12 5 13", + "13 4 5 12", + + "6 7 8", + "7 6 10 11", + "8 6 10", + "10 7 8 11", + "11 7 10", + + "9" }; + + // run internally + // fail the first port binding attempt + Map<String, String> params = Maps.<String, String>newHashMap(); + params.put(GiraphJob.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT, "true"); + Iterable<String> results = InternalVertexRunner.run( + ConnectedComponentsVertex.class, + MinimumIntCombiner.class, + IntIntNullIntTextInputFormat.class, + VertexWithComponentTextOutputFormat.class, + params, + graph); + + SetMultimap<Integer,Integer> components = parseResults(results); + + Set<Integer> componentIDs = components.keySet(); + assertEquals(3, componentIDs.size()); + assertTrue(componentIDs.contains(1)); + assertTrue(componentIDs.contains(6)); + assertTrue(componentIDs.contains(9)); + + Set<Integer> componentOne = components.get(1); + assertEquals(7, componentOne.size()); + assertTrue(componentOne.contains(1)); + assertTrue(componentOne.contains(2)); + assertTrue(componentOne.contains(3)); + assertTrue(componentOne.contains(4)); + assertTrue(componentOne.contains(5)); + assertTrue(componentOne.contains(12)); + assertTrue(componentOne.contains(13)); + 
+ Set<Integer> componentTwo = components.get(6); + assertEquals(5, componentTwo.size()); + assertTrue(componentTwo.contains(6)); + assertTrue(componentTwo.contains(7)); + assertTrue(componentTwo.contains(8)); + assertTrue(componentTwo.contains(10)); + assertTrue(componentTwo.contains(11)); + + Set<Integer> componentThree = components.get(9); + assertEquals(1, componentThree.size()); + assertTrue(componentThree.contains(9)); + } + + private SetMultimap<Integer,Integer> parseResults( + Iterable<String> results) { + SetMultimap<Integer,Integer> components = HashMultimap.create(); + for (String result : results) { + Iterable<String> parts = Splitter.on('\t').split(result); + int vertex = Integer.parseInt(Iterables.get(parts, 0)); + int component = Integer.parseInt(Iterables.get(parts, 1)); + components.put(component, vertex); + } + return components; + } +}