This is an automated email from the ASF dual-hosted git repository.
ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 4b256aa HDDS-2542. Race condition between read and write stateMachineData. (#310)
4b256aa is described below
commit 4b256aa473e3bce0884f47985864afe2a9966c6a
Author: Lokesh Jain <[email protected]>
AuthorDate: Thu Jan 9 19:53:35 2020 +0530
HDDS-2542. Race condition between read and write stateMachineData. (#310)
---
.../org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 7 -
.../java/org/apache/hadoop/hdds/utils/Cache.java | 36 +++++
.../hadoop/hdds/utils/ResourceLimitCache.java | 91 +++++++++++
.../hadoop/hdds/utils/ResourceSemaphore.java | 170 +++++++++++++++++++++
.../org/apache/hadoop/ozone/OzoneConfigKeys.java | 9 --
.../common/src/main/resources/ozone-default.xml | 8 -
.../hadoop/hdds/utils/TestResourceLimitCache.java | 87 +++++++++++
.../hadoop/hdds/utils/TestResourceSemaphore.java | 76 +++++++++
.../server/ratis/ContainerStateMachine.java | 62 +++++---
.../transport/server/ratis/XceiverServerRatis.java | 14 +-
.../ozone/freon/TestOzoneClientKeyGenerator.java | 95 ++++++++++++
11 files changed, 594 insertions(+), 61 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 00d326e..737add0 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -115,13 +115,6 @@ public final class ScmConfigKeys {
public static final String
DFS_CONTAINER_RATIS_LEADER_PENDING_BYTES_LIMIT_DEFAULT = "1GB";
- // expiry interval stateMachineData cache entry inside containerStateMachine
- public static final String
- DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL =
- "dfs.container.ratis.statemachine.cache.expiry.interval";
- public static final String
- DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL_DEFAULT =
- "10s";
public static final String DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY =
"dfs.ratis.client.request.timeout.duration";
public static final TimeDuration
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Cache.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Cache.java
new file mode 100644
index 0000000..efeb69f
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Cache.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.utils;
+
+import java.util.function.Predicate;
+
+/**
+ * Cache interface.
+ */
+public interface Cache<K, V> {
+
+ V get(K key);
+
+ V put(K key, V value) throws InterruptedException;
+
+ V remove(K key);
+
+ void removeIf(Predicate<K> predicate);
+
+ void clear();
+}
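
For orientation, a minimal map-backed implementation of this new interface could
look like the sketch below (the class name SimpleMapCache is invented for
illustration; the patch's real implementation, ResourceLimitCache in the next
file, adds blocking resource limits on top of the same contract):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Predicate;

    import org.apache.hadoop.hdds.utils.Cache;

    class SimpleMapCache<K, V> implements Cache<K, V> {
      private final ConcurrentHashMap<K, V> map = new ConcurrentHashMap<>();

      @Override
      public V get(K key) {
        return map.get(key);
      }

      // The interface declares InterruptedException so that bounded
      // implementations may block here; this unbounded sketch never does.
      @Override
      public V put(K key, V value) {
        return map.put(key, value);
      }

      @Override
      public V remove(K key) {
        return map.remove(key);
      }

      @Override
      public void removeIf(Predicate<K> predicate) {
        map.keySet().removeIf(predicate);
      }

      @Override
      public void clear() {
        map.clear();
      }
    }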
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/ResourceLimitCache.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/ResourceLimitCache.java
new file mode 100644
index 0000000..5dda249
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/ResourceLimitCache.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.utils;
+
+
+import java.util.Objects;
+import java.util.function.BiFunction;
+import java.util.function.Predicate;
+
+/**
+ * Cache with resource limit constraints. At any time all entries in the cache
+ * satisfy the resource limit constraints in the constructor. New put
+ * operations are blocked until resources are released via remove or clear
+ * operation.
+ */
+public class ResourceLimitCache<K, V> implements Cache<K, V> {
+ private final java.util.concurrent.ConcurrentMap<K, V> map;
+ private final ResourceSemaphore.Group group;
+ private final BiFunction<K, V, int[]> permitsSupplier;
+
+ public ResourceLimitCache(java.util.concurrent.ConcurrentMap<K, V> map,
+ BiFunction<K, V, int[]> permitsSupplier, int... limits) {
+ Objects.requireNonNull(map);
+ Objects.requireNonNull(permitsSupplier);
+ Objects.requireNonNull(limits);
+ this.map = map;
+ this.group = new ResourceSemaphore.Group(limits);
+ this.permitsSupplier = permitsSupplier;
+ }
+
+ @Override
+ public V get(K key) {
+ Objects.requireNonNull(key);
+ return map.get(key);
+ }
+
+ @Override
+ public V put(K key, V value) throws InterruptedException {
+ Objects.requireNonNull(key);
+ Objects.requireNonNull(value);
+
+ // remove the old key to release the permits
+ V oldVal = remove(key);
+ int[] permits = permitsSupplier.apply(key, value);
+ group.acquire(permits);
+ try {
+ map.put(key, value);
+ } catch (Throwable t) {
+ group.release(permits);
+ }
+ return oldVal;
+ }
+
+ @Override
+ public V remove(K key) {
+ Objects.requireNonNull(key);
+ V val = map.remove(key);
+ if (val != null) {
+ group.release(permitsSupplier.apply(key, val));
+ }
+ return val;
+ }
+
+ @Override
+ public void removeIf(Predicate<K> predicate) {
+ Objects.requireNonNull(predicate);
+ map.keySet().removeIf(predicate);
+ }
+
+ @Override
+ public void clear() {
+ for (K key : map.keySet()) {
+ remove(key);
+ }
+ }
+}
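
A minimal usage sketch of the blocking-put behaviour described in the class
javadoc above (the demo class and the limits chosen are invented for
illustration; it assumes only the Cache and ResourceLimitCache classes added by
this patch):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;

    import org.apache.hadoop.hdds.utils.Cache;
    import org.apache.hadoop.hdds.utils.ResourceLimitCache;

    public final class ResourceLimitCacheDemo {
      public static void main(String[] args) throws InterruptedException {
        // Two limits: at most 3 entries and at most 100 "bytes" in total.
        Cache<String, byte[]> cache = new ResourceLimitCache<>(
            new ConcurrentHashMap<>(),
            (key, value) -> new int[] {1, value.length},
            3, 100);

        cache.put("a", new byte[40]); // holds 1 entry, 40 bytes
        cache.put("b", new byte[40]); // holds 2 entries, 80 bytes

        // This put needs 60 bytes but only 20 are free, so it blocks until
        // another entry releases its permits.
        CompletableFuture<Void> pending = CompletableFuture.runAsync(() -> {
          try {
            cache.put("c", new byte[60]);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });

        cache.remove("a"); // releases 1 entry and 40 bytes
        pending.join();    // the pending put can now complete
        System.out.println(cache.get("c") != null); // true
      }
    }

Because permits are only released by remove or clear, the cache itself provides
the backpressure that the removed time-based expiry interval could only
approximate.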
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/ResourceSemaphore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/ResourceSemaphore.java
new file mode 100644
index 0000000..96d5996
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/ResourceSemaphore.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils;
+
+
+import org.apache.ratis.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A {@link Semaphore} with a limit for a resource.
+ *
+ * After {@link #close()}, the resource becomes unavailable,
+ * i.e. any acquire will not succeed.
+ */
+public class ResourceSemaphore extends Semaphore {
+ private final int limit;
+ private final AtomicBoolean reducePermits = new AtomicBoolean();
+ private final AtomicBoolean isClosed = new AtomicBoolean();
+
+ public ResourceSemaphore(int limit) {
+ super(limit, true);
+ Preconditions.assertTrue(limit > 0, () -> "limit = " + limit + " <= 0");
+ this.limit = limit;
+ }
+
+ @Override
+ public void release() {
+ release(1);
+ }
+
+ @Override
+ public void release(int permits) {
+ assertRelease(permits);
+ super.release(permits);
+ assertAvailable();
+ }
+
+ private void assertRelease(int toRelease) {
+ Preconditions
+ .assertTrue(toRelease >= 0, () -> "toRelease = " + toRelease + " < 0");
+ final int available = assertAvailable();
+ final int permits = Math.addExact(available, toRelease);
+ Preconditions.assertTrue(permits <= limit,
+ () -> "permits = " + permits + " > limit = " + limit);
+ }
+
+ private int assertAvailable() {
+ final int available = availablePermits();
+ Preconditions
+ .assertTrue(available >= 0, () -> "available = " + available + " < 0");
+ return available;
+ }
+
+ public int used() {
+ return limit - availablePermits();
+ }
+
+ /** Close the resource. */
+ public void close() {
+ if (reducePermits.compareAndSet(false, true)) {
+ reducePermits(limit);
+ isClosed.set(true);
+ }
+ }
+
+ public boolean isClosed() {
+ return isClosed.get();
+ }
+
+ @Override
+ public String toString() {
+ return (isClosed()? "closed/": availablePermits() + "/") + limit;
+ }
+
+ /**
+ * Track a group of resources with a list of {@link ResourceSemaphore}s.
+ */
+ public static class Group {
+ private final List<ResourceSemaphore> resources;
+
+ public Group(int... limits) {
+ final List<ResourceSemaphore> list = new ArrayList<>(limits.length);
+ for(int limit : limits) {
+ list.add(new ResourceSemaphore(limit));
+ }
+ this.resources = Collections.unmodifiableList(list);
+ }
+
+ int resourceSize() {
+ return resources.size();
+ }
+
+ protected ResourceSemaphore get(int i) {
+ return resources.get(i);
+ }
+
+ boolean tryAcquire(int... permits) {
+ Preconditions.assertTrue(permits.length == resources.size(),
+ () -> "items.length = " + permits.length + " != resources.size() = "
+ + resources.size());
+ int i = 0;
+ // try acquiring all resources
+ for (; i < permits.length; i++) {
+ if (!resources.get(i).tryAcquire(permits[i])) {
+ break;
+ }
+ }
+ if (i == permits.length) {
+ return true; // successfully acquired all resources
+ }
+
+ // failed at i, releasing all previous resources
+ for(i--; i >= 0; i--) {
+ resources.get(i).release(permits[i]);
+ }
+ return false;
+ }
+
+ public void acquire(int... permits) throws InterruptedException {
+ Preconditions.assertTrue(permits.length == resources.size(),
+ () -> "items.length = " + permits.length + " != resources.size() = "
+ + resources.size());
+ for (int i = 0; i < permits.length; i++) {
+ resources.get(i).acquire(permits[i]);
+ }
+ }
+
+ protected void release(int... permits) {
+ for(int i = resources.size() - 1; i >= 0; i--) {
+ resources.get(i).release(permits[i]);
+ }
+ }
+
+ public void close() {
+ for(int i = resources.size() - 1; i >= 0; i--) {
+ resources.get(i).close();
+ }
+ }
+
+ public boolean isClosed() {
+ return resources.get(resources.size() - 1).isClosed();
+ }
+
+ @Override
+ public String toString() {
+ return resources + ",size=" + resources.size();
+ }
+ }
+}
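
A short sketch of the Group's all-or-nothing tryAcquire semantics (the demo
class is hypothetical, and has to live in the org.apache.hadoop.hdds.utils
package because tryAcquire and release are not public):

    package org.apache.hadoop.hdds.utils;

    public final class ResourceSemaphoreGroupDemo {
      public static void main(String[] args) {
        // Two resources: 3 units of the first (say, entries) and
        // 100 units of the second (say, bytes).
        ResourceSemaphore.Group group = new ResourceSemaphore.Group(3, 100);

        System.out.println(group.tryAcquire(1, 60)); // true
        // Fails as a whole: the 60 bytes are unavailable, so the one entry
        // acquired for this attempt is rolled back as well.
        System.out.println(group.tryAcquire(1, 60)); // false

        group.release(1, 60);
        System.out.println(group.tryAcquire(3, 100)); // true: all free again
      }
    }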
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 1700926..e637a09 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -267,15 +267,6 @@ public final class OzoneConfigKeys {
DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_TIMEOUT_DEFAULT =
ScmConfigKeys.DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_TIMEOUT_DEFAULT;
- public static final String
- DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL =
- ScmConfigKeys.
- DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL;
- public static final String
- DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL_DEFAULT =
- ScmConfigKeys.
- DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL_DEFAULT;
-
public static final String DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR =
"dfs.container.ratis.datanode.storage.dir";
public static final String DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY =
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index c83a5bf..8e8ec36 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -253,14 +253,6 @@
</description>
</property>
<property>
- <name>dfs.container.ratis.statemachine.cache.expiry.interval</name>
- <value>10s</value>
- <tag>OZONE, RATIS, PERFORMANCE</tag>
- <description>The interval till which the stateMachine data in ratis
- will be cached inside the ContainerStateMachine.
- </description>
- </property>
- <property>
<name>dfs.ratis.client.request.timeout.duration</name>
<value>3s</value>
<tag>OZONE, RATIS, MANAGEMENT</tag>
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestResourceLimitCache.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestResourceLimitCache.java
new file mode 100644
index 0000000..34d6b3c
--- /dev/null
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestResourceLimitCache.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.utils;
+
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Test for ResourceLimitCache.
+ */
+public class TestResourceLimitCache {
+
+ @Test
+ public void testResourceLimitCache()
+ throws InterruptedException, TimeoutException {
+ Cache<Integer, String> resourceCache =
+ new ResourceLimitCache<>(new ConcurrentHashMap<>(),
+ (k, v) -> new int[] {k}, 10);
+ resourceCache.put(6, "a");
+ resourceCache.put(4, "a");
+
+ // put should pass as key 4 will be overwritten
+ resourceCache.put(4, "a");
+
+ // Create a future which blocks to put 1. Currently map has acquired 10
+ // permits out of 10
+ CompletableFuture future = CompletableFuture.supplyAsync(() -> {
+ try {
+ return resourceCache.put(1, "a");
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ return null;
+ });
+ Assert.assertTrue(!future.isDone());
+ Thread.sleep(100);
+ Assert.assertTrue(!future.isDone());
+
+ // remove 4 so that permits are released for key 1 to be put. Currently map
+ // has acquired 6 permits out of 10
+ resourceCache.remove(4);
+
+ GenericTestUtils.waitFor(future::isDone, 100, 1000);
+ // map has the key 1
+ Assert.assertTrue(future.isDone() && !future.isCompletedExceptionally());
+ Assert.assertNotNull(resourceCache.get(1));
+
+ // Create a future which blocks to put 4. Currently map has acquired 7
+ // permits out of 10
+ future = CompletableFuture.supplyAsync(() -> {
+ try {
+ return resourceCache.put(4, "a");
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ return null;
+ });
+ Assert.assertTrue(!future.isDone());
+ Thread.sleep(100);
+ Assert.assertTrue(!future.isDone());
+
+ // Cancel the future for putting key 4
+ future.cancel(true);
+ // remove key 1 so currently map has acquired 6 permits out of 10
+ resourceCache.remove(1);
+ Assert.assertNull(resourceCache.get(4));
+ }
+}
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestResourceSemaphore.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestResourceSemaphore.java
new file mode 100644
index 0000000..cbdd558
--- /dev/null
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestResourceSemaphore.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for ResourceSemaphore.
+ */
+public class TestResourceSemaphore {
+ @Test(timeout = 1000)
+ public void testGroup() {
+ final ResourceSemaphore.Group g = new ResourceSemaphore.Group(3, 1);
+
+ assertUsed(g, 0, 0);
+ assertAcquire(g, true, 1, 1);
+ assertUsed(g, 1, 1);
+ assertAcquire(g, false, 1, 1);
+ assertUsed(g, 1, 1);
+ assertAcquire(g, false, 0, 1);
+ assertUsed(g, 1, 1);
+ assertAcquire(g, true, 1, 0);
+ assertUsed(g, 2, 1);
+ assertAcquire(g, true, 1, 0);
+ assertUsed(g, 3, 1);
+ assertAcquire(g, false, 1, 0);
+ assertUsed(g, 3, 1);
+
+ g.release(1, 1);
+ assertUsed(g, 2, 0);
+ g.release(2, 0);
+ assertUsed(g, 0, 0);
+ g.release(0, 0);
+ assertUsed(g, 0, 0);
+
+ try {
+ g.release(1, 0);
+ Assert.fail("Should have failed.");
+ } catch (IllegalStateException e){
+ }
+ try {
+ g.release(0, 1);
+ Assert.fail("Should have failed.");
+ } catch (IllegalStateException e){
+ }
+ }
+
+ static void assertUsed(ResourceSemaphore.Group g, int... expected) {
+ Assert.assertEquals(expected.length, g.resourceSize());
+ for(int i = 0; i < expected.length; i++) {
+ Assert.assertEquals(expected[i], g.get(i).used());
+ }
+ }
+
+ static void assertAcquire(ResourceSemaphore.Group g, boolean expected,
+ int... permits) {
+ final boolean computed = g.tryAcquire(permits);
+ Assert.assertEquals(expected, computed);
+ }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 4aacedc..dbf376f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -20,9 +20,8 @@ package org.apache.hadoop.ozone.container.common.transport.server.ratis;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -30,6 +29,8 @@ import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.utils.Cache;
+import org.apache.hadoop.hdds.utils.ResourceLimitCache;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.util.Time;
@@ -75,7 +76,6 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -83,8 +83,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.concurrent.Executors;
@@ -158,7 +156,7 @@ public class ContainerStateMachine extends BaseStateMachine {
@SuppressWarnings("parameternumber")
public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher,
ContainerController containerController, ThreadPoolExecutor chunkExecutor,
- XceiverServerRatis ratisServer, long expiryInterval, Configuration conf) {
+ XceiverServerRatis ratisServer, Configuration conf) {
this.gid = gid;
this.dispatcher = dispatcher;
this.containerController = containerController;
@@ -167,11 +165,17 @@ public class ContainerStateMachine extends BaseStateMachine {
metrics = CSMMetrics.create(gid);
this.writeChunkFutureMap = new ConcurrentHashMap<>();
applyTransactionCompletionMap = new ConcurrentHashMap<>();
- stateMachineDataCache = CacheBuilder.newBuilder()
- .expireAfterAccess(expiryInterval, TimeUnit.MILLISECONDS)
- // set the limit on no of cached entries equal to no of max threads
- // executing writeStateMachineData
- .maximumSize(chunkExecutor.getCorePoolSize()).build();
+ int numPendingRequests = conf.getInt(
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_LEADER_NUM_PENDING_REQUESTS,
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_LEADER_NUM_PENDING_REQUESTS_DEFAULT
+ );
+ int pendingRequestsByteLimit = (int) conf.getStorageSize(
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_LEADER_PENDING_BYTES_LIMIT,
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_LEADER_PENDING_BYTES_LIMIT_DEFAULT,
+ StorageUnit.BYTES);
+ stateMachineDataCache = new ResourceLimitCache<>(new ConcurrentHashMap<>(),
+ (index, data) -> new int[] {1, data.size()}, numPendingRequests,
+ pendingRequestsByteLimit);
this.container2BCSIDMap = new ConcurrentHashMap<>();
final int numContainerOpExecutors = conf.getInt(
@@ -417,7 +421,7 @@ public class ContainerStateMachine extends BaseStateMachine {
if (((RaftServerProxy) server).getImpl(gid).isLeader()) {
stateMachineDataCache.put(entryIndex, write.getData());
}
- } catch (IOException ioe) {
+ } catch (IOException | InterruptedException ioe) {
return completeExceptionally(ioe);
}
DispatcherContext context =
@@ -466,6 +470,9 @@ public class ContainerStateMachine extends BaseStateMachine {
write.getChunkData().getChunkName() + " Error message: " +
r.getMessage() + " Container Result: " + r.getResult());
metrics.incNumWriteDataFails();
+ // If the write fails currently we mark the stateMachine as unhealthy.
+ // This leads to pipeline close. Any change in that behavior requires
+ // handling the entry for the write chunk in cache.
stateMachineHealthy.set(false);
raftFuture.completeExceptionally(sce);
} else {
@@ -589,9 +596,13 @@ public class ContainerStateMachine extends BaseStateMachine {
* Reads the Entry from the Cache or loads it back by reading from disk.
*/
private ByteString getCachedStateMachineData(Long logIndex, long term,
- ContainerCommandRequestProto requestProto) throws ExecutionException {
- return stateMachineDataCache.get(logIndex,
- () -> readStateMachineData(requestProto, term, logIndex));
+ ContainerCommandRequestProto requestProto)
+ throws IOException {
+ ByteString data = stateMachineDataCache.get(logIndex);
+ if (data == null) {
+ data = readStateMachineData(requestProto, term, logIndex);
+ }
+ return data;
}
/**
@@ -637,7 +648,7 @@ public class ContainerStateMachine extends BaseStateMachine {
future.complete(
getCachedStateMachineData(entry.getIndex(), entry.getTerm(),
requestProto));
- } catch (ExecutionException e) {
+ } catch (IOException e) {
metrics.incNumReadStateMachineFails();
future.completeExceptionally(e);
}
@@ -695,6 +706,10 @@ public class ContainerStateMachine extends BaseStateMachine {
@Override
public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
long index = trx.getLogEntry().getIndex();
+ // Since leader and one of the followers has written the data, it can
+ // be removed from the stateMachineDataMap.
+ stateMachineDataCache.remove(index);
+
DispatcherContext.Builder builder =
new DispatcherContext.Builder()
.setTerm(trx.getLogEntry().getTerm())
@@ -803,10 +818,15 @@ public class ContainerStateMachine extends BaseStateMachine {
return future;
}
+ @Override
+ public CompletableFuture<Void> truncateStateMachineData(long index) {
+ stateMachineDataCache.removeIf(k -> k >= index);
+ return CompletableFuture.completedFuture(null);
+ }
+
@VisibleForTesting
public void evictStateMachineCache() {
- stateMachineDataCache.invalidateAll();
- stateMachineDataCache.cleanUp();
+ stateMachineDataCache.clear();
}
@Override
@@ -820,12 +840,6 @@ public class ContainerStateMachine extends BaseStateMachine {
}
@Override
- public void notifyNotLeader(Collection<TransactionContext> pendingEntries)
- throws IOException {
- evictStateMachineCache();
- }
-
- @Override
public void notifyLogFailed(Throwable t, LogEntryProto failedEntry) {
ratisServer.handleNodeLogFailure(gid, t);
}
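
Taken together, the ContainerStateMachine changes give the cache a log-driven
lifecycle: an entry is cached by writeStateMachineData on the leader, served by
readStateMachineData with a disk fallback, and released on applyTransaction or
truncateStateMachineData. A simplified, hypothetical trace of that lifecycle
(class name, sizes, and the disk-read stand-in below are invented for
illustration):

    import java.util.concurrent.ConcurrentHashMap;

    import org.apache.hadoop.hdds.utils.Cache;
    import org.apache.hadoop.hdds.utils.ResourceLimitCache;

    public final class StateMachineCacheLifecycle {
      public static void main(String[] args) throws InterruptedException {
        // Same shape as the patch: one permit per entry plus one per byte,
        // bounded by the leader's pending-request and pending-bytes limits.
        Cache<Long, byte[]> stateMachineDataCache = new ResourceLimitCache<>(
            new ConcurrentHashMap<>(),
            (index, data) -> new int[] {1, data.length},
            2, 1024);

        long logIndex = 7L;

        // writeStateMachineData on the leader: cache the chunk data.
        stateMachineDataCache.put(logIndex, new byte[512]);

        // readStateMachineData: serve from the cache, fall back to disk.
        byte[] data = stateMachineDataCache.get(logIndex);
        if (data == null) {
          data = new byte[512]; // stand-in for the on-disk chunk read
        }

        // applyTransaction: the data is durable, release the permits.
        stateMachineDataCache.remove(logIndex);

        // truncateStateMachineData: drop entries at or beyond the index.
        stateMachineDataCache.removeIf(k -> k >= logIndex);
      }
    }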
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 753d8f5..17a3892 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -57,7 +57,6 @@ import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
-import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.util.SizeInBytes;
@@ -102,9 +101,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
private final ContainerController containerController;
private ClientId clientId = ClientId.randomId();
private final StateContext context;
- private final ReplicationLevel replicationLevel;
private long nodeFailureTimeoutMs;
- private final long cacheEntryExpiryInteval;
private boolean isStarted = false;
private DatanodeDetails datanodeDetails;
private final OzoneConfiguration conf;
@@ -138,14 +135,6 @@ public final class XceiverServerRatis implements XceiverServerSpi {
new ArrayBlockingQueue<>(queueLimit),
new ThreadPoolExecutor.CallerRunsPolicy());
this.context = context;
- this.replicationLevel =
- conf.getEnum(OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY,
- OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT);
- cacheEntryExpiryInteval = conf.getTimeDuration(OzoneConfigKeys.
- DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL,
- OzoneConfigKeys.
- DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL_DEFAULT,
- TimeUnit.MILLISECONDS);
this.dispatcher = dispatcher;
this.containerController = containerController;
this.raftPeerId = RatisHelper.toRaftPeerId(dd);
@@ -162,8 +151,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
private ContainerStateMachine getStateMachine(RaftGroupId gid) {
return new ContainerStateMachine(gid, dispatcher, containerController,
- chunkExecutor, this, cacheEntryExpiryInteval,
- conf);
+ chunkExecutor, this, conf);
}
private RaftProperties newRaftProperties() {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOzoneClientKeyGenerator.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOzoneClientKeyGenerator.java
new file mode 100644
index 0000000..315d1ee
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOzoneClientKeyGenerator.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.freon;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.event.Level;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+/**
+ * Test for OzoneClientKeyGenerator.
+ */
+public class TestOzoneClientKeyGenerator {
+
+ private String path;
+
+ /**
+ * Create a MiniOzoneCluster for testing.
+ *
+ * @throws IOException
+ */
+ @Before
+ public void setup() {
+ path = GenericTestUtils
+ .getTempPath(TestOzoneClientKeyGenerator.class.getSimpleName());
+ GenericTestUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
+ GenericTestUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+ File baseDir = new File(path);
+ baseDir.mkdirs();
+ }
+
+ /**
+ * Shutdown MiniOzoneCluster.
+ */
+ private void shutdown(MiniOzoneCluster cluster) throws IOException {
+ if (cluster != null) {
+ cluster.shutdown();
+ FileUtils.deleteDirectory(new File(path));
+ }
+ }
+
+ private MiniOzoneCluster startCluster(OzoneConfiguration conf)
+ throws Exception {
+ if (conf == null) {
+ conf = new OzoneConfiguration();
+ }
+ conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
+ MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
+ .setNumDatanodes(5)
+ .build();
+
+ cluster.waitForClusterToBeReady();
+ cluster.waitTobeOutOfSafeMode();
+ return cluster;
+ }
+
+ @Test
+ public void testOzoneClientKeyGenerator() throws Exception {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ MiniOzoneCluster cluster = startCluster(conf);
+ FileOutputStream out = FileUtils.openOutputStream(new File(path, "conf"));
+ cluster.getConf().writeXml(out);
+ out.getFD().sync();
+ out.close();
+ new Freon().execute(
+ new String[] {"-conf", new File(path, "conf").getAbsolutePath(),
+ "ockg", "-t", "1"});
+ shutdown(cluster);
+ }
+
+}