Author: ctubbsii
Date: Wed Dec 12 23:46:48 2012
New Revision: 1421044

URL: http://svn.apache.org/viewvc?rev=1421044&view=rev
Log:
ACCUMULO-467 Change AccumuloFileOutputFormat to carry Accumulo properties in
an AccumuloConfiguration object, removing the side effect by which
RFileOperations allowed the Hadoop configuration to override
AccumuloConfiguration in all cases.
ACCUMULO-769 The new methods were added in a way that is consistent with
Hadoop's context-oriented MapReduce framework.
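
As a quick illustration of the new Job-based API this commit introduces, a
minimal sketch of a client job configuration (the job name, output path, and
sizes are illustrative, not part of this commit):

  Job job = new Job(new Configuration(), "bulk-ingest");  // hypothetical job name
  job.setOutputFormatClass(AccumuloFileOutputFormat.class);
  AccumuloFileOutputFormat.setOutputPath(job, new Path("/tmp/bulk-out"));  // hypothetical path
  AccumuloFileOutputFormat.setCompressionType(job, "gz");
  AccumuloFileOutputFormat.setDataBlockSize(job, 100 * 1024);  // 100KB data blocks
  AccumuloFileOutputFormat.setReplication(job, 3);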

Modified:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java
    accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java?rev=1421044&r1=1421043&r2=1421044&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java Wed Dec 12 23:46:48 2012
@@ -17,88 +17,232 @@
 package org.apache.accumulo.core.client.mapreduce;
 
 import java.io.IOException;
+import java.util.Map.Entry;
 
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVWriter;
-import org.apache.accumulo.core.file.rfile.RFile;
 import org.apache.accumulo.core.util.ArgumentChecker;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 /**
- * This class allows MapReduce jobs to use the Accumulo data file format for output of data
+ * This class allows MapReduce jobs to write output in the Accumulo data file format.<br />
+ * Care should be taken to write only sorted data (sorted by {@link Key}), as this is an important requirement of Accumulo data files.
  * 
- * The user must specify the output path that does not exist following via static method calls to this class:
- * 
- * AccumuloFileOutputFormat.setOutputPath(job, outputDirectory)
- * 
- * Other methods from FileOutputFormat to configure options are ignored Compression is using the DefaultCodec and is always on
+ * <p>
+ * The output path to be created must be specified via {@link AccumuloFileOutputFormat#setOutputPath(Job, Path)}. This is inherited from
+ * {@link FileOutputFormat#setOutputPath(Job, Path)}. Other methods from {@link FileOutputFormat} are not supported and may be ignored or cause failures. Using
+ * other Hadoop configuration options that affect the behavior of the underlying files directly in the Job's configuration may work, but are not directly
+ * supported at this time.
  */
 public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
-  private static final String PREFIX = AccumuloOutputFormat.class.getSimpleName();
-  public static final String FILE_TYPE = PREFIX + ".file_type";
+  private static final String PREFIX = AccumuloOutputFormat.class.getSimpleName() + ".";
+  private static final String ACCUMULO_PROPERTY_PREFIX = PREFIX + "accumuloProperties.";
+  
+  /**
+   * This helper method provides an AccumuloConfiguration object constructed from the Accumulo defaults, and overridden with Accumulo properties that have been
+   * stored in the Job's configuration
+   * 
+   * @since 1.5.0
+   */
+  protected static AccumuloConfiguration getAccumuloConfiguration(JobContext context) {
+    ConfigurationCopy acuConf = new ConfigurationCopy(AccumuloConfiguration.getDefaultConfiguration());
+    for (Entry<String,String> entry : context.getConfiguration())
+      if (entry.getKey().startsWith(ACCUMULO_PROPERTY_PREFIX))
+        acuConf.set(Property.getPropertyByKey(entry.getKey().substring(ACCUMULO_PROPERTY_PREFIX.length())), entry.getValue());
+    return acuConf;
+  }
   
-  private static final String INSTANCE_HAS_BEEN_SET = PREFIX + ".instanceConfigured";
-  private static final String INSTANCE_NAME = PREFIX + ".instanceName";
-  private static final String ZOOKEEPERS = PREFIX + ".zooKeepers";
+  /**
+   * The supported Accumulo properties we set in this OutputFormat, that change the behavior of the RecordWriter.<br />
+   * These properties correspond to the supported public static setter methods available to this class.
+   * 
+   * @since 1.5.0
+   */
+  protected static boolean isSupportedAccumuloProperty(Property property) {
+    switch (property) {
+      case TABLE_FILE_COMPRESSION_TYPE:
+      case TABLE_FILE_COMPRESSED_BLOCK_SIZE:
+      case TABLE_FILE_BLOCK_SIZE:
+      case TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX:
+      case TABLE_FILE_REPLICATION:
+        return true;
+      default:
+        return false;
+    }
+  }
+  
+  /**
+   * Helper for transforming Accumulo configuration properties into something that can be stored safely inside the Hadoop Job configuration.
+   * 
+   * @since 1.5.0
+   */
+  protected static <T> void setAccumuloProperty(Job job, Property property, T value) {
+    if (isSupportedAccumuloProperty(property)) {
+      String val = String.valueOf(value);
+      if (property.getType().isValidFormat(val))
+        job.getConfiguration().set(ACCUMULO_PROPERTY_PREFIX + property.getKey(), val);
+      else
+        throw new IllegalArgumentException("Value is not appropriate for property type '" + property.getType() + "'");
+    } else
+      throw new IllegalArgumentException("Unsupported configuration property " + property.getKey());
+  }
+  
+  /**
+   * @param compressionType
+   *          The type of compression to use. One of "none", "gz", "lzo", or "snappy". Specifying a compression may require additional libraries to be available
+   *          to your Job.
+   * @since 1.5.0
+   */
+  public static void setCompressionType(Job job, String compressionType) {
+    setAccumuloProperty(job, Property.TABLE_FILE_COMPRESSION_TYPE, compressionType);
+  }
+  
+  /**
+   * Sets the size for data blocks within each file.<br />
+   * Data blocks are a span of key/value pairs stored in the file that are compressed and indexed as a group.
+   * 
+   * <p>
+   * Making this value smaller may increase seek performance, but at the cost of increasing the size of the indexes (which can also affect seek performance).
+   * 
+   * @since 1.5.0
+   */
+  public static void setDataBlockSize(Job job, long dataBlockSize) {
+    setAccumuloProperty(job, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE, dataBlockSize);
+  }
+  
+  /**
+   * Sets the size for file blocks in the file system; file blocks are managed, and replicated, by the underlying file system
+   * 
+   * @since 1.5.0
+   */
+  public static void setFileBlockSize(Job job, long fileBlockSize) {
+    setAccumuloProperty(job, Property.TABLE_FILE_BLOCK_SIZE, fileBlockSize);
+  }
+  
+  /**
+   * Sets the size for index blocks within each file; smaller blocks means a deeper index hierarchy within the file, while larger blocks mean a more shallow
+   * index hierarchy within the file. This can affect the performance of queries.
+   * 
+   * @since 1.5.0
+   */
+  public static void setIndexBlockSize(Job job, long indexBlockSize) {
+    setAccumuloProperty(job, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX, indexBlockSize);
+  }
+  
+  /**
+   * Sets the file system replication factor for the resulting file, overriding the file system default.
+   * 
+   * @since 1.5.0
+   */
+  public static void setReplication(Job job, int replication) {
+    setAccumuloProperty(job, Property.TABLE_FILE_REPLICATION, replication);
+  }
   
   @Override
-  public RecordWriter<Key,Value> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
+  public RecordWriter<Key,Value> getRecordWriter(TaskAttemptContext context) throws IOException {
     // get the path of the temporary output file
-    final Configuration conf = job.getConfiguration();
+    final Configuration conf = context.getConfiguration();
+    final AccumuloConfiguration acuConf = getAccumuloConfiguration(context);
     
-    String extension = conf.get(FILE_TYPE);
-    if (extension == null || extension.isEmpty())
-      extension = RFile.EXTENSION;
-    
-    final Path file = this.getDefaultWorkFile(job, "." + extension);
+    final String extension = acuConf.get(Property.TABLE_FILE_TYPE);
+    final Path file = this.getDefaultWorkFile(context, "." + extension);
     
     return new RecordWriter<Key,Value>() {
       FileSKVWriter out = null;
       
       @Override
+      public void close(TaskAttemptContext context) throws IOException {
+        if (out != null)
+          out.close();
+      }
+      
+      @Override
       public void write(Key key, Value value) throws IOException {
         if (out == null) {
-          out = FileOperations.getInstance().openWriter(file.toString(), file.getFileSystem(conf), conf, AccumuloConfiguration.getDefaultConfiguration());
+          out = FileOperations.getInstance().openWriter(file.toString(), file.getFileSystem(conf), conf, acuConf);
           out.startDefaultLocalityGroup();
         }
         out.append(key, value);
       }
-      
-      @Override
-      public void close(TaskAttemptContext context) throws IOException, InterruptedException {
-        if (out != null)
-          out.close();
-      }
     };
   }
   
-  public static void setFileType(Configuration conf, String type) {
-    conf.set(FILE_TYPE, type);
+  // ----------------------------------------------------------------------------------------------------
+  // Everything below this line is deprecated and should go away in future versions
+  // ----------------------------------------------------------------------------------------------------
+  
+  /**
+   * @deprecated since 1.5.0;
+   */
+  @SuppressWarnings("unused")
+  @Deprecated
+  private static final String FILE_TYPE = PREFIX + "file_type";
+  
+  /**
+   * @deprecated since 1.5.0;
+   */
+  @SuppressWarnings("unused")
+  @Deprecated
+  private static final String BLOCK_SIZE = PREFIX + "block_size";
+  
+  /**
+   * @deprecated since 1.5.0;
+   */
+  @Deprecated
+  private static final String INSTANCE_HAS_BEEN_SET = PREFIX + "instanceConfigured";
+  
+  /**
+   * @deprecated since 1.5.0;
+   */
+  @Deprecated
+  private static final String INSTANCE_NAME = PREFIX + "instanceName";
+  
+  /**
+   * @deprecated since 1.5.0;
+   */
+  @Deprecated
+  private static final String ZOOKEEPERS = PREFIX + "zooKeepers";
+  
+  /**
+   * @deprecated since 1.5.0; Retrieve the relevant block size from {@link #getAccumuloConfiguration(JobContext)}
+   */
+  @Deprecated
+  protected static void handleBlockSize(Configuration conf) {
+    conf.setInt("io.seqfile.compress.blocksize",
+        (int) AccumuloConfiguration.getDefaultConfiguration().getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE));
   }
   
   /**
-   * @deprecated since 1.5, use {@link #setCompressedBlockSize(Configuration, long)} instead
+   * @deprecated since 1.5.0; This method does nothing. Only 'rf' type is supported.
+   */
+  @Deprecated
+  public static void setFileType(Configuration conf, String type) {}
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #setFileBlockSize(Job, long)}, {@link #setDataBlockSize(Job, long)}, or {@link #setIndexBlockSize(Job, long)} instead.
    */
+  @Deprecated
   public static void setBlockSize(Configuration conf, int blockSize) {
-    long bs = blockSize;
-    setCompressedBlockSize(conf, bs);
+    conf.set(ACCUMULO_PROPERTY_PREFIX + Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), String.valueOf(blockSize));
   }
   
   /**
-   * @param conf
-   * @param instanceName
-   * @param zooKeepers
+   * @deprecated since 1.5.0; This OutputFormat does not communicate with Accumulo. If this is needed, subclasses must implement their own configuration.
    */
+  @Deprecated
   public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) {
     if (conf.getBoolean(INSTANCE_HAS_BEEN_SET, false))
       throw new IllegalStateException("Instance info can only be set once per job");
@@ -110,30 +254,11 @@ public class AccumuloFileOutputFormat ex
   }
   
   /**
-   * @param conf
-   * @return The Accumulo instance.
+   * @deprecated since 1.5.0; This OutputFormat does not communicate with Accumulo. If this is needed, subclasses must implement their own configuration.
    */
+  @Deprecated
   protected static Instance getInstance(Configuration conf) {
     return new ZooKeeperInstance(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS));
   }
   
-  public static void setReplication(Configuration conf, int replication) {
-    conf.setInt(Property.TABLE_FILE_REPLICATION.getKey(), replication);
-  }
-  
-  public static void setDFSBlockSize(Configuration conf, long blockSize) {
-    conf.setLong(Property.TABLE_FILE_BLOCK_SIZE.getKey(), blockSize);
-  }
-  
-  public static void setCompressedBlockSize(Configuration conf, long cblockSize) {
-    conf.setLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), cblockSize);
-  }
-  
-  public static void setCompressedBlockSizeIndex(Configuration conf, long cblockSizeIndex) {
-    conf.setLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX.getKey(), cblockSizeIndex);
-  }
-  
-  public static void setCompressionType(Configuration conf, String compression) {
-    conf.set(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), compression);
-  }
 }
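
To illustrate the round trip the new methods create, a sketch from within the
org.apache.accumulo.core.client.mapreduce package (getAccumuloConfiguration is
protected; the stored Hadoop key is derived from the PREFIX constant above):

  Job job = ContextFactory.createJob();
  AccumuloFileOutputFormat.setCompressionType(job, "gz");
  // setAccumuloProperty() validates "gz" against the property's type and stores it in
  // the Job's configuration under a key of the form <PREFIX>accumuloProperties.<property key>;
  // getRecordWriter() later rebuilds an AccumuloConfiguration from those entries:
  AccumuloConfiguration acuConf = AccumuloFileOutputFormat.getAccumuloConfiguration(job);
  assert "gz".equals(acuConf.get(Property.TABLE_FILE_COMPRESSION_TYPE));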

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java?rev=1421044&r1=1421043&r2=1421044&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java Wed Dec 12 23:46:48 2012
@@ -104,23 +104,22 @@ public class RFileOperations extends Fil
   @Override
   public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
     int hrep = conf.getInt("dfs.replication", -1);
-    int trep = conf.getInt(Property.TABLE_FILE_REPLICATION.getKey(), acuconf.getCount(Property.TABLE_FILE_REPLICATION));
+    int trep = acuconf.getCount(Property.TABLE_FILE_REPLICATION);
     int rep = hrep;
     if (trep > 0 && trep != hrep) {
       rep = trep;
     }
     long hblock = conf.getLong("dfs.block.size", 1 << 26);
-    long tblock = conf.getLong(Property.TABLE_FILE_BLOCK_SIZE.getKey(), acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE));
+    long tblock = acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE);
     long block = hblock;
     if (tblock > 0)
       block = tblock;
     int bufferSize = conf.getInt("io.file.buffer.size", 4096);
     
-    long blockSize = conf.getLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE));
-    long indexBlockSize = conf.getLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX.getKey(),
-        acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX));
+    long blockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE);
+    long indexBlockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX);
     
-    String compression = conf.get(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
+    String compression = acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE);
     
     CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(fs.create(new Path(file), false, bufferSize, (short) rep, block), compression, conf);
     Writer writer = new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize);
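
The net effect of this hunk is a precedence change: table.file.* settings are
now read only from the AccumuloConfiguration argument, while the Hadoop
Configuration continues to supply generic defaults such as dfs.replication and
io.file.buffer.size. A sketch of the resulting calling contract (the file name
is illustrative):

  // Setting Property keys directly in the Hadoop conf no longer overrides acuconf here.
  AccumuloConfiguration acuConf = AccumuloConfiguration.getDefaultConfiguration();
  FileSKVWriter out = FileOperations.getInstance().openWriter(
      "/tmp/example.rf",           // hypothetical output file
      FileSystem.get(conf), conf,  // conf still supplies dfs.* and io.* defaults
      acuConf);                    // table.file.* settings now come only from here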

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java?rev=1421044&r1=1421043&r2=1421044&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java Wed Dec 12 23:46:48 2012
@@ -23,6 +23,7 @@ import java.lang.reflect.InvocationTarge
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MapContext;
@@ -40,6 +41,7 @@ import org.apache.hadoop.mapreduce.TaskA
  */
 public class ContextFactory {
   
+  private static final Constructor<?> JOB_CONSTRUCTOR;
   private static final Constructor<?> JOB_CONTEXT_CONSTRUCTOR;
   private static final Constructor<?> TASK_CONTEXT_CONSTRUCTOR;
   private static final Constructor<?> TASK_ID_CONSTRUCTOR;
@@ -58,6 +60,7 @@ public class ContextFactory {
       v21 = false;
     }
     useV21 = v21;
+    Class<?> jobCls;
     Class<?> jobContextCls;
     Class<?> taskContextCls;
     Class<?> mapCls;
@@ -65,6 +68,7 @@ public class ContextFactory {
     Class<?> innerMapContextCls;
     try {
       if (v21) {
+        jobCls = Class.forName(PACKAGE + ".Job");
         jobContextCls = Class.forName(PACKAGE + ".task.JobContextImpl");
         taskContextCls = Class.forName(PACKAGE + ".task.TaskAttemptContextImpl");
         TASK_TYPE_CLASS = Class.forName(PACKAGE + ".TaskType");
@@ -72,6 +76,7 @@ public class ContextFactory {
         mapCls = Class.forName(PACKAGE + ".lib.map.WrappedMapper");
         innerMapContextCls = Class.forName(PACKAGE + ".lib.map.WrappedMapper$Context");
       } else {
+        jobCls = Class.forName(PACKAGE + ".Job");
         jobContextCls = Class.forName(PACKAGE + ".JobContext");
         taskContextCls = Class.forName(PACKAGE + ".TaskAttemptContext");
         TASK_TYPE_CLASS = null;
@@ -83,6 +88,7 @@ public class ContextFactory {
       throw new IllegalArgumentException("Can't find class", e);
     }
     try {
+      JOB_CONSTRUCTOR = jobCls.getConstructor(Configuration.class, String.class);
       JOB_CONTEXT_CONSTRUCTOR = jobContextCls.getConstructor(Configuration.class, JobID.class);
       JOB_CONTEXT_CONSTRUCTOR.setAccessible(true);
       TASK_CONTEXT_CONSTRUCTOR = taskContextCls.getConstructor(Configuration.class, TaskAttemptID.class);
@@ -111,6 +117,22 @@ public class ContextFactory {
     }
   }
   
+  public static Job createJob() {
+    return createJob(new Configuration());
+  }
+  
+  public static Job createJob(Configuration conf) {
+    try {
+      return (Job) JOB_CONSTRUCTOR.newInstance(conf, new JobID("local", 
0).toString());
+    } catch (InstantiationException e) {
+      throw new IllegalArgumentException("Can't create object", e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalArgumentException("Can't create object", e);
+    } catch (InvocationTargetException e) {
+      throw new IllegalArgumentException("Can't create object", e);
+    }
+  }
+  
   public static JobContext createJobContext() {
     return createJobContext(new Configuration());
   }
@@ -152,16 +174,19 @@ public class ContextFactory {
     return createMapContext(m, tac, reader, writer, null, null, split);
   }
   
-  @SuppressWarnings({"unchecked", "rawtypes"})
   public static <K1,V1,K2,V2> Mapper<K1,V1,K2,V2>.Context createMapContext(Mapper<K1,V1,K2,V2> m, TaskAttemptContext tac, RecordReader<K1,V1> reader,
       RecordWriter<K2,V2> writer, OutputCommitter committer, StatusReporter reporter, InputSplit split) {
     try {
       if (useV21) {
         Object basis = MAP_CONTEXT_IMPL_CONSTRUCTOR.newInstance(tac.getConfiguration(), tac.getTaskAttemptID(), reader, writer, committer, reporter, split);
-        return (Mapper.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance((Mapper<K1,V1,K2,V2>) MAP_CONSTRUCTOR.newInstance(), basis);
+        @SuppressWarnings("unchecked")
+        Mapper<K1,V1,K2,V2>.Context newInstance = (Mapper<K1,V1,K2,V2>.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance(MAP_CONSTRUCTOR.newInstance(), basis);
+        return newInstance;
       } else {
-        return (Mapper.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance(m, tac.getConfiguration(), tac.getTaskAttemptID(), reader, writer, committer, reporter,
-            split);
+        @SuppressWarnings("unchecked")
+        Mapper<K1,V1,K2,V2>.Context newInstance = (Mapper<K1,V1,K2,V2>.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance(m, tac.getConfiguration(),
+            tac.getTaskAttemptID(), reader, writer, committer, reporter, split);
+        return newInstance;
       }
     } catch (InstantiationException e) {
       throw new IllegalArgumentException("Can't create object", e);
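
A short sketch of why createJob() was added: the new AccumuloFileOutputFormat
setters take a Hadoop Job rather than a bare Configuration or JobContext, and
the reflective constructor lookup lets tests construct one against either the
Hadoop 0.20-style or 2.x-style class layout (usage mirrors the test change
below):

  Job job = ContextFactory.createJob();             // equivalent to createJob(new Configuration())
  AccumuloFileOutputFormat.setReplication(job, 3);  // mutate the Job's configuration as needed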

Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java?rev=1421044&r1=1421043&r2=1421044&view=diff
==============================================================================
--- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java (original)
+++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java Wed Dec 12 23:46:48 2012
@@ -27,9 +27,8 @@ import org.apache.accumulo.core.conf.Pro
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.util.ContextFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.junit.After;
@@ -37,13 +36,13 @@ import org.junit.Before;
 import org.junit.Test;
 
 public class AccumuloFileOutputFormatTest {
-  static JobContext job;
+  static Job job;
   static TaskAttemptContext tac;
   static Path f = null;
   
   @Before
   public void setup() {
-    job = ContextFactory.createJobContext();
+    job = ContextFactory.createJob();
     
     Path file = new Path("target/");
     f = new Path(file, "_temporary");
@@ -89,8 +88,6 @@ public class AccumuloFileOutputFormatTes
   
   @Test
   public void validateConfiguration() throws IOException, InterruptedException {
-    Configuration conf = job.getConfiguration();
-    AccumuloConfiguration acuconf = AccumuloConfiguration.getDefaultConfiguration();
     
     int a = 7;
     long b = 300l;
@@ -98,17 +95,18 @@ public class AccumuloFileOutputFormatTes
     long d = 10l;
     String e = "type";
     
-    AccumuloFileOutputFormat.setReplication(conf, a);
-    AccumuloFileOutputFormat.setDFSBlockSize(conf, b);
-    AccumuloFileOutputFormat.setCompressedBlockSize(conf, c);
-    AccumuloFileOutputFormat.setCompressedBlockSizeIndex(conf, d);
-    AccumuloFileOutputFormat.setCompressionType(conf, e);
-    
-    assertEquals(a, conf.getInt(Property.TABLE_FILE_REPLICATION.getKey(), acuconf.getCount(Property.TABLE_FILE_REPLICATION)));
-    assertEquals(b, conf.getLong(Property.TABLE_FILE_BLOCK_SIZE.getKey(), acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE)));
-    assertEquals(c, conf.getLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE)));
-    assertEquals(d,
-        conf.getLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX.getKey(), acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX)));
-    assertEquals(e, conf.get(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE)));
+    AccumuloFileOutputFormat.setReplication(job, a);
+    AccumuloFileOutputFormat.setFileBlockSize(job, b);
+    AccumuloFileOutputFormat.setDataBlockSize(job, c);
+    AccumuloFileOutputFormat.setIndexBlockSize(job, d);
+    AccumuloFileOutputFormat.setCompressionType(job, e);
+    
+    AccumuloConfiguration acuconf = AccumuloFileOutputFormat.getAccumuloConfiguration(job);
+    
+    assertEquals(a, acuconf.getCount(Property.TABLE_FILE_REPLICATION));
+    assertEquals(b, acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE));
+    assertEquals(c, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE));
+    assertEquals(d, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX));
+    assertEquals(e, acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
   }
 }

