Author: ctubbsii Date: Wed Dec 12 23:46:48 2012 New Revision: 1421044 URL: http://svn.apache.org/viewvc?rev=1421044&view=rev Log: ACCUMULO-467 Change the behavior of AccumuloFileOutputFormat to carry Accumulo properties in an AccumuloConfiguration object, to remove the side-effect behavior of RFileOperations permitting Hadoop configuration to override AccumuloConfiguration in all cases. ACCUMULO-769 The new methods that were added were done so in a way that is consistent with Hadoop's context-oriented MapReduce framework.
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java?rev=1421044&r1=1421043&r2=1421044&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java Wed Dec 12 23:46:48 2012 @@ -17,88 +17,232 @@ package org.apache.accumulo.core.client.mapreduce; import java.io.IOException; +import java.util.Map.Entry; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.ZooKeeperInstance; import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVWriter; -import org.apache.accumulo.core.file.rfile.RFile; import org.apache.accumulo.core.util.ArgumentChecker; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** - * This class allows MapReduce jobs to use the Accumulo data file format for output of data + * This class allows MapReduce jobs to write output in the Accumulo data file format.<br /> + * Care should be taken to write only sorted data (sorted by {@link Key}), as this is an important requirement of Accumulo data files. * - * The user must specify the output path that does not exist following via static method calls to this class: - * - * AccumuloFileOutputFormat.setOutputPath(job, outputDirectory) - * - * Other methods from FileOutputFormat to configure options are ignored Compression is using the DefaultCodec and is always on + * <p> + * The output path to be created must be specified via {@link AccumuloFileOutputFormat#setOutputPath(Job, Path)}. This is inherited from + * {@link FileOutputFormat#setOutputPath(Job, Path)}. Other methods from {@link FileOutputFormat} are not supported and may be ignored or cause failures. Using + * other Hadoop configuration options that affect the behavior of the underlying files directly in the Job's configuration may work, but are not directly + * supported at this time. */ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> { - private static final String PREFIX = AccumuloOutputFormat.class.getSimpleName(); - public static final String FILE_TYPE = PREFIX + ".file_type"; + private static final String PREFIX = AccumuloOutputFormat.class.getSimpleName() + "."; + private static final String ACCUMULO_PROPERTY_PREFIX = PREFIX + "accumuloProperties."; + + /** + * This helper method provides an AccumuloConfiguration object constructed from the Accumulo defaults, and overridden with Accumulo properties that have been + * stored in the Job's configuration + * + * @since 1.5.0 + */ + protected static AccumuloConfiguration getAccumuloConfiguration(JobContext context) { + ConfigurationCopy acuConf = new ConfigurationCopy(AccumuloConfiguration.getDefaultConfiguration()); + for (Entry<String,String> entry : context.getConfiguration()) + if (entry.getKey().startsWith(ACCUMULO_PROPERTY_PREFIX)) + acuConf.set(Property.getPropertyByKey(entry.getKey().substring(ACCUMULO_PROPERTY_PREFIX.length())), entry.getValue()); + return acuConf; + } - private static final String INSTANCE_HAS_BEEN_SET = PREFIX + ".instanceConfigured"; - private static final String INSTANCE_NAME = PREFIX + ".instanceName"; - private static final String ZOOKEEPERS = PREFIX + ".zooKeepers"; + /** + * The supported Accumulo properties we set in this OutputFormat, that change the behavior of the RecordWriter.<br /> + * These properties correspond to the supported public static setter methods available to this class. + * + * @since 1.5.0 + */ + protected static boolean isSupportedAccumuloProperty(Property property) { + switch (property) { + case TABLE_FILE_COMPRESSION_TYPE: + case TABLE_FILE_COMPRESSED_BLOCK_SIZE: + case TABLE_FILE_BLOCK_SIZE: + case TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX: + case TABLE_FILE_REPLICATION: + return true; + default: + return false; + } + } + + /** + * Helper for transforming Accumulo configuration properties into something that can be stored safely inside the Hadoop Job configuration. + * + * @since 1.5.0 + */ + protected static <T> void setAccumuloProperty(Job job, Property property, T value) { + if (isSupportedAccumuloProperty(property)) { + String val = String.valueOf(value); + if (property.getType().isValidFormat(val)) + job.getConfiguration().set(ACCUMULO_PROPERTY_PREFIX + property.getKey(), val); + else + throw new IllegalArgumentException("Value is not appropriate for property type '" + property.getType() + "'"); + } else + throw new IllegalArgumentException("Unsupported configuration property " + property.getKey()); + } + + /** + * @param compressionType + * The type of compression to use. One of "none", "gz", "lzo", or "snappy". Specifying a compression may require additional libraries to be available + * to your Job. + * @since 1.5.0 + */ + public static void setCompressionType(Job job, String compressionType) { + setAccumuloProperty(job, Property.TABLE_FILE_COMPRESSION_TYPE, compressionType); + } + + /** + * Sets the size for data blocks within each file.<br /> + * Data blocks are a span of key/value pairs stored in the file that are compressed and indexed as a group. + * + * <p> + * Making this value smaller may increase seek performance, but at the cost of increasing the size of the indexes (which can also affect seek performance). + * + * @since 1.5.0 + */ + public static void setDataBlockSize(Job job, long dataBlockSize) { + setAccumuloProperty(job, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE, dataBlockSize); + } + + /** + * Sets the size for file blocks in the file system; file blocks are managed, and replicated, by the underlying file system + * + * @since 1.5.0 + */ + public static void setFileBlockSize(Job job, long fileBlockSize) { + setAccumuloProperty(job, Property.TABLE_FILE_BLOCK_SIZE, fileBlockSize); + } + + /** + * Sets the size for index blocks within each file; smaller blocks means a deeper index hierarchy within the file, while larger blocks mean a more shallow + * index hierarchy within the file. This can affect the performance of queries. + * + * @since 1.5.0 + */ + public static void setIndexBlockSize(Job job, long indexBlockSize) { + setAccumuloProperty(job, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX, indexBlockSize); + } + + /** + * Sets the file system replication factor for the resulting file, overriding the file system default. + * + * @since 1.5.0 + */ + public static void setReplication(Job job, int replication) { + setAccumuloProperty(job, Property.TABLE_FILE_REPLICATION, replication); + } @Override - public RecordWriter<Key,Value> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { + public RecordWriter<Key,Value> getRecordWriter(TaskAttemptContext context) throws IOException { // get the path of the temporary output file - final Configuration conf = job.getConfiguration(); + final Configuration conf = context.getConfiguration(); + final AccumuloConfiguration acuConf = getAccumuloConfiguration(context); - String extension = conf.get(FILE_TYPE); - if (extension == null || extension.isEmpty()) - extension = RFile.EXTENSION; - - final Path file = this.getDefaultWorkFile(job, "." + extension); + final String extension = acuConf.get(Property.TABLE_FILE_TYPE); + final Path file = this.getDefaultWorkFile(context, "." + extension); return new RecordWriter<Key,Value>() { FileSKVWriter out = null; @Override + public void close(TaskAttemptContext context) throws IOException { + if (out != null) + out.close(); + } + + @Override public void write(Key key, Value value) throws IOException { if (out == null) { - out = FileOperations.getInstance().openWriter(file.toString(), file.getFileSystem(conf), conf, AccumuloConfiguration.getDefaultConfiguration()); + out = FileOperations.getInstance().openWriter(file.toString(), file.getFileSystem(conf), conf, acuConf); out.startDefaultLocalityGroup(); } out.append(key, value); } - - @Override - public void close(TaskAttemptContext context) throws IOException, InterruptedException { - if (out != null) - out.close(); - } }; } - public static void setFileType(Configuration conf, String type) { - conf.set(FILE_TYPE, type); + // ---------------------------------------------------------------------------------------------------- + // Everything below this line is deprecated and should go away in future versions + // ---------------------------------------------------------------------------------------------------- + + /** + * @deprecated since 1.5.0; + */ + @SuppressWarnings("unused") + @Deprecated + private static final String FILE_TYPE = PREFIX + "file_type"; + + /** + * @deprecated since 1.5.0; + */ + @SuppressWarnings("unused") + @Deprecated + private static final String BLOCK_SIZE = PREFIX + "block_size"; + + /** + * @deprecated since 1.5.0; + */ + @Deprecated + private static final String INSTANCE_HAS_BEEN_SET = PREFIX + "instanceConfigured"; + + /** + * @deprecated since 1.5.0; + */ + @Deprecated + private static final String INSTANCE_NAME = PREFIX + "instanceName"; + + /** + * @deprecated since 1.5.0; + */ + @Deprecated + private static final String ZOOKEEPERS = PREFIX + "zooKeepers"; + + /** + * @deprecated since 1.5.0; Retrieve the relevant block size from {@link #getAccumuloConfiguration(JobContext)} + */ + @Deprecated + protected static void handleBlockSize(Configuration conf) { + conf.setInt("io.seqfile.compress.blocksize", + (int) AccumuloConfiguration.getDefaultConfiguration().getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE)); } /** - * @deprecated since 1.5, use {@link #setCompressedBlockSize(Configuration, long)} instead + * @deprecated since 1.5.0; This method does nothing. Only 'rf' type is supported. + */ + @Deprecated + public static void setFileType(Configuration conf, String type) {} + + /** + * @deprecated since 1.5.0; Use {@link #setFileBlockSize(Job, long)}, {@link #setDataBlockSize(Job, long)}, or {@link #setIndexBlockSize(Job, long)} instead. */ + @Deprecated public static void setBlockSize(Configuration conf, int blockSize) { - long bs = blockSize; - setCompressedBlockSize(conf, bs); + conf.set(ACCUMULO_PROPERTY_PREFIX + Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), String.valueOf(blockSize)); } /** - * @param conf - * @param instanceName - * @param zooKeepers + * @deprecated since 1.5.0; This OutputFormat does not communicate with Accumulo. If this is needed, subclasses must implement their own configuration. */ + @Deprecated public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) { if (conf.getBoolean(INSTANCE_HAS_BEEN_SET, false)) throw new IllegalStateException("Instance info can only be set once per job"); @@ -110,30 +254,11 @@ public class AccumuloFileOutputFormat ex } /** - * @param conf - * @return The Accumulo instance. + * @deprecated since 1.5.0; This OutputFormat does not communicate with Accumulo. If this is needed, subclasses must implement their own configuration. */ + @Deprecated protected static Instance getInstance(Configuration conf) { return new ZooKeeperInstance(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS)); } - public static void setReplication(Configuration conf, int replication) { - conf.setInt(Property.TABLE_FILE_REPLICATION.getKey(), replication); - } - - public static void setDFSBlockSize(Configuration conf, long blockSize) { - conf.setLong(Property.TABLE_FILE_BLOCK_SIZE.getKey(), blockSize); - } - - public static void setCompressedBlockSize(Configuration conf, long cblockSize) { - conf.setLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), cblockSize); - } - - public static void setCompressedBlockSizeIndex(Configuration conf, long cblockSizeIndex) { - conf.setLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX.getKey(), cblockSizeIndex); - } - - public static void setCompressionType(Configuration conf, String compression) { - conf.set(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), compression); - } } Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java?rev=1421044&r1=1421043&r2=1421044&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java Wed Dec 12 23:46:48 2012 @@ -104,23 +104,22 @@ public class RFileOperations extends Fil @Override public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException { int hrep = conf.getInt("dfs.replication", -1); - int trep = conf.getInt(Property.TABLE_FILE_REPLICATION.getKey(), acuconf.getCount(Property.TABLE_FILE_REPLICATION)); + int trep = acuconf.getCount(Property.TABLE_FILE_REPLICATION); int rep = hrep; if (trep > 0 && trep != hrep) { rep = trep; } long hblock = conf.getLong("dfs.block.size", 1 << 26); - long tblock = conf.getLong(Property.TABLE_FILE_BLOCK_SIZE.getKey(), acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE)); + long tblock = acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE); long block = hblock; if (tblock > 0) block = tblock; int bufferSize = conf.getInt("io.file.buffer.size", 4096); - long blockSize = conf.getLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE)); - long indexBlockSize = conf.getLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX.getKey(), - acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX)); + long blockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE); + long indexBlockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX); - String compression = conf.get(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE)); + String compression = acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE); CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(fs.create(new Path(file), false, bufferSize, (short) rep, block), compression, conf); Writer writer = new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize); Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java?rev=1421044&r1=1421043&r2=1421044&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java Wed Dec 12 23:46:48 2012 @@ -23,6 +23,7 @@ import java.lang.reflect.InvocationTarge import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MapContext; @@ -40,6 +41,7 @@ import org.apache.hadoop.mapreduce.TaskA */ public class ContextFactory { + private static final Constructor<?> JOB_CONSTRUCTOR; private static final Constructor<?> JOB_CONTEXT_CONSTRUCTOR; private static final Constructor<?> TASK_CONTEXT_CONSTRUCTOR; private static final Constructor<?> TASK_ID_CONSTRUCTOR; @@ -58,6 +60,7 @@ public class ContextFactory { v21 = false; } useV21 = v21; + Class<?> jobCls; Class<?> jobContextCls; Class<?> taskContextCls; Class<?> mapCls; @@ -65,6 +68,7 @@ public class ContextFactory { Class<?> innerMapContextCls; try { if (v21) { + jobCls = Class.forName(PACKAGE + ".Job"); jobContextCls = Class.forName(PACKAGE + ".task.JobContextImpl"); taskContextCls = Class.forName(PACKAGE + ".task.TaskAttemptContextImpl"); TASK_TYPE_CLASS = Class.forName(PACKAGE + ".TaskType"); @@ -72,6 +76,7 @@ public class ContextFactory { mapCls = Class.forName(PACKAGE + ".lib.map.WrappedMapper"); innerMapContextCls = Class.forName(PACKAGE + ".lib.map.WrappedMapper$Context"); } else { + jobCls = Class.forName(PACKAGE + ".Job"); jobContextCls = Class.forName(PACKAGE + ".JobContext"); taskContextCls = Class.forName(PACKAGE + ".TaskAttemptContext"); TASK_TYPE_CLASS = null; @@ -83,6 +88,7 @@ public class ContextFactory { throw new IllegalArgumentException("Can't find class", e); } try { + JOB_CONSTRUCTOR = jobCls.getConstructor(Configuration.class, String.class); JOB_CONTEXT_CONSTRUCTOR = jobContextCls.getConstructor(Configuration.class, JobID.class); JOB_CONTEXT_CONSTRUCTOR.setAccessible(true); TASK_CONTEXT_CONSTRUCTOR = taskContextCls.getConstructor(Configuration.class, TaskAttemptID.class); @@ -111,6 +117,22 @@ public class ContextFactory { } } + public static Job createJob() { + return createJob(new Configuration()); + } + + public static Job createJob(Configuration conf) { + try { + return (Job) JOB_CONSTRUCTOR.newInstance(conf, new JobID("local", 0).toString()); + } catch (InstantiationException e) { + throw new IllegalArgumentException("Can't create object", e); + } catch (IllegalAccessException e) { + throw new IllegalArgumentException("Can't create object", e); + } catch (InvocationTargetException e) { + throw new IllegalArgumentException("Can't create object", e); + } + } + public static JobContext createJobContext() { return createJobContext(new Configuration()); } @@ -152,16 +174,19 @@ public class ContextFactory { return createMapContext(m, tac, reader, writer, null, null, split); } - @SuppressWarnings({"unchecked", "rawtypes"}) public static <K1,V1,K2,V2> Mapper<K1,V1,K2,V2>.Context createMapContext(Mapper<K1,V1,K2,V2> m, TaskAttemptContext tac, RecordReader<K1,V1> reader, RecordWriter<K2,V2> writer, OutputCommitter committer, StatusReporter reporter, InputSplit split) { try { if (useV21) { Object basis = MAP_CONTEXT_IMPL_CONSTRUCTOR.newInstance(tac.getConfiguration(), tac.getTaskAttemptID(), reader, writer, committer, reporter, split); - return (Mapper.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance((Mapper<K1,V1,K2,V2>) MAP_CONSTRUCTOR.newInstance(), basis); + @SuppressWarnings("unchecked") + Mapper<K1,V1,K2,V2>.Context newInstance = (Mapper<K1,V1,K2,V2>.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance(MAP_CONSTRUCTOR.newInstance(), basis); + return newInstance; } else { - return (Mapper.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance(m, tac.getConfiguration(), tac.getTaskAttemptID(), reader, writer, committer, reporter, - split); + @SuppressWarnings("unchecked") + Mapper<K1,V1,K2,V2>.Context newInstance = (Mapper<K1,V1,K2,V2>.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance(m, tac.getConfiguration(), + tac.getTaskAttemptID(), reader, writer, committer, reporter, split); + return newInstance; } } catch (InstantiationException e) { throw new IllegalArgumentException("Can't create object", e); Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java?rev=1421044&r1=1421043&r2=1421044&view=diff ============================================================================== --- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java (original) +++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java Wed Dec 12 23:46:48 2012 @@ -27,9 +27,8 @@ import org.apache.accumulo.core.conf.Pro import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.util.ContextFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.junit.After; @@ -37,13 +36,13 @@ import org.junit.Before; import org.junit.Test; public class AccumuloFileOutputFormatTest { - static JobContext job; + static Job job; static TaskAttemptContext tac; static Path f = null; @Before public void setup() { - job = ContextFactory.createJobContext(); + job = ContextFactory.createJob(); Path file = new Path("target/"); f = new Path(file, "_temporary"); @@ -89,8 +88,6 @@ public class AccumuloFileOutputFormatTes @Test public void validateConfiguration() throws IOException, InterruptedException { - Configuration conf = job.getConfiguration(); - AccumuloConfiguration acuconf = AccumuloConfiguration.getDefaultConfiguration(); int a = 7; long b = 300l; @@ -98,17 +95,18 @@ public class AccumuloFileOutputFormatTes long d = 10l; String e = "type"; - AccumuloFileOutputFormat.setReplication(conf, a); - AccumuloFileOutputFormat.setDFSBlockSize(conf, b); - AccumuloFileOutputFormat.setCompressedBlockSize(conf, c); - AccumuloFileOutputFormat.setCompressedBlockSizeIndex(conf, d); - AccumuloFileOutputFormat.setCompressionType(conf, e); - - assertEquals(a, conf.getInt(Property.TABLE_FILE_REPLICATION.getKey(), acuconf.getCount(Property.TABLE_FILE_REPLICATION))); - assertEquals(b, conf.getLong(Property.TABLE_FILE_BLOCK_SIZE.getKey(), acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE))); - assertEquals(c, conf.getLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE))); - assertEquals(d, - conf.getLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX.getKey(), acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX))); - assertEquals(e, conf.get(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE))); + AccumuloFileOutputFormat.setReplication(job, a); + AccumuloFileOutputFormat.setFileBlockSize(job, b); + AccumuloFileOutputFormat.setDataBlockSize(job, c); + AccumuloFileOutputFormat.setIndexBlockSize(job, d); + AccumuloFileOutputFormat.setCompressionType(job, e); + + AccumuloConfiguration acuconf = AccumuloFileOutputFormat.getAccumuloConfiguration(job); + + assertEquals(a, acuconf.getCount(Property.TABLE_FILE_REPLICATION)); + assertEquals(b, acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE)); + assertEquals(c, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE)); + assertEquals(d, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX)); + assertEquals(e, acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE)); } }