https://issues.apache.org/jira/browse/AMQ-5963
Disk limits can now be specified as a percentage of the partition size that the store is located on. The usual checks of the max size exceeding the available space will still apply. When using a percentage, the store can always regrow itself if space becomes available. https://issues.apache.org/jira/browse/AMQ-5964 https://issues.apache.org/jira/browse/AMQ-5965 https://issues.apache.org/jira/browse/AMQ-5969 Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4cddd2c0 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4cddd2c0 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4cddd2c0 Branch: refs/heads/master Commit: 4cddd2c015435732b3ff5503160fa87b37154e2b Parents: 32ca1f5 Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Authored: Fri Sep 11 13:50:35 2015 +0000 Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Committed: Tue Sep 15 14:50:18 2015 +0000 ---------------------------------------------------------------------- .../apache/activemq/broker/BrokerService.java | 122 +- .../activemq/broker/BrokerService.java.orig | 3147 ++++++++++++++++++ .../activemq/usage/PercentLimitUsage.java | 53 + .../org/apache/activemq/usage/StoreUsage.java | 31 +- .../org/apache/activemq/usage/TempUsage.java | 34 +- .../org/apache/activemq/util/StoreUtil.java | 42 + .../usage/PercentDiskUsageLimitTest.java | 144 + .../usage/PeriodicDiskUsageLimitTest.java | 154 +- 8 files changed, 3665 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/4cddd2c0/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java index 8ad56a7..4a27fd0 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -111,7 +111,9 @@ import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transport.TransportFactorySupport; import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.vm.VMTransportFactory; +import org.apache.activemq.usage.StoreUsage; import org.apache.activemq.usage.SystemUsage; +import org.apache.activemq.usage.Usage; import org.apache.activemq.util.BrokerSupport; import org.apache.activemq.util.DefaultIOExceptionHandler; import org.apache.activemq.util.IOExceptionHandler; @@ -119,6 +121,7 @@ import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOHelper; import org.apache.activemq.util.InetAddressUtil; import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.util.StoreUtil; import org.apache.activemq.util.ThreadPoolUtils; import org.apache.activemq.util.TimeUtils; import org.apache.activemq.util.URISupport; @@ -230,6 +233,7 @@ public class BrokerService implements Service { private int schedulePeriodForDestinationPurge= 0; private int maxPurgedDestinationsPerSweep = 0; private int schedulePeriodForDiskUsageCheck = 0; + private int diskUsageCheckRegrowThreshold = -1; private BrokerContext brokerContext; private boolean networkConnectorStartAsync = false; private boolean allowTempAutoCreationOnSend; @@ -1965,30 +1969,7 @@ public class BrokerService implements Service { if (getPersistenceAdapter() != null) { PersistenceAdapter adapter = getPersistenceAdapter(); - File dir = adapter.getDirectory(); - - if (dir != null) { - String dirPath = dir.getAbsolutePath(); - if (!dir.isAbsolute()) { - dir = new File(dirPath); - } - - while (dir != null && !dir.isDirectory()) { - dir = dir.getParentFile(); - } - long storeLimit = usage.getStoreUsage().getLimit(); - long storeCurrent = usage.getStoreUsage().getUsage(); - long dirFreeSpace = dir.getUsableSpace(); - if (storeLimit > (dirFreeSpace + storeCurrent)) { - LOG.warn("Store limit is " + storeLimit / (1024 * 1024) + - " mb (current store usage is " + storeCurrent / (1024 * 1024) + - " mb). The data directory: " + dir.getAbsolutePath() + - " only has " + dirFreeSpace / (1024 * 1024) + - " mb of usable space - resetting to maximum available disk space: " + - (dirFreeSpace + storeCurrent) / (1024 * 1024) + " mb"); - usage.getStoreUsage().setLimit(dirFreeSpace + storeCurrent); - } - } + checkUsageLimit(adapter.getDirectory(), usage.getStoreUsage(), usage.getStoreUsage().getPercentLimit()); long maxJournalFileSize = 0; long storeLimit = usage.getStoreUsage().getLimit(); @@ -2015,28 +1996,9 @@ public class BrokerService implements Service { final SystemUsage usage = getSystemUsage(); File tmpDir = getTmpDataDirectory(); - if (tmpDir != null) { - String tmpDirPath = tmpDir.getAbsolutePath(); - if (!tmpDir.isAbsolute()) { - tmpDir = new File(tmpDirPath); - } - - long storeLimit = usage.getTempUsage().getLimit(); - long storeCurrent = usage.getTempUsage().getUsage(); - while (tmpDir != null && !tmpDir.isDirectory()) { - tmpDir = tmpDir.getParentFile(); - } - long dirFreeSpace = tmpDir.getUsableSpace(); - if (storeLimit > (dirFreeSpace + storeCurrent)) { - LOG.warn("Temporary Store limit is " + storeLimit / (1024 * 1024) + - " mb (current temporary store usage is " + storeCurrent / (1024 * 1024) + - " mb). The temporary data directory: " + tmpDir.getAbsolutePath() + - " only has " + dirFreeSpace / (1024 * 1024) + - " mb of usable space - resetting to maximum available disk space: " + - (dirFreeSpace + storeCurrent) / (1024 * 1024) + " mb"); - usage.getTempUsage().setLimit(dirFreeSpace + storeCurrent); - } + if (tmpDir != null) { + checkUsageLimit(tmpDir, usage.getTempUsage(), usage.getTempUsage().getPercentLimit()); if (isPersistent()) { long maxJournalFileSize; @@ -2047,6 +2009,7 @@ public class BrokerService implements Service { } else { maxJournalFileSize = DEFAULT_MAX_FILE_LENGTH; } + long storeLimit = usage.getTempUsage().getLimit(); if (storeLimit < maxJournalFileSize) { LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) + @@ -2058,6 +2021,60 @@ public class BrokerService implements Service { } } + protected void checkUsageLimit(File dir, Usage<?> storeUsage, int percentLimit) { + if (dir != null) { + dir = StoreUtil.findParentDirectory(dir); + String storeName = storeUsage instanceof StoreUsage ? "Store" : "Temporary Store"; + long storeLimit = storeUsage.getLimit(); + long storeCurrent = storeUsage.getUsage(); + long usableSpace = dir.getUsableSpace(); + long totalSpace = dir.getTotalSpace(); + long totalUsableSpace = dir.getUsableSpace() + storeCurrent; + //compute byte value of the percent limit + long bytePercentLimit = totalSpace * percentLimit / 100; + int oneMeg = 1024 * 1024; + + //Check if the store limit is less than the percent Limit that was set and also + //the usable space...this means we can grow the store larger + //Changes in partition size (total space) as well as changes in usable space should + //be detected here + if (diskUsageCheckRegrowThreshold > -1 && percentLimit > 0 + && storeLimit < bytePercentLimit && storeLimit < totalUsableSpace){ + + // set the limit to be bytePercentLimit or usableSpace if + // usableSpace is less than the percentLimit + long newLimit = bytePercentLimit > totalUsableSpace ? totalUsableSpace : bytePercentLimit; + + //To prevent changing too often, check threshold + if (newLimit - storeLimit >= diskUsageCheckRegrowThreshold) { + LOG.info("Usable disk space has been increased, attempting to regrow " + storeName + " limit to " + + percentLimit + "% of the partition size."); + storeUsage.setLimit(newLimit); + LOG.info(storeName + " limit has been increased to " + newLimit * 100 / totalSpace + + "% (" + newLimit / oneMeg + " mb) of the partition size."); + } + + //check if the limit is too large for the amount of usable space + } else if (storeLimit > totalUsableSpace) { + if (percentLimit > 0) { + LOG.warn(storeName + " limit has been set to " + + percentLimit + "% (" + storeLimit / oneMeg + " mb)" + + " of the partition size but there is not enough usable space." + + " Only " + totalUsableSpace * 100 / totalSpace + "% (" + totalUsableSpace / oneMeg + " mb)" + + " is available"); + } + + LOG.warn(storeName + " limit is " + storeLimit / oneMeg + + " mb (current store usage is " + storeCurrent / oneMeg + + " mb). The data directory: " + dir.getAbsolutePath() + + " only has " + usableSpace / oneMeg + + " mb of usable space - resetting to maximum available disk space: " + + totalUsableSpace / oneMeg + " mb"); + storeUsage.setLimit(totalUsableSpace); + } + } + } + /** * Schedules a periodic task based on schedulePeriodForDiskLimitCheck to * update store and temporary store limits if the amount of available space @@ -2892,11 +2909,26 @@ public class BrokerService implements Service { this.schedulePeriodForDestinationPurge = schedulePeriodForDestinationPurge; } + /** + * @param schedulePeriodForDiskUsageCheck + */ public void setSchedulePeriodForDiskUsageCheck( int schedulePeriodForDiskUsageCheck) { this.schedulePeriodForDiskUsageCheck = schedulePeriodForDiskUsageCheck; } + public int getDiskUsageCheckRegrowThreshold() { + return diskUsageCheckRegrowThreshold; + } + + /** + * @param diskUsageCheckRegrowThreshold + * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" + */ + public void setDiskUsageCheckRegrowThreshold(int diskUsageCheckRegrowThreshold) { + this.diskUsageCheckRegrowThreshold = diskUsageCheckRegrowThreshold; + } + public int getMaxPurgedDestinationsPerSweep() { return this.maxPurgedDestinationsPerSweep; }