Author: ctubbsii
Date: Tue Jan 15 23:58:38 2013
New Revision: 1433751

URL: http://svn.apache.org/viewvc?rev=1433751&view=rev
Log:
ACCUMULO-532 Update contrib to reflect changes in ACCUMULO-769 and to use the 
released versions of Hama BSP so we can close this ticket

Added:
    
accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/MapreduceWrapper.java
    
accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatIT.java
    
accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatIT.java
      - copied, changed from r1431766, 
accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
Removed:
    
accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
Modified:
    accumulo/contrib/bsp/trunk/pom.xml
    
accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/AccumuloInputFormat.java
    
accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/AccumuloOutputFormat.java
    
accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java

Modified: accumulo/contrib/bsp/trunk/pom.xml
URL: 
http://svn.apache.org/viewvc/accumulo/contrib/bsp/trunk/pom.xml?rev=1433751&r1=1433750&r2=1433751&view=diff
==============================================================================
--- accumulo/contrib/bsp/trunk/pom.xml (original)
+++ accumulo/contrib/bsp/trunk/pom.xml Tue Jan 15 23:58:38 2013
@@ -19,12 +19,113 @@
   <groupId>org.apache.accumulo</groupId>
   <artifactId>accumulo-bsp</artifactId>
   <version>1.5.0-SNAPSHOT</version>
-  
+
+  <parent>
+    <groupId>org.apache</groupId>
+    <artifactId>apache</artifactId>
+    <version>12</version>
+  </parent>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+  </properties>
+
+  <build>
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <groupId>org.eclipse.m2e</groupId>
+          <artifactId>lifecycle-mapping</artifactId>
+          <version>1.0.0</version>
+          <configuration>
+            <lifecycleMappingMetadata>
+              <pluginExecutions>
+                <pluginExecution>
+                  <pluginExecutionFilter>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-remote-resources-plugin</artifactId>
+                    <versionRange>[1.0,)</versionRange>
+                    <goals>
+                      <goal>process</goal>
+                    </goals>
+                  </pluginExecutionFilter>
+                  <action>
+                    <ignore />
+                  </action>
+                </pluginExecution>
+              </pluginExecutions>
+            </lifecycleMappingMetadata>
+          </configuration>
+        </plugin>
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-compiler-plugin</artifactId>
+          <configuration>
+            <source>1.6</source>
+            <target>1.6</target>
+          </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+  </build>
+
+  <profiles>
+    <profile>
+      <id>integration-tests</id>
+      <activation>
+        <property>
+          <name>!skipTests</name>
+        </property>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-jar-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>create-integration-test-jar</id>
+                <phase>pre-integration-test</phase>
+                <goals>
+                  <goal>test-jar</goal>
+                </goals>
+                <configuration>
+                  <finalName>integration</finalName>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-failsafe-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>run-integration-tests</id>
+                <phase>integration-test</phase>
+                <goals>
+                  <goal>integration-test</goal>
+                </goals>
+              </execution>
+              <execution>
+                <id>verify-integration-tests</id>
+                <phase>verify</phase>
+                <goals>
+                  <goal>verify</goal>
+                </goals>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
   <dependencies>
     <dependency>
       <groupId>org.apache.hama</groupId>
       <artifactId>hama-core</artifactId>
-      <version>0.4.0-incubating</version>
+      <version>0.6.0</version>
     </dependency>
     <dependency>
       <groupId>org.apache.accumulo</groupId>
@@ -34,7 +135,13 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-core</artifactId>
-      <version>0.20.2</version>
+      <version>[1.0.0,2.0.0)</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.11</version>
+      <scope>test</scope>
     </dependency>
   </dependencies>
 </project>

Modified: 
accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/AccumuloInputFormat.java
URL: 
http://svn.apache.org/viewvc/accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/AccumuloInputFormat.java?rev=1433751&r1=1433750&r2=1433751&view=diff
==============================================================================
--- 
accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/AccumuloInputFormat.java
 (original)
+++ 
accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/AccumuloInputFormat.java
 Tue Jan 15 23:58:38 2013
@@ -27,23 +27,25 @@ import org.apache.hama.bsp.InputFormat;
 import org.apache.hama.bsp.InputSplit;
 import org.apache.hama.bsp.RecordReader;
 
+/**
+ * <p>
+ * AccumuloInputFormat class. To be used with Hama BSP.
+ * </p>
+ * 
+ * @see BSPJob#setInputFormat(Class)
+ */
 public class AccumuloInputFormat extends 
