keith-turner closed pull request #645: Add space aware volume chooser
URL: https://github.com/apache/accumulo/pull/645
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/fs/SpaceAwareVolumeChooser.java
 
b/server/base/src/main/java/org/apache/accumulo/server/fs/SpaceAwareVolumeChooser.java
new file mode 100644
index 0000000000..2982459d90
--- /dev/null
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/fs/SpaceAwareVolumeChooser.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.fs;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+/**
+ * A {@link PreferredVolumeChooser} that takes remaining HDFS space into 
account when making a
+ * volume choice rather than a simpler round robin. The list of volumes to use 
can be limited using
+ * the same properties as {@link PreferredVolumeChooser}
+ */
+public class SpaceAwareVolumeChooser extends PreferredVolumeChooser {
+
+  public static final String HDFS_SPACE_RECOMPUTE_INTERVAL = 
Property.GENERAL_ARBITRARY_PROP_PREFIX
+      .getKey() + "spaceaware.volume.chooser.recompute.interval";
+
+  // Default time to wait in ms. Defaults to 5 min
+  private long defaultComputationCacheDuration = 300000;
+  LoadingCache<List<String>,WeightedRandomCollection> choiceCache = null;
+
+  private static final Logger log = 
LoggerFactory.getLogger(SpaceAwareVolumeChooser.class);
+
+  @Override
+  public String choose(VolumeChooserEnvironment env, String[] options)
+      throws VolumeChooserException {
+
+    options = getPreferredVolumes(env, options);
+
+    try {
+      return getCache(env).get(Arrays.asList(options)).next();
+    } catch (ExecutionException e) {
+      throw new IllegalStateException("Execution exception when attempting to 
cache choice", e);
+    }
+  }
+
+  private synchronized LoadingCache<List<String>,WeightedRandomCollection> 
getCache(
+      VolumeChooserEnvironment env) {
+
+    if (choiceCache == null) {
+      ServerConfigurationFactory scf = loadConfFactory(env);
+      AccumuloConfiguration systemConfiguration = scf.getSystemConfiguration();
+      String propertyValue = 
systemConfiguration.get(HDFS_SPACE_RECOMPUTE_INTERVAL);
+
+      long computationCacheDuration = StringUtils.isNotBlank(propertyValue)
+          ? Long.parseLong(propertyValue)
+          : defaultComputationCacheDuration;
+
+      choiceCache = CacheBuilder.newBuilder()
+          .expireAfterWrite(computationCacheDuration, TimeUnit.MILLISECONDS)
+          .build(new CacheLoader<List<String>,WeightedRandomCollection>() {
+            public WeightedRandomCollection load(List<String> key) {
+              return new WeightedRandomCollection(key, env);
+            }
+          });
+    }
+
+    return choiceCache;
+  }
+
+  public class WeightedRandomCollection {
+    private final NavigableMap<Double,String> map = new 
TreeMap<Double,String>();
+    private final Random random;
+    private double total = 0;
+
+    public WeightedRandomCollection(List<String> options, 
VolumeChooserEnvironment env) {
+      this.random = new Random();
+
+      if (options.size() < 1) {
+        throw new IllegalStateException("Options was empty! No valid volumes 
to choose from.");
+      }
+
+      VolumeManager manager = env.getServerContext().getVolumeManager();
+
+      // Compute percentage space available on each volume
+      for (String option : options) {
+        FileSystem pathFs = manager.getVolumeByPath(new 
Path(option)).getFileSystem();
+        try {
+          FsStatus optionStatus = pathFs.getStatus();
+          double percentFree = ((double) optionStatus.getRemaining() / 
optionStatus.getCapacity());
+          add(percentFree, option);
+        } catch (IOException e) {
+          log.error("Unable to get file system status for" + option, e);
+        }
+      }
+
+      if (map.size() < 1) {
+        throw new IllegalStateException(
+            "Weighted options was empty! Could indicate an issue getting file 
system status or "
+                + "no free space on any volume");
+      }
+    }
+
+    public WeightedRandomCollection add(double weight, String result) {
+      if (weight <= 0) {
+        log.info("Weight was 0. Not adding " + result);
+        return this;
+      }
+      total += weight;
+      map.put(total, result);
+      return this;
+    }
+
+    public String next() {
+      double value = random.nextDouble() * total;
+      return map.higherEntry(value).getValue();
+    }
+  }
+}
diff --git 
a/server/base/src/test/java/org/apache/accumulo/server/fs/SpaceAwareVolumeChooserTest.java
 
b/server/base/src/test/java/org/apache/accumulo/server/fs/SpaceAwareVolumeChooserTest.java
new file mode 100644
index 0000000000..1b4f032744
--- /dev/null
+++ 
b/server/base/src/test/java/org/apache/accumulo/server/fs/SpaceAwareVolumeChooserTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.fs;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.Path;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
+public class SpaceAwareVolumeChooserTest {
+  VolumeManager volumeManager = null;
+  VolumeChooserEnvironment chooserEnv = null;
+  ServerContext serverContext = null;
+  ServerConfigurationFactory serverConfigurationFactory = null;
+  AccumuloConfiguration sysConfig = null;
+  Volume vol1 = null;
+  Volume vol2 = null;
+  FileSystem fs1 = null;
+  FileSystem fs2 = null;
+  FsStatus status1 = null;
+  FsStatus status2 = null;
+
+  int iterations = 1000;
+
+  String volumeOne = "hdfs://nn1:8020/apps/accumulo1/tables";
+  String volumeTwo = "hdfs://nn2:8020/applications/accumulo/tables";
+
+  // Different volumes with different paths
+  String[] tableDirs = {volumeOne, volumeTwo};
+
+  int vol1Count = 0;
+  int vol2Count = 0;
+
+  @Before
+  public void beforeTest() {
+    volumeManager = EasyMock.createMock(VolumeManager.class);
+    serverContext = EasyMock.createMock(ServerContext.class);
+    serverConfigurationFactory = 
EasyMock.createMock(ServerConfigurationFactory.class);
+    sysConfig = EasyMock.createMock(AccumuloConfiguration.class);
+    vol1 = EasyMock.createMock(Volume.class);
+    vol2 = EasyMock.createMock(Volume.class);
+    fs1 = EasyMock.createMock(FileSystem.class);
+    fs2 = EasyMock.createMock(FileSystem.class);
+    status1 = EasyMock.createMock(FsStatus.class);
+    status2 = EasyMock.createMock(FsStatus.class);
+    chooserEnv = new 
VolumeChooserEnvironment(VolumeChooserEnvironment.ChooserScope.DEFAULT,
+        serverContext);
+
+  }
+
+  private void testSpecificSetup(long percentage1, long percentage2, String 
cacheDuration,
+      int timesToCallPreferredVolumeChooser, boolean anyTimes) throws 
IOException {
+    int max = iterations + 1;
+    int min = 1;
+    int updatePropertyMax = timesToCallPreferredVolumeChooser + iterations;
+    if (anyTimes) {
+      max = iterations + 1;
+      updatePropertyMax = max + 1;
+    }
+    // Volume 1 is percentage1 full
+    EasyMock.expect(status1.getRemaining()).andReturn(percentage1).times(min, 
max);
+    EasyMock.expect(status1.getCapacity()).andReturn(100L).times(min, max);
+
+    // Volume 2 is percentage2 full
+    EasyMock.expect(status2.getRemaining()).andReturn(percentage2).times(min, 
max);
+    EasyMock.expect(status2.getCapacity()).andReturn(100L).times(min, max);
+
+    
EasyMock.expect(sysConfig.get(SpaceAwareVolumeChooser.HDFS_SPACE_RECOMPUTE_INTERVAL))
+        .andReturn(cacheDuration).times(1);
+    EasyMock
+        .expect(sysConfig.get(PreferredVolumeChooser
+            
.getPropertyNameForScope(VolumeChooserEnvironment.ChooserScope.DEFAULT)))
+        .andReturn(String.join(",", 
tableDirs)).times(timesToCallPreferredVolumeChooser);
+
+    
EasyMock.expect(serverContext.getVolumeManager()).andReturn(volumeManager).times(min,
+        Math.max(max, updatePropertyMax));
+    
EasyMock.expect(serverContext.getServerConfFactory()).andReturn(serverConfigurationFactory)
+        .times(min, updatePropertyMax);
+    
EasyMock.expect(serverConfigurationFactory.getSystemConfiguration()).andReturn(sysConfig)
+        .times(1, updatePropertyMax);
+
+    EasyMock.expect(volumeManager.getVolumeByPath(new 
Path(volumeOne))).andReturn(vol1).times(min,
+        max);
+    EasyMock.expect(volumeManager.getVolumeByPath(new 
Path(volumeTwo))).andReturn(vol2).times(min,
+        max);
+    EasyMock.expect(vol1.getFileSystem()).andReturn(fs1).times(min, max);
+    EasyMock.expect(vol2.getFileSystem()).andReturn(fs2).times(min, max);
+    EasyMock.expect(fs1.getStatus()).andReturn(status1).times(min, max);
+    EasyMock.expect(fs2.getStatus()).andReturn(status2).times(min, max);
+
+    EasyMock.replay(serverContext, vol1, vol2, fs1, fs2, status1, status2, 
volumeManager,
+        serverConfigurationFactory, sysConfig);
+  }
+
+  @After
+  public void afterTest() {
+
+    EasyMock.verify(serverContext, vol1, vol2, fs1, fs2, status1, status2, 
volumeManager,
+        serverConfigurationFactory, sysConfig);
+
+    volumeManager = null;
+    serverContext = null;
+    vol1 = null;
+    vol2 = null;
+    fs1 = null;
+    fs2 = null;
+    status1 = null;
+    status2 = null;
+    vol1Count = 0;
+    vol2Count = 0;
+  }
+
+  @Test
+  public void testEvenWeightsWithCaching() throws IOException {
+
+    testSpecificSetup(10L, 10L, null, iterations, false);
+
+    makeChoices();
+
+    assertEquals(iterations / 2, vol1Count, iterations / 10);
+    assertEquals(iterations / 2, vol2Count, iterations / 10);
+
+  }
+
+  @Test
+  public void testEvenWeightsNoCaching() throws IOException {
+
+    testSpecificSetup(10L, 10L, "0", iterations, true);
+
+    makeChoices();
+
+    assertEquals(iterations / 2, vol1Count, iterations / 10);
+    assertEquals(iterations / 2, vol2Count, iterations / 10);
+
+  }
+
+  @Test(expected = UncheckedExecutionException.class)
+  public void testNoFreeSpace() throws IOException {
+
+    testSpecificSetup(0L, 0L, null, 1, false);
+
+    makeChoices();
+  }
+
+  @Test
+  public void testNinetyTen() throws IOException {
+
+    testSpecificSetup(90L, 10L, null, iterations, false);
+
+    makeChoices();
+
+    assertEquals(iterations * .9, vol1Count, iterations / 10);
+    assertEquals(iterations * .1, vol2Count, iterations / 10);
+
+  }
+
+  @Test
+  public void testTenNinety() throws IOException {
+
+    testSpecificSetup(10L, 90L, null, iterations, false);
+
+    makeChoices();
+
+    assertEquals(iterations * .1, vol1Count, iterations / 10);
+    assertEquals(iterations * .9, vol2Count, iterations / 10);
+
+  }
+
+  @Test
+  public void testWithNoCaching() throws IOException {
+
+    testSpecificSetup(10L, 90L, "0", iterations, true);
+
+    makeChoices();
+
+    assertEquals(iterations * .1, vol1Count, iterations / 10);
+    assertEquals(iterations * .9, vol2Count, iterations / 10);
+
+  }
+
+  private void makeChoices() {
+    SpaceAwareVolumeChooser chooser = new SpaceAwareVolumeChooser();
+    for (int i = 0; i < iterations; i++) {
+      String choice = chooser.choose(chooserEnv, tableDirs);
+      if (choice.equals(volumeOne)) {
+        vol1Count += 1;
+      }
+
+      if (choice.equals(volumeTwo)) {
+        vol2Count += 1;
+      }
+    }
+
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to