This is an automated email from the ASF dual-hosted git repository. sammichen pushed a commit to branch ozone-0.6.0 in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
commit 507bba57626fd37c51d50a8cb08784622d37e432 Author: runzhiwang <[email protected]> AuthorDate: Wed Jul 22 20:40:04 2020 +0800 HDDS-3933. Fix memory leak because of too many Datanode State Machine Thread (#1185) (cherry picked from commit ff7b5a3367eccc0969bfd92a2cafe48899a2aaa5) --- .../common/statemachine/DatanodeStateMachine.java | 25 +++++- .../common/statemachine/StateContext.java | 34 +++++++- .../states/datanode/RunningDatanodeState.java | 14 +++- .../common/statemachine/TestStateContext.java | 30 ++++++++ .../states/datanode/TestRunningDatanodeState.java | 90 ++++++++++++++++++++++ 5 files changed, 184 insertions(+), 9 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index 779b60a..27e814b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -19,11 +19,13 @@ package org.apache.hadoop.ozone.container.common.statemachine; import java.io.Closeable; import java.io.IOException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto; @@ -50,7 +52,6 @@ import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.util.JvmPauseMonitor; import org.apache.hadoop.util.Time; -import org.apache.hadoop.util.concurrent.HadoopExecutors; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -103,9 +104,10 @@ public class DatanodeStateMachine implements Closeable { this.hddsDatanodeStopService = hddsDatanodeStopService; this.conf = conf; this.datanodeDetails = datanodeDetails; - executorService = HadoopExecutors.newCachedThreadPool( - new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Datanode State Machine Thread - %d").build()); + executorService = Executors.newFixedThreadPool( + getEndPointTaskThreadPoolSize(), + new ThreadFactoryBuilder() + .setNameFormat("Datanode State Machine Task Thread - %d").build()); connectionManager = new SCMConnectionManager(conf); context = new StateContext(this.conf, DatanodeStates.getInitState(), this); // OzoneContainer instance is used in a non-thread safe way by the context @@ -155,6 +157,21 @@ public class DatanodeStateMachine implements Closeable { .build(); } + private int getEndPointTaskThreadPoolSize() { + // TODO(runzhiwang): current only support one recon, if support multiple + // recon in future reconServerCount should be the real number of recon + int reconServerCount = 1; + int totalServerCount = reconServerCount; + + try { + totalServerCount += HddsUtils.getSCMAddresses(conf).size(); + } catch (Exception e) { + LOG.error("Fail to get scm addresses", e); + } + + return totalServerCount; + } + /** * * Return DatanodeDetails if set, return null otherwise. diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index f3a599d..51262c3 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; @@ -35,6 +36,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction; @@ -51,6 +53,8 @@ import com.google.common.base.Preconditions; import com.google.protobuf.GeneratedMessage; import static java.lang.Math.min; import org.apache.commons.collections.CollectionUtils; + +import static org.apache.hadoop.hdds.utils.HddsServerUtil.getLogWarnInterval; import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmHeartbeatInterval; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,6 +78,7 @@ public class StateContext { private DatanodeStateMachine.DatanodeStates state; private boolean shutdownOnError = false; private boolean shutdownGracefully = false; + private final AtomicLong threadPoolNotAvailableCount; /** * Starting with a 2 sec heartbeat frequency which will be updated to the @@ -103,6 +108,7 @@ public class StateContext { pipelineActions = new HashMap<>(); lock = new ReentrantLock(); stateExecutionCount = new AtomicLong(0); + threadPoolNotAvailableCount = new AtomicLong(0); } /** @@ -393,6 +399,20 @@ public class StateContext { } } + @VisibleForTesting + public boolean isThreadPoolAvailable(ExecutorService executor) { + if (!(executor instanceof ThreadPoolExecutor)) { + return true; + } + + ThreadPoolExecutor ex = (ThreadPoolExecutor) executor; + if (ex.getQueue().size() == 0) { + return true; + } + + return false; + } + /** * Executes the required state function. * @@ -415,7 +435,19 @@ public class StateContext { if (this.isEntering()) { task.onEnter(); } - task.execute(service); + + if (isThreadPoolAvailable(service)) { + task.execute(service); + threadPoolNotAvailableCount.set(0); + } else { + if (threadPoolNotAvailableCount.get() + % getLogWarnInterval(conf) == 0) { + LOG.warn("No available thread in pool for past {} seconds.", + unit.toSeconds(time) * (threadPoolNotAvailableCount.get() + 1)); + } + threadPoolNotAvailableCount.incrementAndGet(); + } + DatanodeStateMachine.DatanodeStates newState = task.await(time, unit); if (this.state != newState) { if (LOG.isDebugEnabled()) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java index 8a9bcaf..b0cfb4c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java @@ -16,6 +16,7 @@ */ package org.apache.hadoop.ozone.container.common.states.datanode; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine; @@ -42,7 +43,6 @@ import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; /** * Class that implements handshake with SCM. @@ -152,6 +152,11 @@ public class RunningDatanodeState implements DatanodeState { } } + @VisibleForTesting + public void setExecutorCompletionService(ExecutorCompletionService e) { + this.ecs = e; + } + private Callable<EndPointStates> getEndPointTask( EndpointStateMachine endpoint) { if (endpointTasks.containsKey(endpoint)) { @@ -200,10 +205,11 @@ public class RunningDatanodeState implements DatanodeState { @Override public DatanodeStateMachine.DatanodeStates await(long duration, TimeUnit timeUnit) - throws InterruptedException, ExecutionException, TimeoutException { + throws InterruptedException { int count = connectionManager.getValues().size(); int returned = 0; - long timeLeft = timeUnit.toMillis(duration); + long durationMS = timeUnit.toMillis(duration); + long timeLeft = durationMS; long startTime = Time.monotonicNow(); List<Future<EndPointStates>> results = new LinkedList<>(); @@ -214,7 +220,7 @@ public class RunningDatanodeState implements DatanodeState { results.add(result); returned++; } - timeLeft = timeLeft - (Time.monotonicNow() - startTime); + timeLeft = durationMS - (Time.monotonicNow() - startTime); } return computeNextContainerState(results); } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java index 545d670..c3fd310 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java @@ -28,6 +28,8 @@ import static org.mockito.Mockito.mock; import java.net.InetSocketAddress; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -39,6 +41,8 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine.DatanodeStates; import org.apache.hadoop.ozone.container.common.states.DatanodeState; +import org.apache.hadoop.test.LambdaTestUtils; +import org.junit.Assert; import org.junit.Test; import com.google.protobuf.GeneratedMessage; @@ -182,4 +186,30 @@ public class TestStateContext { assertEquals(DatanodeStates.SHUTDOWN, subject.getState()); } + @Test + public void testIsThreadPoolAvailable() throws Exception { + StateContext stateContext = new StateContext(null, null, null); + + int threadPoolSize = 2; + ExecutorService executorService = Executors.newFixedThreadPool( + threadPoolSize); + + CompletableFuture<String> futureOne = new CompletableFuture<>(); + CompletableFuture<String> futureTwo = new CompletableFuture<>(); + + // task num greater than pool size + for (int i = 0; i < threadPoolSize; i++) { + executorService.submit(() -> futureOne.get()); + } + executorService.submit(() -> futureTwo.get()); + + Assert.assertFalse(stateContext.isThreadPoolAvailable(executorService)); + + futureOne.complete("futureOne"); + LambdaTestUtils.await(1000, 100, () -> + stateContext.isThreadPoolAvailable(executorService)); + + futureTwo.complete("futureTwo"); + executorService.shutdown(); + } } \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/datanode/TestRunningDatanodeState.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/datanode/TestRunningDatanodeState.java new file mode 100644 index 0000000..9fb4307 --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/datanode/TestRunningDatanodeState.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.states.datanode; + +import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; +import org.apache.hadoop.util.Time; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine.EndPointStates.SHUTDOWN; +import static org.mockito.Mockito.when; + +/** + * Test class for RunningDatanodeState. + */ +public class TestRunningDatanodeState { + @Test + public void testAwait() throws InterruptedException { + SCMConnectionManager connectionManager = + Mockito.mock(SCMConnectionManager.class); + List<EndpointStateMachine> stateMachines = new ArrayList<>(); + when(connectionManager.getValues()).thenReturn(stateMachines); + + RunningDatanodeState state = + new RunningDatanodeState(null, connectionManager, null); + + int threadPoolSize = 2; + ExecutorService executorService = Executors.newFixedThreadPool( + threadPoolSize); + + ExecutorCompletionService ecs = + new ExecutorCompletionService<>(executorService); + state.setExecutorCompletionService(ecs); + + for (int i = 0; i < threadPoolSize; i++) { + stateMachines.add(new EndpointStateMachine(null, null, null)); + } + + CompletableFuture<EndpointStateMachine.EndPointStates> futureOne = + new CompletableFuture<>(); + for (int i = 0; i < threadPoolSize; i++) { + ecs.submit(() -> futureOne.get()); + } + + long startTime = Time.monotonicNow(); + state.await(500, TimeUnit.MILLISECONDS); + long endTime = Time.monotonicNow(); + Assert.assertTrue((endTime - startTime) >= 500); + + futureOne.complete(SHUTDOWN); + + CompletableFuture<EndpointStateMachine.EndPointStates> futureTwo = + new CompletableFuture<>(); + for (int i = 0; i < threadPoolSize; i++) { + ecs.submit(() -> futureTwo.get()); + } + futureTwo.complete(SHUTDOWN); + + startTime = Time.monotonicNow(); + state.await(500, TimeUnit.MILLISECONDS); + endTime = Time.monotonicNow(); + Assert.assertTrue((endTime - startTime) < 500); + + executorService.shutdown(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