org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat implements 
InputFormat<Key,Value> {
+  
   public class BSPRecordReaderBase extends RecordReaderBase<Key,Value> 
implements RecordReader<Key,Value> {
     public BSPRecordReaderBase(InputSplit split, BSPJob job) throws 
IOException {
-      this.initialize((BSPRangeInputSplit) split, job.getConf());
+      this.initialize((BSPRangeInputSplit) split, 
MapreduceWrapper.wrappedTaskAttemptContext(job));
     }
     
-    /*
-     * @see org.apache.hadoop.mapreduce.RecordReader#nextKeyValue()
-     */
     @Override
     public boolean nextKeyValue() throws IOException, InterruptedException {
       return next(currentKey, currentValue);
     }
     
-    /*
-     * @see org.apache.hama.bsp.RecordReader#createKey()
-     */
     @Override
     public Key createKey() {
       if (currentKey == null) {
@@ -53,9 +55,6 @@ public class AccumuloInputFormat extends
       }
     }
     
-    /*
-     * @see org.apache.hama.bsp.RecordReader#createValue()
-     */
     @Override
     public Value createValue() {
       if (currentValue == null) {
@@ -65,17 +64,11 @@ public class AccumuloInputFormat extends
       }
     }
     
-    /*
-     * @see org.apache.hama.bsp.RecordReader#getPos()
-     */
     @Override
     public long getPos() throws IOException {
       return 0;
     }
     
-    /*
-     * @see org.apache.hama.bsp.RecordReader#next(java.lang.Object, 
java.lang.Object)
-     */
     @Override
     public boolean next(Key k, Value v) throws IOException {
       if (scannerIterator.hasNext()) {
@@ -108,7 +101,7 @@ public class AccumuloInputFormat extends
   
   @Override
   public InputSplit[] getSplits(BSPJob job, int arg1) throws IOException {
-    List<org.apache.hadoop.mapreduce.InputSplit> splits = 
getSplits(job.getConf());
+    List<org.apache.hadoop.mapreduce.InputSplit> splits = 
getSplits(MapreduceWrapper.wrappedTaskAttemptContext(job));
     InputSplit[] bspSplits = new BSPRangeInputSplit[splits.size()];
     for (int i = 0; i < splits.size(); i++) {
       bspSplits[i] = new BSPRangeInputSplit((RangeInputSplit) splits.get(i));

Modified: 
accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/AccumuloOutputFormat.java
URL: 
http://svn.apache.org/viewvc/accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/AccumuloOutputFormat.java?rev=1433751&r1=1433750&r2=1433751&view=diff
==============================================================================
--- 
accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/AccumuloOutputFormat.java
 (original)
+++ 
accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/AccumuloOutputFormat.java
 Tue Jan 15 23:58:38 2013
@@ -21,27 +21,34 @@ import java.io.IOException;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.data.Mutation;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.bsp.BSPJob;
 import org.apache.hama.bsp.OutputFormat;
 import org.apache.hama.bsp.RecordWriter;
 
+/**
+ * <p>
+ * AccumuloOutputFormat class. To be used with Hama BSP.
+ * </p>
+ * 
+ * @see BSPJob#setOutputFormat(Class)
+ */
 public class AccumuloOutputFormat extends 
org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat implements 
OutputFormat<Text,Mutation> {
   
   protected static class BSPRecordWriter extends AccumuloRecordWriter 
implements RecordWriter<Text,Mutation> {
-    BSPRecordWriter(Configuration conf) throws AccumuloException, 
AccumuloSecurityException, IOException {
-      super(conf);
+    
+    private BSPJob job;
+    
+    BSPRecordWriter(BSPJob job) throws AccumuloException, 
AccumuloSecurityException, IOException {
+      super(MapreduceWrapper.wrappedTaskAttemptContext(job));
+      this.job = job;
     }
     
-    /*
-     * @see org.apache.hama.bsp.RecordWriter#close()
-     */
     @Override
     public void close() throws IOException {
       try {
-        close(null);
+        close(MapreduceWrapper.wrappedTaskAttemptContext(job));
       } catch (InterruptedException e) {
         throw new IOException(e);
       }
@@ -49,21 +56,15 @@ public class AccumuloOutputFormat extend
     
   }
   
-  /*
-   * @see 
org.apache.hama.bsp.OutputFormat#checkOutputSpecs(org.apache.hadoop.fs.FileSystem,
 org.apache.hama.bsp.BSPJob)
-   */
   @Override
   public void checkOutputSpecs(FileSystem fs, BSPJob job) throws IOException {
-    checkOutputSpecs(job.getConf());
+    checkOutputSpecs(MapreduceWrapper.wrappedTaskAttemptContext(job));
   }
   
-  /*
-   * @see 
org.apache.hama.bsp.OutputFormat#getRecordWriter(org.apache.hadoop.fs.FileSystem,
 org.apache.hama.bsp.BSPJob, java.lang.String)
-   */
   @Override
-  public RecordWriter<Text,Mutation> getRecordWriter(FileSystem fs, BSPJob 
job, String arg2) throws IOException {
+  public RecordWriter<Text,Mutation> getRecordWriter(FileSystem fs, BSPJob 
job, String name) throws IOException {
     try {
-      return new BSPRecordWriter(job.getConf());
+      return new BSPRecordWriter(job);
     } catch (Exception e) {
       throw new IOException(e);
     }

Added: 
accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/MapreduceWrapper.java
URL: 
http://svn.apache.org/viewvc/accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/MapreduceWrapper.java?rev=1433751&view=auto
==============================================================================
--- 
accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/MapreduceWrapper.java
 (added)
+++ 
accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/MapreduceWrapper.java
 Tue Jan 15 23:58:38 2013
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.bsp;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hama.bsp.BSPJob;
+
+/**
+ * <p>
+ * MapreduceWrapper class. Provides a wrapper to wrap {@link BSPJob} into the 
appropriate Hadoop type required by {@link AccumuloInputFormat} and
+ * {@link AccumuloOutputFormat} static configurator methods. Useful for 
reusing code to set the job's configuration and not using the expected Hadoop 
API.
+ * </p>
+ */
+public class MapreduceWrapper {
+  
+  /**
+   * Wraps a {@link BSPJob} for reading its {@link Configuration} within 
Accumulo MapReduce classes' protected static configuration getters.
+   * 
+   * @param job
+   *          the {@link BSPJob} instance to be wrapped
+   * @return an instance of {@link TaskAttemptContext} whose {@link 
Configuration} is the same as the job
+   */
+  public static TaskAttemptContext wrappedTaskAttemptContext(final BSPJob job) 
{
+    return new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
+  }
+  
+  /**
+   * Wraps a {@link BSPJob} for writing its {@link Configuration} within 
Accumulo MapReduce classes' public static configuration setters.
+   * 
+   * @param job
+   *          the {@link BSPJob} instance to be wrapped
+   * @return an instance of {@link Job} that exposes {@link 
BSPJob#getConfiguration()} via {@link Job#getConfiguration()}; no other methods 
of {@link Job} are
+   *         implemented, so this object cannot be used for anything other 
than editing the {@link BSPJob}'s {@link Configuration}
+   */
+  public static Job wrappedJob(BSPJob job) {
+    final BSPJob bspJob = job;
+    try {
+      return new Job() {
+        @Override
+        public Configuration getConfiguration() {
+          return bspJob.getConfiguration();
+        }
+      };
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

Added: 
accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatIT.java
URL: 
http://svn.apache.org/viewvc/accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatIT.java?rev=1433751&view=auto
==============================================================================
--- 
accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatIT.java
 (added)
+++ 
accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatIT.java
 Tue Jan 15 23:58:38 2013
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
+import org.apache.accumulo.bsp.AccumuloInputFormat;
+import org.apache.accumulo.bsp.MapreduceWrapper;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.InputSplit;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.util.KeyValuePair;
+import org.junit.Test;
+
+/**
+ * 
+ */
+public class AccumuloInputFormatIT {
+  
+  static class InputFormatTestBSP<M extends Writable> extends 
BSP<Key,Value,Key,Value,M> {
+    Key key = null;
+    int count = 0;
+    
+    @Override
+    public void bsp(BSPPeer<Key,Value,Key,Value,M> peer) throws IOException, 
SyncException, InterruptedException {
+      // this method reads the next key value record from file
+      KeyValuePair<Key,Value> pair;
+      
+      while ((pair = peer.readNext()) != null) {
+        if (key != null) {
+          assertEquals(key.getRow().toString(), new 
String(pair.getValue().get()));
+        }
+        
+        assertEquals(pair.getKey().getRow(), new Text(String.format("%09x", 
count + 1)));
+        assertEquals(new String(pair.getValue().get()), String.format("%09x", 
count));
+        count++;
+        
+        key = new Key(pair.getKey());
+      }
+      
+      peer.sync();
+      assertEquals(100, count);
+    }
+  }
+  
+  @Test
+  public void testBSPInputFormat() throws Exception {
+    MockInstance mockInstance = new MockInstance("testmapinstance");
+    Connector c = mockInstance.getConnector("root", new byte[] {});
+    if (c.tableOperations().exists("testtable"))
+      c.tableOperations().delete("testtable");
+    c.tableOperations().create("testtable");
+    
+    BatchWriter bw = c.createBatchWriter("testtable", new BatchWriterConfig());
+    for (int i = 0; i < 100; i++) {
+      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+      m.put(new Text(), new Text(), new Value(String.format("%09x", 
i).getBytes()));
+      bw.addMutation(m);
+    }
+    bw.close();
+    
+    BSPJob bspJob = new BSPJob();
+    Job job = MapreduceWrapper.wrappedJob(bspJob);
+    
+    bspJob.setInputFormat(AccumuloInputFormat.class);
+    bspJob.setBspClass(InputFormatTestBSP.class);
+    bspJob.setInputPath(new Path("test"));
+    
+    AccumuloInputFormat.setInputInfo(job, "root", "".getBytes(), "testtable", 
new Authorizations());
+    AccumuloInputFormat.setMockInstance(job, "testmapinstance");
+    
+    AccumuloInputFormat input = new AccumuloInputFormat();
+    InputSplit[] splits = input.getSplits(bspJob, 0);
+    assertEquals(splits.length, 1);
+    
+    bspJob.setJar("target/integration-tests.jar");
+    bspJob.setOutputPath(new Path("target/bsp-inputformat-test"));
+    if (!bspJob.waitForCompletion(false))
+      fail("Job not finished successfully");
+  }
+  
+}

Modified: 
accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
URL: 
http://svn.apache.org/viewvc/accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java?rev=1433751&r1=1433750&r2=1433751&view=diff
==============================================================================
--- 
accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
 (original)
+++ 
accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
 Tue Jan 15 23:58:38 2013
@@ -21,243 +21,175 @@ import static org.junit.Assert.assertTru
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.accumulo.bsp.AccumuloInputFormat;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.bsp.MapreduceWrapper;
 import org.apache.accumulo.core.client.IteratorSetting;
-import 
org.apache.accumulo.core.client.mapreduce.InputFormatBase.AccumuloIterator;
-import 
org.apache.accumulo.core.client.mapreduce.InputFormatBase.AccumuloIteratorOption;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.user.RegExFilter;
 import org.apache.accumulo.core.iterators.user.WholeRowIterator;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.BSP;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hama.bsp.BSPJob;
-import org.apache.hama.bsp.BSPPeer;
-import org.apache.hama.bsp.InputSplit;
-import org.apache.hama.bsp.sync.SyncException;
-import org.apache.hama.util.KeyValuePair;
-import org.junit.After;
 import org.junit.Test;
 
+/**
+ * 
+ */
 public class AccumuloInputFormatTest {
   
-  @After
-  public void tearDown() throws Exception {}
-  
-  /**
-   * Test basic setting & getting of max versions.
-   * 
-   * @throws IOException
-   *           Signals that an I/O exception has occurred.
-   */
-  @Test
-  public void testMaxVersions() throws IOException {
-    BSPJob job = new BSPJob();
-    AccumuloInputFormat.setMaxVersions(job.getConf(), 1);
-    int version = AccumuloInputFormat.getMaxVersions(job.getConf());
-    assertEquals(1, version);
-  }
-  
-  @Test(expected = IOException.class)
-  public void testMaxVersionsLessThan1() throws IOException {
-    BSPJob job = new BSPJob();
-    AccumuloInputFormat.setMaxVersions(job.getConf(), 0);
-  }
-  
-  @Test
-  public void testNoMaxVersion() throws IOException {
-    BSPJob job = new BSPJob();
-    assertEquals(-1, AccumuloInputFormat.getMaxVersions(job.getConf()));
-  }
-  
   @Test
   public void testSetIterator() throws IOException {
-    BSPJob job = new BSPJob();
+    BSPJob bspJob = new BSPJob();
     
-    AccumuloInputFormat.addIterator(job.getConf(), new IteratorSetting(1, 
"WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator"));
-    Configuration conf = job.getConf();
-    String iterators = conf.get("AccumuloInputFormat.iterators");
-    
assertEquals("1:org.apache.accumulo.core.iterators.WholeRowIterator:WholeRow", 
iterators);
+    Job job = MapreduceWrapper.wrappedJob(bspJob);
+    AccumuloInputFormat.addIterator(job, new IteratorSetting(1, "WholeRow", 
"org.apache.accumulo.core.iterators.WholeRowIterator"));
+    
+    TaskAttemptContext context = 
MapreduceWrapper.wrappedTaskAttemptContext(bspJob);
+    List<IteratorSetting> iterators = 
AccumuloInputFormat.getIterators(context);
+    assertEquals(1, iterators.size());
+    IteratorSetting iter = iterators.get(0);
+    assertEquals(1, iter.getPriority());
+    assertEquals("org.apache.accumulo.core.iterators.WholeRowIterator", 
iter.getIteratorClass());
+    assertEquals("WholeRow", iter.getName());
+    assertEquals(0, iter.getOptions().size());
   }
   
   @Test
   public void testAddIterator() throws IOException {
-    BSPJob job = new BSPJob();
+    BSPJob bspJob = new BSPJob();
     
-    AccumuloInputFormat.addIterator(job.getConf(), new IteratorSetting(1, 
"WholeRow", WholeRowIterator.class));
-    AccumuloInputFormat.addIterator(job.getConf(), new IteratorSetting(2, 
"Versions", "org.apache.accumulo.core.iterators.VersioningIterator"));
+    Job job = MapreduceWrapper.wrappedJob(bspJob);
+    AccumuloInputFormat.addIterator(job, new IteratorSetting(1, "WholeRow", 
WholeRowIterator.class));
+    AccumuloInputFormat.addIterator(job, new IteratorSetting(2, "Versions", 
"org.apache.accumulo.core.iterators.VersioningIterator"));
     IteratorSetting iter = new IteratorSetting(3, "Count", 
"org.apache.accumulo.core.iterators.CountingIterator");
     iter.addOption("v1", "1");
     iter.addOption("junk", "\0omg:!\\xyzzy");
-    AccumuloInputFormat.addIterator(job.getConf(), iter);
+    AccumuloInputFormat.addIterator(job, iter);
     
-    List<AccumuloIterator> list = 
AccumuloInputFormat.getIterators(job.getConf());
+    TaskAttemptContext context = 
MapreduceWrapper.wrappedTaskAttemptContext(bspJob);
+    List<IteratorSetting> list = AccumuloInputFormat.getIterators(context);
     
     // Check the list size
     assertTrue(list.size() == 3);
     
     // Walk the list and make sure our settings are correct
-    AccumuloIterator setting = list.get(0);
+    IteratorSetting setting = list.get(0);
     assertEquals(1, setting.getPriority());
     assertEquals("org.apache.accumulo.core.iterators.user.WholeRowIterator", 
setting.getIteratorClass());
-    assertEquals("WholeRow", setting.getIteratorName());
+    assertEquals("WholeRow", setting.getName());
     
     setting = list.get(1);
     assertEquals(2, setting.getPriority());
     assertEquals("org.apache.accumulo.core.iterators.VersioningIterator", 
setting.getIteratorClass());
-    assertEquals("Versions", setting.getIteratorName());
+    assertEquals("Versions", setting.getName());
     
     setting = list.get(2);
     assertEquals(3, setting.getPriority());
     assertEquals("org.apache.accumulo.core.iterators.CountingIterator", 
setting.getIteratorClass());
-    assertEquals("Count", setting.getIteratorName());
+    assertEquals("Count", setting.getName());
     
-    List<AccumuloIteratorOption> iteratorOptions = 
AccumuloInputFormat.getIteratorOptions(job.getConf());
+    Map<String,String> iteratorOptions = setting.getOptions();
     assertEquals(2, iteratorOptions.size());
-    assertEquals("Count", iteratorOptions.get(0).getIteratorName());
-    assertEquals("Count", iteratorOptions.get(1).getIteratorName());
-    assertEquals("v1", iteratorOptions.get(0).getKey());
-    assertEquals("1", iteratorOptions.get(0).getValue());
-    assertEquals("junk", iteratorOptions.get(1).getKey());
-    assertEquals("\0omg:!\\xyzzy", iteratorOptions.get(1).getValue());
+    assertTrue(iteratorOptions.containsKey("v1"));
+    assertEquals("1", iteratorOptions.get("v1"));
+    assertTrue(iteratorOptions.containsKey("junk"));
+    assertEquals("\0omg:!\\xyzzy", iteratorOptions.get("junk"));
   }
   
   @Test
-  public void testIteratorOptionEncoding() throws Throwable {
+  public void testIteratorOptionEncoding() throws IOException {
+    BSPJob bspJob = new BSPJob();
     String key = "colon:delimited:key";
     String value = "comma,delimited,value";
+    
+    Job job = MapreduceWrapper.wrappedJob(bspJob);
     IteratorSetting someSetting = new IteratorSetting(1, "iterator", 
"Iterator.class");
     someSetting.addOption(key, value);
-    BSPJob job = new BSPJob();
-    AccumuloInputFormat.addIterator(job.getConf(), someSetting);
+    AccumuloInputFormat.addIterator(job, someSetting);
     
-    final String rawConfigOpt = new AccumuloIteratorOption("iterator", key, 
value).toString();
-    
-    assertEquals(rawConfigOpt, 
job.getConf().get("AccumuloInputFormat.iterators.options"));
-    
-    List<AccumuloIteratorOption> opts = 
AccumuloInputFormat.getIteratorOptions(job.getConf());
+    TaskAttemptContext context = 
MapreduceWrapper.wrappedTaskAttemptContext(bspJob);
+    List<IteratorSetting> iters = AccumuloInputFormat.getIterators(context);
+    assertEquals(1, iters.size());
+    assertEquals("iterator", iters.get(0).getName());
+    assertEquals("Iterator.class", iters.get(0).getIteratorClass());
+    assertEquals(1, iters.get(0).getPriority());
+    Map<String,String> opts = iters.get(0).getOptions();
     assertEquals(1, opts.size());
-    assertEquals(opts.get(0).getKey(), key);
-    assertEquals(opts.get(0).getValue(), value);
+    assertTrue(opts.containsKey(key));
+    assertEquals(value, opts.get(key));
     
     someSetting.addOption(key + "2", value);
     someSetting.setPriority(2);
     someSetting.setName("it2");
-    AccumuloInputFormat.addIterator(job.getConf(), someSetting);
-    opts = AccumuloInputFormat.getIteratorOptions(job.getConf());
-    assertEquals(3, opts.size());
-    for (AccumuloIteratorOption opt : opts) {
-      assertEquals(opt.getKey().substring(0, key.length()), key);
-      assertEquals(opt.getValue(), value);
-    }
+    AccumuloInputFormat.addIterator(job, someSetting);
+    
+    context = MapreduceWrapper.wrappedTaskAttemptContext(bspJob);
+    iters = AccumuloInputFormat.getIterators(context);
+    assertEquals(2, iters.size());
+    assertEquals("iterator", iters.get(0).getName());
+    assertEquals("Iterator.class", iters.get(0).getIteratorClass());
+    assertEquals(1, iters.get(0).getPriority());
+    opts = iters.get(0).getOptions();
+    assertEquals(1, opts.size());
+    assertTrue(opts.containsKey(key));
+    assertEquals(value, opts.get(key));
+    assertEquals("it2", iters.get(1).getName());
+    assertEquals("Iterator.class", iters.get(1).getIteratorClass());
+    assertEquals(2, iters.get(1).getPriority());
+    opts = iters.get(1).getOptions();
+    assertEquals(2, opts.size());
+    assertTrue(opts.containsKey(key));
+    assertEquals(value, opts.get(key));
+    assertTrue(opts.containsKey(key + "2"));
+    assertEquals(value, opts.get(key + "2"));
   }
   
   @Test
   public void testGetIteratorSettings() throws IOException {
-    BSPJob job = new BSPJob();
+    BSPJob bspJob = new BSPJob();
+    Job job = MapreduceWrapper.wrappedJob(bspJob);
     
-    AccumuloInputFormat.addIterator(job.getConf(), new IteratorSetting(1, 
"WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator"));
-    AccumuloInputFormat.addIterator(job.getConf(), new IteratorSetting(2, 
"Versions", "org.apache.accumulo.core.iterators.VersioningIterator"));
-    AccumuloInputFormat.addIterator(job.getConf(), new IteratorSetting(3, 
"Count", "org.apache.accumulo.core.iterators.CountingIterator"));
+    AccumuloInputFormat.addIterator(job, new IteratorSetting(1, "WholeRow", 
"org.apache.accumulo.core.iterators.WholeRowIterator"));
+    AccumuloInputFormat.addIterator(job, new IteratorSetting(2, "Versions", 
"org.apache.accumulo.core.iterators.VersioningIterator"));
+    AccumuloInputFormat.addIterator(job, new IteratorSetting(3, "Count", 
"org.apache.accumulo.core.iterators.CountingIterator"));
     
-    List<AccumuloIterator> list = 
AccumuloInputFormat.getIterators(job.getConf());
+    TaskAttemptContext context = 
MapreduceWrapper.wrappedTaskAttemptContext(bspJob);
+    List<IteratorSetting> list = AccumuloInputFormat.getIterators(context);
     
     // Check the list size
-    assertTrue(list.size() == 3);
+    assertEquals(3, list.size());
     
     // Walk the list and make sure our settings are correct
-    AccumuloIterator setting = list.get(0);
+    IteratorSetting setting = list.get(0);
     assertEquals(1, setting.getPriority());
     assertEquals("org.apache.accumulo.core.iterators.WholeRowIterator", setting.getIteratorClass());
-    assertEquals("WholeRow", setting.getIteratorName());
+    assertEquals("WholeRow", setting.getName());
     
     setting = list.get(1);
     assertEquals(2, setting.getPriority());
     assertEquals("org.apache.accumulo.core.iterators.VersioningIterator", setting.getIteratorClass());
-    assertEquals("Versions", setting.getIteratorName());
+    assertEquals("Versions", setting.getName());
     
     setting = list.get(2);
     assertEquals(3, setting.getPriority());
     assertEquals("org.apache.accumulo.core.iterators.CountingIterator", setting.getIteratorClass());
-    assertEquals("Count", setting.getIteratorName());
-    
+    assertEquals("Count", setting.getName());
   }
   
   @Test
   public void testSetRegex() throws IOException {
-    BSPJob job = new BSPJob();
+    BSPJob bspJob = new BSPJob();
+    Job job = MapreduceWrapper.wrappedJob(bspJob);
     
     String regex = ">\"*%<>\'\\";
     
     IteratorSetting is = new IteratorSetting(50, regex, RegExFilter.class);
     RegExFilter.setRegexs(is, regex, null, null, null, false);
-    AccumuloInputFormat.addIterator(job.getConf(), is);
+    AccumuloInputFormat.addIterator(job, is);
     
-    assertTrue(regex.equals(AccumuloInputFormat.getIterators(job.getConf()).get(0).getIteratorName()));
+    TaskAttemptContext context = MapreduceWrapper.wrappedTaskAttemptContext(bspJob);
+    assertEquals(regex, AccumuloInputFormat.getIterators(context).get(0).getName());
   }
   
-  static class TestBSP extends BSP<Key,Value,Key,Value> {
-    Key key = null;
-    int count = 0;
-    
-    @Override
-    public void bsp(BSPPeer<Key,Value,Key,Value> peer) throws IOException, SyncException, InterruptedException {
-      // this method reads the next key value record from file
-      KeyValuePair<Key,Value> pair;
-      
-      while ((pair = peer.readNext()) != null) {
-        if (key != null) {
-          assertEquals(key.getRow().toString(), new String(pair.getValue().get()));
-        }
-        
-        assertEquals(pair.getKey().getRow(), new Text(String.format("%09x", count + 1)));
-        assertEquals(new String(pair.getValue().get()), String.format("%09x", count));
-        count++;
-        
-        key = new Key(pair.getKey());
-      }
-      
-      peer.sync();
-      assertEquals(100, count);
-    }
-  }
-  
-  @Test
-  public void testBsp() throws Exception {
-    MockInstance mockInstance = new MockInstance("testmapinstance");
-    Connector c = mockInstance.getConnector("root", new byte[] {});
-    if (c.tableOperations().exists("testtable"))
-      c.tableOperations().delete("testtable");
-    c.tableOperations().create("testtable");
-    
-    BatchWriter bw = c.createBatchWriter("testtable", 10000L, 1000L, 4);
-    for (int i = 0; i < 100; i++) {
-      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
-      m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
-      bw.addMutation(m);
-    }
-    bw.close();
-    
-    BSPJob job = new BSPJob(new HamaConfiguration());
-    job.setInputFormat(AccumuloInputFormat.class);
-    job.setBspClass(TestBSP.class);
-    job.setInputPath(new Path("test"));
-    AccumuloInputFormat.setInputInfo(job.getConf(), "root", "".getBytes(), "testtable", new Authorizations());
-    AccumuloInputFormat.setMockInstance(job.getConf(), "testmapinstance");
-    
-    AccumuloInputFormat input = new AccumuloInputFormat();
-    InputSplit[] splits = input.getSplits(job, 0);
-    assertEquals(splits.length, 1);
-    
-    job.waitForCompletion(false);
-  }
 }

Copied: 
accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatIT.java
 (from r1431766, 
accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java)
URL: 
http://svn.apache.org/viewvc/accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatIT.java?p2=accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatIT.java&p1=accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java&r1=1431766&r2=1433751&rev=1433751&view=diff
==============================================================================
--- accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java (original)
+++ accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatIT.java Tue Jan 15 23:58:38 2013
@@ -19,6 +19,7 @@ package org.apache.accumulo.core.client.
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -26,7 +27,9 @@ import java.util.Map.Entry;
 
 import org.apache.accumulo.bsp.AccumuloInputFormat;
 import org.apache.accumulo.bsp.AccumuloOutputFormat;
+import org.apache.accumulo.bsp.MapreduceWrapper;
 import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.mock.MockInstance;
@@ -37,6 +40,8 @@ import org.apache.accumulo.core.security
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPJob;
@@ -46,14 +51,17 @@ import org.apache.hama.bsp.sync.SyncExce
 import org.apache.hama.util.KeyValuePair;
 import org.junit.Test;
 
-public class AccumuloOutputFormatTest {
+/**
+ * 
+ */
+public class AccumuloOutputFormatIT {
   
-  static class TestBSP extends BSP<Key,Value,Text,Mutation> {
+  static class OutputFormatTestBSP<M extends Writable> extends BSP<Key,Value,Text,Mutation,M> {
     Key key = null;
     int count = 0;
     
     @Override
-    public void bsp(BSPPeer<Key,Value,Text,Mutation> peer) throws IOException, SyncException, InterruptedException {
+    public void bsp(BSPPeer<Key,Value,Text,Mutation,M> peer) throws IOException, SyncException, InterruptedException {
       // this method reads the next key value record from file
       KeyValuePair<Key,Value> pair;
       
@@ -73,7 +81,7 @@ public class AccumuloOutputFormatTest {
     }
     
     @Override
-    public void cleanup(BSPPeer<Key,Value,Text,Mutation> peer) throws IOException {
+    public void cleanup(BSPPeer<Key,Value,Text,Mutation,M> peer) throws IOException {
       Mutation m = new Mutation("total");
       m.put("", "", Integer.toString(count));
       peer.write(new Text("testtable2"), m);
@@ -81,7 +89,7 @@ public class AccumuloOutputFormatTest {
   }
   
   @Test
-  public void testBSP() throws Exception {
+  public void testBSPOutputFormat() throws Exception {
     MockInstance mockInstance = new MockInstance("testmrinstance");
     Connector c = mockInstance.getConnector("root", new byte[] {});
     if (c.tableOperations().exists("testtable1"))
@@ -91,7 +99,7 @@ public class AccumuloOutputFormatTest {
     
     c.tableOperations().create("testtable1");
     c.tableOperations().create("testtable2");
-    BatchWriter bw = c.createBatchWriter("testtable1", 10000L, 1000L, 4);
+    BatchWriter bw = c.createBatchWriter("testtable1", new BatchWriterConfig());
     for (int i = 0; i < 100; i++) {
       Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
       m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
@@ -100,29 +108,33 @@ public class AccumuloOutputFormatTest {
     bw.close();
     
     Configuration conf = new Configuration();
-    BSPJob bsp = new BSPJob(new HamaConfiguration(conf));
-    bsp.setJobName("Test Input Output");
+    BSPJob bspJob = new BSPJob(new HamaConfiguration(conf));
+    bspJob.setJobName("Test Input Output");
     
-    bsp.setBspClass(TestBSP.class);
-    bsp.setInputFormat(AccumuloInputFormat.class);
-    bsp.setInputPath(new Path("test"));
-    
-    bsp.setOutputFormat(AccumuloOutputFormat.class);
-    bsp.setOutputPath(new Path("test"));
-    
-    bsp.setOutputKeyClass(Text.class);
-    bsp.setOutputValueClass(Mutation.class);
-    
-    AccumuloInputFormat.setInputInfo(bsp.getConf(), "root", "".getBytes(), "testtable1", new Authorizations());
-    AccumuloInputFormat.setMockInstance(bsp.getConf(), "testmrinstance");
-    AccumuloOutputFormat.setOutputInfo(bsp.getConf(), "root", "".getBytes(), false, "testtable2");
-    AccumuloOutputFormat.setMockInstance(bsp.getConf(), "testmrinstance");
+    bspJob.setBspClass(OutputFormatTestBSP.class);
+    bspJob.setInputFormat(AccumuloInputFormat.class);
+    bspJob.setInputPath(new Path("test"));
+    
+    bspJob.setOutputFormat(AccumuloOutputFormat.class);
+    bspJob.setJar("target/integration-tests.jar");
+    bspJob.setOutputPath(new Path("target/bsp-outputformat-test"));
+    
+    bspJob.setOutputKeyClass(Text.class);
+    bspJob.setOutputValueClass(Mutation.class);
+    
+    Job job = MapreduceWrapper.wrappedJob(bspJob);
+    
+    AccumuloInputFormat.setInputInfo(job, "root", "".getBytes(), "testtable1", new Authorizations());
+    AccumuloInputFormat.setMockInstance(job, "testmrinstance");
+    AccumuloOutputFormat.setOutputInfo(job, "root", "".getBytes(), false, "testtable2");
+    AccumuloOutputFormat.setMockInstance(job, "testmrinstance");
     
     AccumuloInputFormat input = new AccumuloInputFormat();
-    InputSplit[] splits = input.getSplits(bsp, 0);
+    InputSplit[] splits = input.getSplits(bspJob, 0);
     assertEquals(splits.length, 1);
     
-    bsp.waitForCompletion(false);
+    if (!bspJob.waitForCompletion(false))
+      fail("Job not finished successfully");
     
     Scanner scanner = c.createScanner("testtable2", new Authorizations());
     Iterator<Entry<Key,Value>> iter = scanner.iterator();
@@ -131,6 +143,6 @@ public class AccumuloOutputFormatTest {
     assertEquals("total", entry.getKey().getRow().toString());
     assertEquals(100, Integer.parseInt(new String(entry.getValue().get())));
     assertFalse(iter.hasNext());
-    
   }
+  
 }


Reply via email to