Author: kturner Date: Mon Oct 22 18:08:17 2012 New Revision: 1400976 URL: http://svn.apache.org/viewvc?rev=1400976&view=rev Log: ACCUMULO-826 ACCUMULO-507 reverted revisions 1397700,1382923,1339308,1339223,1336322. These changes caused MapReduce jobs to fail if the process that started the job exited, because the temporary password and range files they wrote to the FileSystem were registered for deletion on exit.
Modified: accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java Modified: accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java?rev=1400976&r1=1400975&r2=1400976&view=diff ============================================================================== --- accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java (original) +++ accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java Mon Oct 22 18:08:17 2012 @@ -38,13 +38,6 @@ import org.apache.accumulo.core.security import org.apache.accumulo.core.util.ArgumentChecker; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.filecache.DistributedCache; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; @@ -75,7 +68,7 @@ public class AccumuloOutputFormat extend private static final String OUTPUT_INFO_HAS_BEEN_SET = PREFIX + ".configured"; private static final String INSTANCE_HAS_BEEN_SET = PREFIX + ".instanceConfigured"; private static final String USERNAME = PREFIX + ".username"; - private static final String 
PASSWORD_PATH = PREFIX + ".password"; + private static final String PASSWORD = PREFIX + ".password"; private static final String DEFAULT_TABLE_NAME = PREFIX + ".defaulttable"; private static final String INSTANCE_NAME = PREFIX + ".instanceName"; @@ -135,28 +128,10 @@ public class AccumuloOutputFormat extend ArgumentChecker.notNull(user, passwd); conf.set(USERNAME, user); + conf.set(PASSWORD, new String(Base64.encodeBase64(passwd))); conf.setBoolean(CREATETABLES, createTables); if (defaultTable != null) conf.set(DEFAULT_TABLE_NAME, defaultTable); - - try { - FileSystem fs = FileSystem.get(conf); - Path file = new Path(fs.getWorkingDirectory(), conf.get("mapred.job.name") + System.currentTimeMillis() + ".pw"); - conf.set(PASSWORD_PATH, file.toString()); - FSDataOutputStream fos = fs.create(file, false); - fs.setPermission(file, new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE)); - fs.deleteOnExit(file); - - byte[] encodedPw = Base64.encodeBase64(passwd); - fos.writeInt(encodedPw.length); - fos.write(encodedPw); - fos.close(); - - DistributedCache.addCacheFile(file.toUri(), conf); - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - } /** @@ -257,28 +232,21 @@ public class AccumuloOutputFormat extend } /** - * @throws IOException + * WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to provide a charset safe conversion to a + * string, and is not intended to be secure. * * @deprecated Use {@link #getPassword(Configuration)} instead */ - protected static byte[] getPassword(JobContext job) throws IOException { + protected static byte[] getPassword(JobContext job) { return getPassword(job.getConfiguration()); } /** - * @throws IOException + * WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to provide a charset safe conversion to a + * string, and is not intended to be secure. 
*/ - protected static byte[] getPassword(Configuration conf) throws IOException { - FileSystem fs = FileSystem.get(conf); - Path file = new Path(conf.get(PASSWORD_PATH)); - - FSDataInputStream fdis = fs.open(file); - int length = fdis.readInt(); - byte[] encodedPassword = new byte[length]; - fdis.read(encodedPassword); - fdis.close(); - - return Base64.decodeBase64(encodedPassword); + protected static byte[] getPassword(Configuration conf) { + return Base64.decodeBase64(conf.get(PASSWORD, "").getBytes()); } /** @@ -386,7 +354,7 @@ public class AccumuloOutputFormat extend private Connector conn; - AccumuloRecordWriter(TaskAttemptContext attempt) throws AccumuloException, AccumuloSecurityException, IOException { + AccumuloRecordWriter(TaskAttemptContext attempt) throws AccumuloException, AccumuloSecurityException { Level l = getLogLevel(attempt); if (l != null) log.setLevel(getLogLevel(attempt)); Modified: accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java?rev=1400976&r1=1400975&r2=1400976&view=diff ============================================================================== --- accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java (original) +++ accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java Mon Oct 22 18:08:17 2012 @@ -16,8 +16,12 @@ */ package org.apache.accumulo.core.client.mapreduce; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.DataInput; +import java.io.DataInputStream; import java.io.DataOutput; +import java.io.DataOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.lang.reflect.InvocationTargetException; @@ -77,13 +81,6 @@ import 
org.apache.accumulo.core.util.Tex import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.filecache.DistributedCache; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputFormat; @@ -118,7 +115,7 @@ public abstract class InputFormatBase<K, private static final String INPUT_INFO_HAS_BEEN_SET = PREFIX + ".configured"; private static final String INSTANCE_HAS_BEEN_SET = PREFIX + ".instanceConfigured"; private static final String USERNAME = PREFIX + ".username"; - private static final String PASSWORD_PATH = PREFIX + ".password"; + private static final String PASSWORD = PREFIX + ".password"; private static final String TABLE_NAME = PREFIX + ".tablename"; private static final String AUTHORIZATIONS = PREFIX + ".authorizations"; @@ -217,28 +214,10 @@ public abstract class InputFormatBase<K, ArgumentChecker.notNull(user, passwd, table); conf.set(USERNAME, user); + conf.set(PASSWORD, new String(Base64.encodeBase64(passwd))); conf.set(TABLE_NAME, table); if (auths != null && !auths.isEmpty()) conf.set(AUTHORIZATIONS, auths.serialize()); - - try { - FileSystem fs = FileSystem.get(conf); - Path file = new Path(fs.getWorkingDirectory(), conf.get("mapred.job.name") + System.currentTimeMillis() + ".pw"); - conf.set(PASSWORD_PATH, file.toString()); - FSDataOutputStream fos = fs.create(file, false); - fs.setPermission(file, new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE)); - fs.deleteOnExit(file); - - byte[] encodedPw = Base64.encodeBase64(passwd); - fos.writeInt(encodedPw.length); - fos.write(encodedPw); - fos.close(); - - 
DistributedCache.addCacheFile(file.toUri(), conf); - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - } /** @@ -306,24 +285,17 @@ public abstract class InputFormatBase<K, */ public static void setRanges(Configuration conf, Collection<Range> ranges) { ArgumentChecker.notNull(ranges); + ArrayList<String> rangeStrings = new ArrayList<String>(ranges.size()); try { - FileSystem fs = FileSystem.get(conf); - Path file = new Path(fs.getWorkingDirectory(), conf.get("mapred.job.name") + System.currentTimeMillis() + ".ranges"); - conf.set(RANGES, file.toString()); - FSDataOutputStream fos = fs.create(file, false); - fs.setPermission(file, new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE)); - fs.deleteOnExit(file); - - fos.writeInt(ranges.size()); for (Range r : ranges) { - r.write(fos); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + r.write(new DataOutputStream(baos)); + rangeStrings.add(new String(Base64.encodeBase64(baos.toByteArray()))); } - fos.close(); - - DistributedCache.addCacheFile(file.toUri(), conf); - } catch (IOException e) { - throw new RuntimeException("Unable to write ranges to file", e); + } catch (IOException ex) { + throw new IllegalArgumentException("Unable to encode ranges to Base64", ex); } + conf.setStrings(RANGES, rangeStrings.toArray(new String[0])); } /** @@ -660,31 +632,26 @@ public abstract class InputFormatBase<K, } /** - * @throws IOException + * WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to provide a charset safe conversion to a + * string, and is not intended to be secure. + * * @deprecated Use {@link #getPassword(Configuration)} instead */ - protected static byte[] getPassword(JobContext job) throws IOException { + protected static byte[] getPassword(JobContext job) { return getPassword(job.getConfiguration()); } /** + * Gets the password from the configuration. 
WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to + * provide a charset safe conversion to a string, and is not intended to be secure. + * * @param conf * the Hadoop configuration object * @return the BASE64-encoded password - * @throws IOException * @see #setInputInfo(Configuration, String, byte[], String, Authorizations) */ - protected static byte[] getPassword(Configuration conf) throws IOException { - FileSystem fs = FileSystem.get(conf); - Path file = new Path(conf.get(PASSWORD_PATH)); - - FSDataInputStream fdis = fs.open(file); - int length = fdis.readInt(); - byte[] encodedPassword = new byte[length]; - fdis.read(encodedPassword); - fdis.close(); - - return Base64.decodeBase64(encodedPassword); + protected static byte[] getPassword(Configuration conf) { + return Base64.decodeBase64(conf.get(PASSWORD, "").getBytes()); } /** @@ -751,7 +718,7 @@ public abstract class InputFormatBase<K, /** * @deprecated Use {@link #getTabletLocator(Configuration)} instead */ - protected static TabletLocator getTabletLocator(JobContext job) throws TableNotFoundException, IOException { + protected static TabletLocator getTabletLocator(JobContext job) throws TableNotFoundException { return getTabletLocator(job.getConfiguration()); } @@ -763,10 +730,8 @@ public abstract class InputFormatBase<K, * @return an accumulo tablet locator * @throws TableNotFoundException * if the table name set on the configuration doesn't exist - * @throws IOException - * if the input format is unable to read the password file from the FileSystem */ - protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException, IOException { + protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException { if (conf.getBoolean(MOCK, false)) return new MockTabletLocator(); Instance instance = getInstance(conf); @@ -796,21 +761,12 @@ public abstract class InputFormatBase<K, */ protected 
static List<Range> getRanges(Configuration conf) throws IOException { ArrayList<Range> ranges = new ArrayList<Range>(); - FileSystem fs = FileSystem.get(conf); - String rangePath = conf.get(RANGES); - if (rangePath == null) - return ranges; - Path file = new Path(rangePath); - - FSDataInputStream fdis = fs.open(file); - int numRanges = fdis.readInt(); - while (numRanges > 0) { - Range r = new Range(); - r.readFields(fdis); - ranges.add(r); - numRanges--; + for (String rangeString : conf.getStringCollection(RANGES)) { + ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(rangeString.getBytes())); + Range range = new Range(); + range.readFields(new DataInputStream(bais)); + ranges.add(range); } - fdis.close(); return ranges; } @@ -1227,7 +1183,7 @@ public abstract class InputFormatBase<K, } Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobContext job, String tableName, List<Range> ranges) throws TableNotFoundException, - AccumuloException, AccumuloSecurityException, IOException { + AccumuloException, AccumuloSecurityException { Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>(); Modified: accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java?rev=1400976&r1=1400975&r2=1400976&view=diff ============================================================================== --- accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java (original) +++ accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java Mon Oct 22 18:08:17 2012 @@ -20,8 +20,6 @@ import static org.junit.Assert.assertEqu import static org.junit.Assert.assertTrue; import java.io.IOException; -import 
java.util.ArrayList; -import java.util.HashSet; import java.util.List; import org.apache.accumulo.core.client.BatchWriter; @@ -34,7 +32,6 @@ import org.apache.accumulo.core.client.m import org.apache.accumulo.core.client.mock.MockInstance; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.user.WholeRowIterator; import org.apache.accumulo.core.security.Authorizations; @@ -104,24 +101,6 @@ public class AccumuloInputFormatTest { String iterators = conf.get("AccumuloInputFormat.iterators"); assertEquals("1:org.apache.accumulo.core.iterators.WholeRowIterator:WholeRow", iterators); } - - static abstract class GetRanges<K, V> extends InputFormatBase<K,V> { - public static List<Range> getRanges(Configuration conf) throws IOException { - return InputFormatBase.getRanges(conf); - } - }; - - @Test - public void testSetRanges() throws IOException { - JobContext job = new JobContext(new Configuration(), new JobID()); - List<Range> ranges = new ArrayList<Range>(); - for (int i = 0; i < 100000; i++) { - ranges.add(new Range(new Text(String.format("%05x", i)))); - } - AccumuloInputFormat.setRanges(job.getConfiguration(), ranges); - List<Range> ranges2 = GetRanges.getRanges(job.getConfiguration()); - assertEquals(ranges, ranges2); - } @Test public void testAddIterator() { @@ -298,17 +277,14 @@ public class AccumuloInputFormatTest { static class TestMapper extends Mapper<Key,Value,Key,Value> { Key key = null; - int first = 0; int count = 0; @Override protected void map(Key k, Value v, Context context) throws IOException, InterruptedException { if (key != null) assertEquals(key.getRow().toString(), new String(v.get())); - else - first = Integer.parseInt(k.getRow().toString(), 16) - 1; - assertEquals(k.getRow(), new Text(String.format("%09x", first + count + 1))); - assertEquals(new String(v.get()), 
String.format("%09x", first + count)); + assertEquals(k.getRow(), new Text(String.format("%09x", count + 1))); + assertEquals(new String(v.get()), String.format("%09x", count)); key = new Key(k); count++; } @@ -333,14 +309,10 @@ public class AccumuloInputFormatTest { job.setNumReduceTasks(0); AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable", new Authorizations()); AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testmapinstance"); - HashSet<Range> ranges = new HashSet<Range>(); - ranges.add(new Range("000000000", "000000010")); - ranges.add(new Range("000000100", "000000110")); - AccumuloInputFormat.setRanges(job.getConfiguration(), ranges); AccumuloInputFormat input = new AccumuloInputFormat(); List<InputSplit> splits = input.getSplits(job); - assertEquals(splits.size(), 2); + assertEquals(splits.size(), 1); TestMapper mapper = (TestMapper) job.getMapperClass().newInstance(); for (InputSplit split : splits) { @@ -350,7 +322,6 @@ public class AccumuloInputFormatTest { Mapper<Key,Value,Key,Value>.Context context = mapper.new Context(job.getConfiguration(), id, reader, null, null, null, split); reader.initialize(split, context); mapper.run(context); - assertEquals(mapper.count, 16); } } @@ -371,9 +342,7 @@ public class AccumuloInputFormatTest { AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable2", new Authorizations()); AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testmapinstance"); AccumuloInputFormat input = new AccumuloInputFormat(); - List<InputSplit> splits = input.getSplits(job); - assertEquals(splits.size(), 1); - RangeInputSplit ris = (RangeInputSplit) splits.get(0); + RangeInputSplit ris = new RangeInputSplit(); TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()); RecordReader<Key,Value> rr = input.createRecordReader(ris, tac); rr.initialize(ris, tac); @@ -383,6 +352,5 @@ public class AccumuloInputFormatTest 
{ while (rr.nextKeyValue()) { mapper.map(rr.getCurrentKey(), rr.getCurrentValue(), context); } - assertEquals(mapper.count, 100); } }