Author: ctubbsii Date: Tue Jan 15 23:58:38 2013 New Revision: 1433751 URL: http://svn.apache.org/viewvc?rev=1433751&view=rev Log: ACCUMULO-532 Update contrib to reflect changes in ACCUMULO-769 and to use the released versions of Hama BSP so we can close this ticket
Added: accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/MapreduceWrapper.java accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatIT.java accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatIT.java - copied, changed from r1431766, accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java Removed: accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java Modified: accumulo/contrib/bsp/trunk/pom.xml accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/AccumuloInputFormat.java accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/AccumuloOutputFormat.java accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java Modified: accumulo/contrib/bsp/trunk/pom.xml URL: http://svn.apache.org/viewvc/accumulo/contrib/bsp/trunk/pom.xml?rev=1433751&r1=1433750&r2=1433751&view=diff ============================================================================== --- accumulo/contrib/bsp/trunk/pom.xml (original) +++ accumulo/contrib/bsp/trunk/pom.xml Tue Jan 15 23:58:38 2013 @@ -19,12 +19,113 @@ <groupId>org.apache.accumulo</groupId> <artifactId>accumulo-bsp</artifactId> <version>1.5.0-SNAPSHOT</version> - + + <parent> + <groupId>org.apache</groupId> + <artifactId>apache</artifactId> + <version>12</version> + </parent> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + </properties> + + <build> + <pluginManagement> + <plugins> + <plugin> + <groupId>org.eclipse.m2e</groupId> + <artifactId>lifecycle-mapping</artifactId> + <version>1.0.0</version> + <configuration> + <lifecycleMappingMetadata> + <pluginExecutions> + <pluginExecution> + <pluginExecutionFilter> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-remote-resources-plugin</artifactId> + <versionRange>[1.0,)</versionRange> + <goals> + <goal>process</goal> + </goals> + </pluginExecutionFilter> + <action> + <ignore /> + </action> + </pluginExecution> + </pluginExecutions> + </lifecycleMappingMetadata> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.6</source> + <target>1.6</target> + </configuration> + </plugin> + </plugins> + </pluginManagement> + </build> + + <profiles> + <profile> + <id>integration-tests</id> + <activation> + <property> + <name>!skipTests</name> + </property> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <id>create-integration-test-jar</id> + <phase>pre-integration-test</phase> + <goals> + <goal>test-jar</goal> + </goals> + <configuration> + <finalName>integration</finalName> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + <executions> + <execution> + <id>run-integration-tests</id> + <phase>integration-test</phase> + <goals> + <goal>integration-test</goal> + </goals> + </execution> + <execution> + <id>verify-integration-tests</id> + <phase>verify</phase> + <goals> + <goal>verify</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> + <dependencies> <dependency> <groupId>org.apache.hama</groupId> <artifactId>hama-core</artifactId> - <version>0.4.0-incubating</version> + <version>0.6.0</version> </dependency> <dependency> <groupId>org.apache.accumulo</groupId> @@ -34,7 +135,13 @@ <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-core</artifactId> - <version>0.20.2</version> + <version>[1.0.0,2.0.0)</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.11</version> + <scope>test</scope> </dependency> </dependencies> </project> Modified: accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/AccumuloInputFormat.java URL: http://svn.apache.org/viewvc/accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/AccumuloInputFormat.java?rev=1433751&r1=1433750&r2=1433751&view=diff ============================================================================== --- accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/AccumuloInputFormat.java (original) +++ accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/AccumuloInputFormat.java Tue Jan 15 23:58:38 2013 @@ -27,23 +27,25 @@ import org.apache.hama.bsp.InputFormat; import org.apache.hama.bsp.InputSplit; import org.apache.hama.bsp.RecordReader; +/** + * <p> + * AccumuloInputFormat class. To be used with Hama BSP. + * </p> + * + * @see BSPJob#setInputFormat(Class) + */ public class AccumuloInputFormat extends org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat implements InputFormat<Key,Value> { + public class BSPRecordReaderBase extends RecordReaderBase<Key,Value> implements RecordReader<Key,Value> { public BSPRecordReaderBase(InputSplit split, BSPJob job) throws IOException { - this.initialize((BSPRangeInputSplit) split, job.getConf()); + this.initialize((BSPRangeInputSplit) split, MapreduceWrapper.wrappedTaskAttemptContext(job)); } - /* - * @see org.apache.hadoop.mapreduce.RecordReader#nextKeyValue() - */ @Override public boolean nextKeyValue() throws IOException, InterruptedException { return next(currentKey, currentValue); } - /* - * @see org.apache.hama.bsp.RecordReader#createKey() - */ @Override public Key createKey() { if (currentKey == null) { @@ -53,9 +55,6 @@ public class AccumuloInputFormat extends } } - /* - * @see org.apache.hama.bsp.RecordReader#createValue() - */ @Override public Value createValue() { if (currentValue == null) { @@ -65,17 +64,11 @@ public class AccumuloInputFormat extends } } - /* - * @see org.apache.hama.bsp.RecordReader#getPos() - */ @Override public long getPos() throws IOException { return 0; } - /* - * @see org.apache.hama.bsp.RecordReader#next(java.lang.Object, java.lang.Object) - */ @Override public boolean next(Key k, Value v) throws IOException { if (scannerIterator.hasNext()) { @@ -108,7 +101,7 @@ public class AccumuloInputFormat extends @Override public InputSplit[] getSplits(BSPJob job, int arg1) throws IOException { - List<org.apache.hadoop.mapreduce.InputSplit> splits = getSplits(job.getConf()); + List<org.apache.hadoop.mapreduce.InputSplit> splits = getSplits(MapreduceWrapper.wrappedTaskAttemptContext(job)); InputSplit[] bspSplits = new BSPRangeInputSplit[splits.size()]; for (int i = 0; i < splits.size(); i++) { bspSplits[i] = new BSPRangeInputSplit((RangeInputSplit) splits.get(i)); Modified: accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/AccumuloOutputFormat.java URL: http://svn.apache.org/viewvc/accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/AccumuloOutputFormat.java?rev=1433751&r1=1433750&r2=1433751&view=diff ============================================================================== --- accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/AccumuloOutputFormat.java (original) +++ accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/AccumuloOutputFormat.java Tue Jan 15 23:58:38 2013 @@ -21,27 +21,34 @@ import java.io.IOException; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.data.Mutation; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.Text; import org.apache.hama.bsp.BSPJob; import org.apache.hama.bsp.OutputFormat; import org.apache.hama.bsp.RecordWriter; +/** + * <p> + * AccumuloOutputFormat class. To be used with Hama BSP. + * </p> + * + * @see BSPJob#setOutputFormat(Class) + */ public class AccumuloOutputFormat extends org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat implements OutputFormat<Text,Mutation> { protected static class BSPRecordWriter extends AccumuloRecordWriter implements RecordWriter<Text,Mutation> { - BSPRecordWriter(Configuration conf) throws AccumuloException, AccumuloSecurityException, IOException { - super(conf); + + private BSPJob job; + + BSPRecordWriter(BSPJob job) throws AccumuloException, AccumuloSecurityException, IOException { + super(MapreduceWrapper.wrappedTaskAttemptContext(job)); + this.job = job; } - /* - * @see org.apache.hama.bsp.RecordWriter#close() - */ @Override public void close() throws IOException { try { - close(null); + close(MapreduceWrapper.wrappedTaskAttemptContext(job)); } catch (InterruptedException e) { throw new IOException(e); } @@ -49,21 +56,15 @@ public class AccumuloOutputFormat extend } - /* - * @see org.apache.hama.bsp.OutputFormat#checkOutputSpecs(org.apache.hadoop.fs.FileSystem, org.apache.hama.bsp.BSPJob) - */ @Override public void checkOutputSpecs(FileSystem fs, BSPJob job) throws IOException { - checkOutputSpecs(job.getConf()); + checkOutputSpecs(MapreduceWrapper.wrappedTaskAttemptContext(job)); } - /* - * @see org.apache.hama.bsp.OutputFormat#getRecordWriter(org.apache.hadoop.fs.FileSystem, org.apache.hama.bsp.BSPJob, java.lang.String) - */ @Override - public RecordWriter<Text,Mutation> getRecordWriter(FileSystem fs, BSPJob job, String arg2) throws IOException { + public RecordWriter<Text,Mutation> getRecordWriter(FileSystem fs, BSPJob job, String name) throws IOException { try { - return new BSPRecordWriter(job.getConf()); + return new BSPRecordWriter(job); } catch (Exception e) { throw new IOException(e); } Added: accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/MapreduceWrapper.java URL: http://svn.apache.org/viewvc/accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/MapreduceWrapper.java?rev=1433751&view=auto ============================================================================== --- accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/MapreduceWrapper.java (added) +++ accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/MapreduceWrapper.java Tue Jan 15 23:58:38 2013 @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.bsp; + +import java.io.IOException; + +import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; +import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hama.bsp.BSPJob; + +/** + * <p> + * MapreduceWrapper class. Provides a wrapper to wrap {@link BSPJob} into the appropriate Hadoop type required by {@link AccumuloInputFormat} and + * {@link AccumuloOutputFormat} static configurator methods. Useful for reusing code to set the job's configuration and not using the expected Hadoop API. + * </p> + */ +public class MapreduceWrapper { + + /** + * Wraps a {@link BSPJob} for reading its {@link Configuration} within Accumulo MapReduce classes' protected static configuration getters. + * + * @param job + * the {@link BSPJob} instance to be wrapped + * @return an instance of {@link TaskAttemptContext} whose {@link Configuration} is the same as the job + */ + public static TaskAttemptContext wrappedTaskAttemptContext(final BSPJob job) { + return new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()); + } + + /** + * Wraps a {@link BSPJob} for writing its {@link Configuration} within Accumulo MapReduce classes' public static configuration setters. + * + * @param job + * the {@link BSPJob} instance to be wrapped + * @return an instance of {@link Job} that exposes {@link BSPJob#getConfiguration()} via {@link Job#getConfiguration()}; no other methods of {@link Job} are + * implemented, so this object cannot be used for anything other than editing the {@link BSPJob}'s {@link Configuration} + */ + public static Job wrappedJob(BSPJob job) { + final BSPJob bspJob = job; + try { + return new Job() { + @Override + public Configuration getConfiguration() { + return bspJob.getConfiguration(); + } + }; + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} Added: accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatIT.java URL: http://svn.apache.org/viewvc/accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatIT.java?rev=1433751&view=auto ============================================================================== --- accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatIT.java (added) +++ accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatIT.java Tue Jan 15 23:58:38 2013 @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client.mapreduce; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.io.IOException; + +import org.apache.accumulo.bsp.AccumuloInputFormat; +import org.apache.accumulo.bsp.MapreduceWrapper; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hama.bsp.BSP; +import org.apache.hama.bsp.BSPJob; +import org.apache.hama.bsp.BSPPeer; +import org.apache.hama.bsp.InputSplit; +import org.apache.hama.bsp.sync.SyncException; +import org.apache.hama.util.KeyValuePair; +import org.junit.Test; + +/** + * + */ +public class AccumuloInputFormatIT { + + static class InputFormatTestBSP<M extends Writable> extends BSP<Key,Value,Key,Value,M> { + Key key = null; + int count = 0; + + @Override + public void bsp(BSPPeer<Key,Value,Key,Value,M> peer) throws IOException, SyncException, InterruptedException { + // this method reads the next key value record from file + KeyValuePair<Key,Value> pair; + + while ((pair = peer.readNext()) != null) { + if (key != null) { + assertEquals(key.getRow().toString(), new String(pair.getValue().get())); + } + + assertEquals(pair.getKey().getRow(), new Text(String.format("%09x", count + 1))); + assertEquals(new String(pair.getValue().get()), String.format("%09x", count)); + count++; + + key = new Key(pair.getKey()); + } + + peer.sync(); + assertEquals(100, count); + } + } + + @Test + public void testBSPInputFormat() throws Exception { + MockInstance mockInstance = new MockInstance("testmapinstance"); + Connector c = mockInstance.getConnector("root", new byte[] {}); + if (c.tableOperations().exists("testtable")) + c.tableOperations().delete("testtable"); + c.tableOperations().create("testtable"); + + BatchWriter bw = c.createBatchWriter("testtable", new BatchWriterConfig()); + for (int i = 0; i < 100; i++) { + Mutation m = new Mutation(new Text(String.format("%09x", i + 1))); + m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes())); + bw.addMutation(m); + } + bw.close(); + + BSPJob bspJob = new BSPJob(); + Job job = MapreduceWrapper.wrappedJob(bspJob); + + bspJob.setInputFormat(AccumuloInputFormat.class); + bspJob.setBspClass(InputFormatTestBSP.class); + bspJob.setInputPath(new Path("test")); + + AccumuloInputFormat.setInputInfo(job, "root", "".getBytes(), "testtable", new Authorizations()); + AccumuloInputFormat.setMockInstance(job, "testmapinstance"); + + AccumuloInputFormat input = new AccumuloInputFormat(); + InputSplit[] splits = input.getSplits(bspJob, 0); + assertEquals(splits.length, 1); + + bspJob.setJar("target/integration-tests.jar"); + bspJob.setOutputPath(new Path("target/bsp-inputformat-test")); + if (!bspJob.waitForCompletion(false)) + fail("Job not finished successfully"); + } + +} Modified: accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java URL: http://svn.apache.org/viewvc/accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java?rev=1433751&r1=1433750&r2=1433751&view=diff ============================================================================== --- accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java (original) +++ accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java Tue Jan 15 23:58:38 2013 @@ -21,243 +21,175 @@ import static org.junit.Assert.assertTru import java.io.IOException; import java.util.List; +import java.util.Map; import org.apache.accumulo.bsp.AccumuloInputFormat; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.bsp.MapreduceWrapper; import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.mapreduce.InputFormatBase.AccumuloIterator; -import org.apache.accumulo.core.client.mapreduce.InputFormatBase.AccumuloIteratorOption; -import org.apache.accumulo.core.client.mock.MockInstance; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.user.RegExFilter; import org.apache.accumulo.core.iterators.user.WholeRowIterator; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.apache.hama.HamaConfiguration; -import org.apache.hama.bsp.BSP; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hama.bsp.BSPJob; -import org.apache.hama.bsp.BSPPeer; -import org.apache.hama.bsp.InputSplit; -import org.apache.hama.bsp.sync.SyncException; -import org.apache.hama.util.KeyValuePair; -import org.junit.After; import org.junit.Test; +/** + * + */ public class AccumuloInputFormatTest { - @After - public void tearDown() throws Exception {} - - /** - * Test basic setting & getting of max versions. - * - * @throws IOException - * Signals that an I/O exception has occurred. - */ - @Test - public void testMaxVersions() throws IOException { - BSPJob job = new BSPJob(); - AccumuloInputFormat.setMaxVersions(job.getConf(), 1); - int version = AccumuloInputFormat.getMaxVersions(job.getConf()); - assertEquals(1, version); - } - - @Test(expected = IOException.class) - public void testMaxVersionsLessThan1() throws IOException { - BSPJob job = new BSPJob(); - AccumuloInputFormat.setMaxVersions(job.getConf(), 0); - } - - @Test - public void testNoMaxVersion() throws IOException { - BSPJob job = new BSPJob(); - assertEquals(-1, AccumuloInputFormat.getMaxVersions(job.getConf())); - } - @Test public void testSetIterator() throws IOException { - BSPJob job = new BSPJob(); + BSPJob bspJob = new BSPJob(); - AccumuloInputFormat.addIterator(job.getConf(), new IteratorSetting(1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator")); - Configuration conf = job.getConf(); - String iterators = conf.get("AccumuloInputFormat.iterators"); - assertEquals("1:org.apache.accumulo.core.iterators.WholeRowIterator:WholeRow", iterators); + Job job = MapreduceWrapper.wrappedJob(bspJob); + AccumuloInputFormat.addIterator(job, new IteratorSetting(1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator")); + + TaskAttemptContext context = MapreduceWrapper.wrappedTaskAttemptContext(bspJob); + List<IteratorSetting> iterators = AccumuloInputFormat.getIterators(context); + assertEquals(1, iterators.size()); + IteratorSetting iter = iterators.get(0); + assertEquals(1, iter.getPriority()); + assertEquals("org.apache.accumulo.core.iterators.WholeRowIterator", iter.getIteratorClass()); + assertEquals("WholeRow", iter.getName()); + assertEquals(0, iter.getOptions().size()); } @Test public void testAddIterator() throws IOException { - BSPJob job = new BSPJob(); + BSPJob bspJob = new BSPJob(); - AccumuloInputFormat.addIterator(job.getConf(), new IteratorSetting(1, "WholeRow", WholeRowIterator.class)); - AccumuloInputFormat.addIterator(job.getConf(), new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator")); + Job job = MapreduceWrapper.wrappedJob(bspJob); + AccumuloInputFormat.addIterator(job, new IteratorSetting(1, "WholeRow", WholeRowIterator.class)); + AccumuloInputFormat.addIterator(job, new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator")); IteratorSetting iter = new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator"); iter.addOption("v1", "1"); iter.addOption("junk", "\0omg:!\\xyzzy"); - AccumuloInputFormat.addIterator(job.getConf(), iter); + AccumuloInputFormat.addIterator(job, iter); - List<AccumuloIterator> list = AccumuloInputFormat.getIterators(job.getConf()); + TaskAttemptContext context = MapreduceWrapper.wrappedTaskAttemptContext(bspJob); + List<IteratorSetting> list = AccumuloInputFormat.getIterators(context); // Check the list size assertTrue(list.size() == 3); // Walk the list and make sure our settings are correct - AccumuloIterator setting = list.get(0); + IteratorSetting setting = list.get(0); assertEquals(1, setting.getPriority()); assertEquals("org.apache.accumulo.core.iterators.user.WholeRowIterator", setting.getIteratorClass()); - assertEquals("WholeRow", setting.getIteratorName()); + assertEquals("WholeRow", setting.getName()); setting = list.get(1); assertEquals(2, setting.getPriority()); assertEquals("org.apache.accumulo.core.iterators.VersioningIterator", setting.getIteratorClass()); - assertEquals("Versions", setting.getIteratorName()); + assertEquals("Versions", setting.getName()); setting = list.get(2); assertEquals(3, setting.getPriority()); assertEquals("org.apache.accumulo.core.iterators.CountingIterator", setting.getIteratorClass()); - assertEquals("Count", setting.getIteratorName()); + assertEquals("Count", setting.getName()); - List<AccumuloIteratorOption> iteratorOptions = AccumuloInputFormat.getIteratorOptions(job.getConf()); + Map<String,String> iteratorOptions = setting.getOptions(); assertEquals(2, iteratorOptions.size()); - assertEquals("Count", iteratorOptions.get(0).getIteratorName()); - assertEquals("Count", iteratorOptions.get(1).getIteratorName()); - assertEquals("v1", iteratorOptions.get(0).getKey()); - assertEquals("1", iteratorOptions.get(0).getValue()); - assertEquals("junk", iteratorOptions.get(1).getKey()); - assertEquals("\0omg:!\\xyzzy", iteratorOptions.get(1).getValue()); + assertTrue(iteratorOptions.containsKey("v1")); + assertEquals("1", iteratorOptions.get("v1")); + assertTrue(iteratorOptions.containsKey("junk")); + assertEquals("\0omg:!\\xyzzy", iteratorOptions.get("junk")); } @Test - public void testIteratorOptionEncoding() throws Throwable { + public void testIteratorOptionEncoding() throws IOException { + BSPJob bspJob = new BSPJob(); String key = "colon:delimited:key"; String value = "comma,delimited,value"; + + Job job = MapreduceWrapper.wrappedJob(bspJob); IteratorSetting someSetting = new IteratorSetting(1, "iterator", "Iterator.class"); someSetting.addOption(key, value); - BSPJob job = new BSPJob(); - AccumuloInputFormat.addIterator(job.getConf(), someSetting); + AccumuloInputFormat.addIterator(job, someSetting); - final String rawConfigOpt = new AccumuloIteratorOption("iterator", key, value).toString(); - - assertEquals(rawConfigOpt, job.getConf().get("AccumuloInputFormat.iterators.options")); - - List<AccumuloIteratorOption> opts = AccumuloInputFormat.getIteratorOptions(job.getConf()); + TaskAttemptContext context = MapreduceWrapper.wrappedTaskAttemptContext(bspJob); + List<IteratorSetting> iters = AccumuloInputFormat.getIterators(context); + assertEquals(1, iters.size()); + assertEquals("iterator", iters.get(0).getName()); + assertEquals("Iterator.class", iters.get(0).getIteratorClass()); + assertEquals(1, iters.get(0).getPriority()); + Map<String,String> opts = iters.get(0).getOptions(); assertEquals(1, opts.size()); - assertEquals(opts.get(0).getKey(), key); - assertEquals(opts.get(0).getValue(), value); + assertTrue(opts.containsKey(key)); + assertEquals(value, opts.get(key)); someSetting.addOption(key + "2", value); someSetting.setPriority(2); someSetting.setName("it2"); - AccumuloInputFormat.addIterator(job.getConf(), someSetting); - opts = AccumuloInputFormat.getIteratorOptions(job.getConf()); - assertEquals(3, opts.size()); - for (AccumuloIteratorOption opt : opts) { - assertEquals(opt.getKey().substring(0, key.length()), key); - assertEquals(opt.getValue(), value); - } + AccumuloInputFormat.addIterator(job, someSetting); + + context = MapreduceWrapper.wrappedTaskAttemptContext(bspJob); + iters = AccumuloInputFormat.getIterators(context); + assertEquals(2, iters.size()); + assertEquals("iterator", iters.get(0).getName()); + assertEquals("Iterator.class", iters.get(0).getIteratorClass()); + assertEquals(1, iters.get(0).getPriority()); + opts = iters.get(0).getOptions(); + assertEquals(1, opts.size()); + assertTrue(opts.containsKey(key)); + assertEquals(value, opts.get(key)); + assertEquals("it2", iters.get(1).getName()); + assertEquals("Iterator.class", iters.get(1).getIteratorClass()); + assertEquals(2, iters.get(1).getPriority()); + opts = iters.get(1).getOptions(); + assertEquals(2, opts.size()); + assertTrue(opts.containsKey(key)); + assertEquals(value, opts.get(key)); + assertTrue(opts.containsKey(key + "2")); + assertEquals(value, opts.get(key + "2")); } @Test public void testGetIteratorSettings() throws IOException { - BSPJob job = new BSPJob(); + BSPJob bspJob = new BSPJob(); + Job job = MapreduceWrapper.wrappedJob(bspJob); - AccumuloInputFormat.addIterator(job.getConf(), new IteratorSetting(1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator")); - AccumuloInputFormat.addIterator(job.getConf(), new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator")); - AccumuloInputFormat.addIterator(job.getConf(), new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator")); + AccumuloInputFormat.addIterator(job, new IteratorSetting(1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator")); + AccumuloInputFormat.addIterator(job, new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator")); + AccumuloInputFormat.addIterator(job, new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator")); - List<AccumuloIterator> list = AccumuloInputFormat.getIterators(job.getConf()); + TaskAttemptContext context = MapreduceWrapper.wrappedTaskAttemptContext(bspJob); + List<IteratorSetting> list = AccumuloInputFormat.getIterators(context); // Check the list size - assertTrue(list.size() == 3); + assertEquals(3, list.size()); // Walk the list and make sure our settings are correct - AccumuloIterator setting = list.get(0); + IteratorSetting setting = list.get(0); assertEquals(1, setting.getPriority()); assertEquals("org.apache.accumulo.core.iterators.WholeRowIterator", setting.getIteratorClass()); - assertEquals("WholeRow", setting.getIteratorName()); + assertEquals("WholeRow", setting.getName()); setting = list.get(1); assertEquals(2, setting.getPriority()); assertEquals("org.apache.accumulo.core.iterators.VersioningIterator", setting.getIteratorClass()); - assertEquals("Versions", setting.getIteratorName()); + assertEquals("Versions", setting.getName()); setting = list.get(2); assertEquals(3, setting.getPriority()); assertEquals("org.apache.accumulo.core.iterators.CountingIterator", setting.getIteratorClass()); - assertEquals("Count", setting.getIteratorName()); - + assertEquals("Count", setting.getName()); } @Test public void testSetRegex() throws IOException { - BSPJob job = new BSPJob(); + BSPJob bspJob = new BSPJob(); + Job job = MapreduceWrapper.wrappedJob(bspJob); String regex = ">\"*%<>\'\\"; IteratorSetting is = new IteratorSetting(50, regex, RegExFilter.class); RegExFilter.setRegexs(is, regex, null, null, null, false); - AccumuloInputFormat.addIterator(job.getConf(), is); + AccumuloInputFormat.addIterator(job, is); - assertTrue(regex.equals(AccumuloInputFormat.getIterators(job.getConf()).get(0).getIteratorName())); + TaskAttemptContext context = MapreduceWrapper.wrappedTaskAttemptContext(bspJob); + assertEquals(regex, AccumuloInputFormat.getIterators(context).get(0).getName()); } - static class TestBSP extends BSP<Key,Value,Key,Value> { - Key key = null; - int count = 0; - - @Override - public void bsp(BSPPeer<Key,Value,Key,Value> peer) throws IOException, SyncException, InterruptedException { - // this method reads the next key value record from file - KeyValuePair<Key,Value> pair; - - while ((pair = peer.readNext()) != null) { - if (key != null) { - assertEquals(key.getRow().toString(), new String(pair.getValue().get())); - } - - assertEquals(pair.getKey().getRow(), new Text(String.format("%09x", count + 1))); - assertEquals(new String(pair.getValue().get()), String.format("%09x", count)); - count++; - - key = new Key(pair.getKey()); - } - - peer.sync(); - assertEquals(100, count); - } - } - - @Test - public void testBsp() throws Exception { - MockInstance mockInstance = new MockInstance("testmapinstance"); - Connector c = mockInstance.getConnector("root", new byte[] {}); - if (c.tableOperations().exists("testtable")) - c.tableOperations().delete("testtable"); - c.tableOperations().create("testtable"); - - BatchWriter bw = c.createBatchWriter("testtable", 10000L, 1000L, 4); - for (int i = 0; i < 100; i++) { - Mutation m = new Mutation(new Text(String.format("%09x", i + 1))); - m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes())); - bw.addMutation(m); - } - bw.close(); - - BSPJob job = new BSPJob(new HamaConfiguration()); - job.setInputFormat(AccumuloInputFormat.class); - job.setBspClass(TestBSP.class); - job.setInputPath(new Path("test")); - AccumuloInputFormat.setInputInfo(job.getConf(), "root", "".getBytes(), "testtable", new Authorizations()); - AccumuloInputFormat.setMockInstance(job.getConf(), "testmapinstance"); - - AccumuloInputFormat input = new AccumuloInputFormat(); - InputSplit[] splits = input.getSplits(job, 0); - assertEquals(splits.length, 1); - - job.waitForCompletion(false); - } } Copied: accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatIT.java (from r1431766, accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java) URL: http://svn.apache.org/viewvc/accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatIT.java?p2=accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatIT.java&p1=accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java&r1=1431766&r2=1433751&rev=1433751&view=diff ============================================================================== --- accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java (original) +++ accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatIT.java Tue Jan 15 23:58:38 2013 @@ -19,6 +19,7 @@ package org.apache.accumulo.core.client. import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; import java.util.Iterator; @@ -26,7 +27,9 @@ import java.util.Map.Entry; import org.apache.accumulo.bsp.AccumuloInputFormat; import org.apache.accumulo.bsp.AccumuloOutputFormat; +import org.apache.accumulo.bsp.MapreduceWrapper; import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.mock.MockInstance; @@ -37,6 +40,8 @@ import org.apache.accumulo.core.security import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.Job; import org.apache.hama.HamaConfiguration; import org.apache.hama.bsp.BSP; import org.apache.hama.bsp.BSPJob; @@ -46,14 +51,17 @@ import org.apache.hama.bsp.sync.SyncExce import org.apache.hama.util.KeyValuePair; import org.junit.Test; -public class AccumuloOutputFormatTest { +/** + * + */ +public class AccumuloOutputFormatIT { - static class TestBSP extends BSP<Key,Value,Text,Mutation> { + static class OutputFormatTestBSP<M extends Writable> extends BSP<Key,Value,Text,Mutation,M> { Key key = null; int count = 0; @Override - public void bsp(BSPPeer<Key,Value,Text,Mutation> peer) throws IOException, SyncException, InterruptedException { + public void bsp(BSPPeer<Key,Value,Text,Mutation,M> peer) throws IOException, SyncException, InterruptedException { // this method reads the next key value record from file KeyValuePair<Key,Value> pair; @@ -73,7 +81,7 @@ public class AccumuloOutputFormatTest { } @Override - public void cleanup(BSPPeer<Key,Value,Text,Mutation> peer) throws IOException { + public void cleanup(BSPPeer<Key,Value,Text,Mutation,M> peer) throws IOException { Mutation m = new Mutation("total"); m.put("", "", Integer.toString(count)); peer.write(new Text("testtable2"), m); @@ -81,7 +89,7 @@ public class AccumuloOutputFormatTest { } @Test - public void testBSP() throws Exception { + public void testBSPOutputFormat() throws Exception { MockInstance mockInstance = new MockInstance("testmrinstance"); Connector c = mockInstance.getConnector("root", new byte[] {}); if (c.tableOperations().exists("testtable1")) @@ -91,7 +99,7 @@ public class AccumuloOutputFormatTest { c.tableOperations().create("testtable1"); c.tableOperations().create("testtable2"); - BatchWriter bw = c.createBatchWriter("testtable1", 10000L, 1000L, 4); + BatchWriter bw = c.createBatchWriter("testtable1", new BatchWriterConfig()); for (int i = 0; i < 100; i++) { Mutation m = new Mutation(new Text(String.format("%09x", i + 1))); m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes())); @@ -100,29 +108,33 @@ public class AccumuloOutputFormatTest { bw.close(); Configuration conf = new Configuration(); - BSPJob bsp = new BSPJob(new HamaConfiguration(conf)); - bsp.setJobName("Test Input Output"); + BSPJob bspJob = new BSPJob(new HamaConfiguration(conf)); + bspJob.setJobName("Test Input Output"); - bsp.setBspClass(TestBSP.class); - bsp.setInputFormat(AccumuloInputFormat.class); - bsp.setInputPath(new Path("test")); - - bsp.setOutputFormat(AccumuloOutputFormat.class); - bsp.setOutputPath(new Path("test")); - - bsp.setOutputKeyClass(Text.class); - bsp.setOutputValueClass(Mutation.class); - - AccumuloInputFormat.setInputInfo(bsp.getConf(), "root", "".getBytes(), "testtable1", new Authorizations()); - AccumuloInputFormat.setMockInstance(bsp.getConf(), "testmrinstance"); - AccumuloOutputFormat.setOutputInfo(bsp.getConf(), "root", "".getBytes(), false, "testtable2"); - AccumuloOutputFormat.setMockInstance(bsp.getConf(), "testmrinstance"); + bspJob.setBspClass(OutputFormatTestBSP.class); + bspJob.setInputFormat(AccumuloInputFormat.class); + bspJob.setInputPath(new Path("test")); + + bspJob.setOutputFormat(AccumuloOutputFormat.class); + bspJob.setJar("target/integration-tests.jar"); + bspJob.setOutputPath(new Path("target/bsp-outputformat-test")); + + bspJob.setOutputKeyClass(Text.class); + bspJob.setOutputValueClass(Mutation.class); + + Job job = MapreduceWrapper.wrappedJob(bspJob); + + AccumuloInputFormat.setInputInfo(job, "root", "".getBytes(), "testtable1", new Authorizations()); + AccumuloInputFormat.setMockInstance(job, "testmrinstance"); + AccumuloOutputFormat.setOutputInfo(job, "root", "".getBytes(), false, "testtable2"); + AccumuloOutputFormat.setMockInstance(job, "testmrinstance"); AccumuloInputFormat input = new AccumuloInputFormat(); - InputSplit[] splits = input.getSplits(bsp, 0); + InputSplit[] splits = input.getSplits(bspJob, 0); assertEquals(splits.length, 1); - bsp.waitForCompletion(false); + if (!bspJob.waitForCompletion(false)) + fail("Job not finished successfully"); Scanner scanner = c.createScanner("testtable2", new Authorizations()); Iterator<Entry<Key,Value>> iter = scanner.iterator(); @@ -131,6 +143,6 @@ public class AccumuloOutputFormatTest { assertEquals("total", entry.getKey().getRow().toString()); assertEquals(100, Integer.parseInt(new String(entry.getValue().get()))); assertFalse(iter.hasNext()); - } + }