keith-turner closed pull request #300: ACCUMULO-4708 Limit RFile block size to
2GB
URL: https://github.com/apache/accumulo/pull/300
This is a PR merged from a forked repository.
Because this is a foreign pull request (from a fork), GitHub hides the
original diff on merge, so the diff is reproduced below for the sake of
provenance:
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ConfigSanityCheck.java b/core/src/main/java/org/apache/accumulo/core/conf/ConfigSanityCheck.java
index 130863c961..9c2ed6c377 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/ConfigSanityCheck.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/ConfigSanityCheck.java
@@ -21,6 +21,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
/**
* A utility class for validating {@link AccumuloConfiguration} instances.
*/
@@ -66,6 +68,13 @@ else if (!prop.getType().isValidFormat(value))
if (key.equals(Property.INSTANCE_VOLUMES.getKey())) {
usingVolumes = value != null && !value.isEmpty();
}
+
+ // If the block size or block size index is configured to be too large, we throw an exception to avoid potentially corrupting RFiles later
+ if (key.equals(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX.getKey())
|| key.equals(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey())) {
+ long bsize = ConfigurationTypeHelper.getFixedMemoryAsBytes(value);
+ Preconditions.checkArgument(bsize > 0 && bsize < Integer.MAX_VALUE,
key + " must be greater than 0 and less than " + Integer.MAX_VALUE + " but was:
"
+ + bsize);
+ }
}
if (instanceZkTimeoutValue != null) {
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index c1931daed3..c399a22da8 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@ -404,7 +404,7 @@ public void flushIfNeeded() throws IOException {
private SampleLocalityGroupWriter sample;
private SummaryStatistics keyLenStats = new SummaryStatistics();
- private double avergageKeySize = 0;
+ private double averageKeySize = 0;
LocalityGroupWriter(BlockFileWriter fileWriter, long blockSize, long
maxBlockSize, LocalityGroupMetadata currentLocalityGroup,
SampleLocalityGroupWriter sample) {
@@ -441,19 +441,27 @@ public void append(Key key, Value value) throws
IOException {
} else if (blockWriter.getRawSize() > blockSize) {
// Look for a key that's short to put in the index, defining short as average or below.
- if (avergageKeySize == 0) {
+ if (averageKeySize == 0) {
// use the same average for the search for a below average key for a block
- avergageKeySize = keyLenStats.getMean();
+ averageKeySize = keyLenStats.getMean();
}
// Possibly produce a shorter key that does not exist in data. Even if a key can be shortened, it may not be below average.
Key closeKey = KeyShortener.shorten(prevKey, key);
- if ((closeKey.getSize() <= avergageKeySize || blockWriter.getRawSize()
> maxBlockSize) && !isGiantKey(closeKey)) {
+ if ((closeKey.getSize() <= averageKeySize || blockWriter.getRawSize()
> maxBlockSize) && !isGiantKey(closeKey)) {
closeBlock(closeKey, false);
blockWriter = fileWriter.prepareDataBlock();
// set average to zero so it's recomputed for the next block
- avergageKeySize = 0;
+ averageKeySize = 0;
+ // To constrain the growth of data blocks, we limit our worst case scenarios to closing
+ // blocks if they reach the maximum configurable block size of Integer.MAX_VALUE.
+ // 128 bytes added for metadata overhead
+ } else if (((long) key.getSize() + (long) value.getSize() +
blockWriter.getRawSize() + 128L) >= Integer.MAX_VALUE) {
+ closeBlock(closeKey, false);
+ blockWriter = fileWriter.prepareDataBlock();
+ averageKeySize = 0;
+
}
}
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index 4d1af7ef7d..195da93122 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@ -38,6 +38,8 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import com.google.common.base.Preconditions;
+
public class RFileOperations extends FileOperations {
private static final Collection<ByteSequence> EMPTY_CF_SET =
Collections.emptySet();
@@ -82,7 +84,11 @@ protected FileSKVWriter openWriter(OpenWriterOperation
options) throws IOExcepti
AccumuloConfiguration acuconf = options.getTableConfiguration();
long blockSize =
acuconf.getAsBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE);
+ Preconditions.checkArgument((blockSize < Integer.MAX_VALUE && blockSize >
0), "table.file.compress.blocksize must be greater than 0 and less than "
+ + Integer.MAX_VALUE);
long indexBlockSize =
acuconf.getAsBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX);
+ Preconditions.checkArgument((indexBlockSize < Integer.MAX_VALUE &&
indexBlockSize > 0),
+ "table.file.compress.blocksize.index must be greater than 0 and less
than " + Integer.MAX_VALUE);
SamplerConfigurationImpl samplerConfig =
SamplerConfigurationImpl.newSamplerConfig(acuconf);
Sampler sampler = null;
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
index 5cfe8242ea..a1696195d7 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
@@ -283,12 +283,13 @@ public void finish() throws IOException {
/**
* Get the raw size of the block.
*
+ * Caution: size() comes from DataOutputStream which returns Integer.MAX_VALUE on an overflow. This results in a value of 2GiB meaning that
+ * an unknown amount of data, at least 2GiB large, has been written. RFiles handle this issue by keeping track of the position of blocks
+ * instead of relying on blocks to provide this information.
+ *
* @return the number of uncompressed bytes written through the BlockAppender so far.
*/
public long getRawSize() throws IOException {
- /**
- * Expecting the size() of a block not exceeding 4GB. Assuming the size() will wrap to negative integer if it exceeds 2GB.
- */
return size() & 0x00000000ffffffffL;
}
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModule.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModule.java
index 44531dc416..c1962f78f8 100644
--- a/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModule.java
+++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModule.java
@@ -28,6 +28,7 @@
* Classes that obey this interface may be used to provide encrypting and decrypting streams to the rest of Accumulo. Classes that obey this interface may be
* configured as the crypto module by setting the property crypto.module.class in the accumulo-site.xml file.
*
+ * When implementing CryptoModule, it is recommended to use {@link RFileCipherOutputStream} in place of any {@link javax.crypto.CipherOutputStream}.
*
*/
public interface CryptoModule {
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java
index 7609bb0f63..c5c41cda68 100644
--- a/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java
+++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java
@@ -30,7 +30,6 @@
import javax.crypto.Cipher;
import javax.crypto.CipherInputStream;
-import javax.crypto.CipherOutputStream;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;
@@ -251,7 +250,7 @@ public CryptoModuleParameters
getEncryptingOutputStream(CryptoModuleParameters p
throw new RuntimeException("Encryption cipher must be a block cipher");
}
- CipherOutputStream cipherOutputStream = new
CipherOutputStream(params.getPlaintextOutputStream(), cipher);
+ RFileCipherOutputStream cipherOutputStream = new
RFileCipherOutputStream(params.getPlaintextOutputStream(), cipher);
BlockedOutputStream blockedOutputStream = new
BlockedOutputStream(cipherOutputStream, cipher.getBlockSize(),
params.getBlockStreamSize());
params.setEncryptedOutputStream(blockedOutputStream);
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/RFileCipherOutputStream.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/RFileCipherOutputStream.java
new file mode 100644
index 0000000000..7dad8023e6
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/RFileCipherOutputStream.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.security.crypto;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import javax.crypto.Cipher;
+import javax.crypto.CipherOutputStream;
+
+/**
+ *
+ * This class extends {@link CipherOutputStream} to include a way to track the number of bytes that have
+ * been encrypted by the stream. The write method also includes a mechanism to stop writing and
+ * throw an exception if an attempt is made to exceed a maximum number of bytes.
+ *
+ */
+public class RFileCipherOutputStream extends CipherOutputStream {
+
+ // This is the maximum size encrypted stream that can be written. Attempting to write anything larger
+ // will cause an exception. Given that each block in an rfile is encrypted separately, and blocks
+ // should be written such that a block cannot ever reach 16GiB, this is believed to be a safe number.
+ // If this does cause an exception, it is an issue best addressed elsewhere.
+ private final long maxOutputSize = 1L << 34; //16GiB
+
+ // The total number of bytes that have been written out
+ private long count = 0;
+
+ /**
+ *
+ * Constructs a RFileCipherOutputStream
+ *
+ * @param os
+ * the OutputStream object
+ * @param c
+ * an initialized Cipher object
+ */
+ public RFileCipherOutputStream(OutputStream os, Cipher c) {
+ super(os, c);
+ }
+
+ /**
+ * Override of CipherOutputStream's write to count the number of bytes that have been encrypted.
+ * This method now throws an exception if an attempt to write bytes beyond a maximum is made.
+ */
+ @Override
+ public void write(byte b[], int off, int len) throws IOException {
+ count += len;
+ if (count > maxOutputSize) {
+ throw new IOException("Attempt to write " + count + " bytes was made. A
maximum of " + maxOutputSize + " is allowed for an encryption stream.");
+ }
+ super.write(b, off, len);
+ }
+
+ @Override
+ public void write(byte b[]) throws IOException {
+ write(b, 0, b.length);
+ }
+
+ /**
+ * Override of CipherOutputStream's write for a single byte to count it. This method now throws
+ * an exception if an attempt to write bytes beyond a maximum is made.
+ */
+ @Override
+ public void write(int b) throws IOException {
+ count++;
+ if (count > maxOutputSize) {
+ throw new IOException("Attempt to write " + count + " bytes was made. A
maximum of " + maxOutputSize + " is allowed for an encryption stream.");
+ }
+ super.write(b);
+ }
+}
diff --git a/pom.xml b/pom.xml
index 0d8782d61b..378cdd6778 100644
--- a/pom.xml
+++ b/pom.xml
@@ -152,6 +152,7 @@
<surefire.groups />
<!-- Thrift version -->
<thrift.version>0.10.0</thrift.version>
+ <unitTestMemSize>-Xmx1G</unitTestMemSize>
<!-- ZooKeeper version -->
<zookeeper.version>3.4.10</zookeeper.version>
</properties>
@@ -869,7 +870,7 @@
<systemPropertyVariables>
<java.io.tmpdir>${project.build.directory}</java.io.tmpdir>
</systemPropertyVariables>
- <argLine>-Xmx1G</argLine>
+ <argLine>${unitTestMemSize}</argLine>
</configuration>
</plugin>
<plugin>
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services