Repository: tez
Updated Branches:
  refs/heads/master 96c988cff -> a1f2da8eb
TEZ-3894. Tez intermediate outputs implicitly rely on permissive umask for shuffle (Jason Lowe via kshukla) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a1f2da8e Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a1f2da8e Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a1f2da8e Branch: refs/heads/master Commit: a1f2da8eb319218ff2a6dbe0f6de911336ac7e45 Parents: 96c988c Author: Kuhu Shukla <kshu...@yahoo-inc.com> Authored: Fri Feb 9 13:54:11 2018 -0600 Committer: Kuhu Shukla <kshu...@yahoo-inc.com> Committed: Fri Feb 9 13:54:11 2018 -0600 ---------------------------------------------------------------------- .../common/sort/impl/PipelinedSorter.java | 12 +++++ .../common/sort/impl/TezSpillRecord.java | 5 ++ .../common/sort/impl/dflt/DefaultSorter.java | 12 +++++ .../writers/UnorderedPartitionedKVWriter.java | 12 +++++ .../common/sort/impl/TestPipelinedSorter.java | 57 +++++++++++--------- .../sort/impl/dflt/TestDefaultSorter.java | 25 ++++++++- .../TestUnorderedPartitionedKVWriter.java | 24 ++++++++- 7 files changed, 119 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/a1f2da8e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java index c4782f6..7915662 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java @@ -38,6 +38,7 @@ import com.google.common.base.Preconditions; import 
com.google.common.collect.Lists; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.runtime.library.api.IOInterruptedException; import org.slf4j.Logger; @@ -68,6 +69,8 @@ import org.apache.tez.util.StopWatch; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import static org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord.SPILL_FILE_PERMS; + @SuppressWarnings({"unchecked", "rawtypes"}) public class PipelinedSorter extends ExternalSorter { @@ -479,6 +482,9 @@ public class PipelinedSorter extends ExternalSorter { * MAP_OUTPUT_INDEX_RECORD_LENGTH); spillFilePaths.put(numSpills, filename); FSDataOutputStream out = rfs.create(filename, true, 4096); + if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf)))) { + rfs.setPermission(filename, SPILL_FILE_PERMS); + } try { LOG.info(outputContext.getDestinationVertexName() + ": Spilling to " + filename.toString() + @@ -564,6 +570,9 @@ public class PipelinedSorter extends ExternalSorter { mapOutputFile.getSpillFileForWrite(numSpills, size); spillFilePaths.put(numSpills, filename); out = rfs.create(filename, true, 4096); + if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf)))) { + rfs.setPermission(filename, SPILL_FILE_PERMS); + } LOG.info(outputContext.getDestinationVertexName() + ": Spilling to " + filename.toString()); for (int i = 0; i < partitions; ++i) { if (isThreadInterrupted()) { @@ -749,6 +758,9 @@ public class PipelinedSorter extends ExternalSorter { } //The output stream for the final single output file FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096); + if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf)))) { + rfs.setPermission(finalOutputFile, SPILL_FILE_PERMS); + } final TezSpillRecord spillRec = new TezSpillRecord(partitions); 
http://git-wip-us.apache.org/repos/asf/tez/blob/a1f2da8e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java index ab4142b..48bd211 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java @@ -30,11 +30,13 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.PureJavaCrc32; import org.apache.tez.runtime.library.common.Constants; public class TezSpillRecord { + public static final FsPermission SPILL_FILE_PERMS = new FsPermission((short) 0640); /** Backing store */ private final ByteBuffer buf; @@ -140,6 +142,9 @@ public class TezSpillRecord { } else { out.close(); } + if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(job)))) { + rfs.setPermission(loc, SPILL_FILE_PERMS); + } } } http://git-wip-us.apache.org/repos/asf/tez/blob/a1f2da8e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java index 85e0003..cfcbd56 100644 --- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java @@ -32,6 +32,7 @@ import java.util.zip.Deflater; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.runtime.library.api.IOInterruptedException; import org.slf4j.Logger; @@ -63,6 +64,8 @@ import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment; import com.google.common.base.Preconditions; +import static org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord.SPILL_FILE_PERMS; + @SuppressWarnings({"unchecked", "rawtypes"}) public final class DefaultSorter extends ExternalSorter implements IndexedSortable { @@ -893,6 +896,9 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab mapOutputFile.getSpillFileForWrite(numSpills, size); spillFilePaths.put(numSpills, filename); out = rfs.create(filename); + if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf)))) { + rfs.setPermission(filename, SPILL_FILE_PERMS); + } int spindex = mstart; final InMemValBytes value = createInMemValBytes(); @@ -1000,6 +1006,9 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab mapOutputFile.getSpillFileForWrite(numSpills, size); spillFilePaths.put(numSpills, filename); out = rfs.create(filename); + if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf)))) { + rfs.setPermission(filename, SPILL_FILE_PERMS); + } // we don't run the combiner for a single record for (int i = 0; i < partitions; ++i) { @@ -1273,6 +1282,9 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab //The output 
stream for the final single output file FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096); + if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf)))) { + rfs.setPermission(finalOutputFile, SPILL_FILE_PERMS); + } if (numSpills == 0) { // TODO Change event generation to say there is no data rather than generating a dummy file http://git-wip-us.apache.org/repos/asf/tez/blob/a1f2da8e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java index f4ebc97..b9f0edf 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java @@ -47,6 +47,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.tez.common.CallableWithNdc; @@ -83,6 +84,8 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.ByteString; +import static org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord.SPILL_FILE_PERMS; + public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWriter { private static final Logger LOG = LoggerFactory.getLogger(UnorderedPartitionedKVWriter.class); @@ -588,6 +591,9 @@ public 
class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit this.spillIndex = spillPathDetails.spillIndex; } FSDataOutputStream out = rfs.create(spillPathDetails.outputFilePath); + if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf)))) { + rfs.setPermission(spillPathDetails.outputFilePath, SPILL_FILE_PERMS); + } TezSpillRecord spillRecord = new TezSpillRecord(numPartitions); DataInputBuffer key = new DataInputBuffer(); DataInputBuffer val = new DataInputBuffer(); @@ -984,6 +990,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit FSDataOutputStream out = null; try { out = rfs.create(finalOutPath); + if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf)))) { + rfs.setPermission(finalOutPath, SPILL_FILE_PERMS); + } Writer writer = null; for (int i = 0; i < numPartitions; i++) { @@ -1072,6 +1081,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit final TezSpillRecord spillRecord = new TezSpillRecord(numPartitions); final Path outPath = spillPathDetails.outputFilePath; out = rfs.create(outPath); + if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf)))) { + rfs.setPermission(outPath, SPILL_FILE_PERMS); + } BitSet emptyPartitions = null; if (pipelinedShuffle || !isFinalMergeEnabled) { emptyPartitions = new BitSet(numPartitions); http://git-wip-us.apache.org/repos/asf/tez/blob/a1f2da8e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java index d6f6273..727f8ac 100644 --- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java @@ -21,7 +21,9 @@ package org.apache.tez.runtime.library.common.sort.impl; import com.google.common.collect.Maps; import org.apache.commons.lang.RandomStringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; @@ -43,6 +45,7 @@ import org.apache.tez.runtime.api.TaskContext; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats; +import org.apache.tez.runtime.library.common.Constants; import org.apache.tez.runtime.library.common.combine.Combiner; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl; @@ -70,8 +73,10 @@ import static org.mockito.Mockito.verify; import static org.mockito.internal.verification.VerificationModeFactory.times; public class TestPipelinedSorter { + private static Configuration conf; private static FileSystem localFs = null; private static Path workDir = null; + private static LocalDirAllocator dirAllocator; private OutputContext outputContext; private int numOutputs; @@ -81,13 +86,14 @@ public class TestPipelinedSorter { private static TreeMap<String, String> sortedDataMap = Maps.newTreeMap(); static { - Configuration conf = getConf(); + conf = getConf(); try { localFs = FileSystem.getLocal(conf); workDir = new Path( new Path(System.getProperty("test.build.data", "/tmp")), 
TestPipelinedSorter.class.getName()) .makeQualified(localFs.getUri(), localFs.getWorkingDirectory()); + dirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS); } catch (IOException e) { throw new RuntimeException(e); } @@ -100,10 +106,11 @@ public class TestPipelinedSorter { @Before public void setup() throws IOException { + conf = getConf(); ApplicationId appId = ApplicationId.newInstance(10000, 1); TezCounters counters = new TezCounters(); String uniqueId = UUID.randomUUID().toString(); - String auxiliaryService = getConf().get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); this.outputContext = createMockOutputContext(counters, appId, uniqueId, auxiliaryService); } @@ -111,6 +118,7 @@ public class TestPipelinedSorter { public static Configuration getConf() { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "file:///"); + conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); //To enable PipelinedSorter conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS, SorterImpl.PIPELINED.name()); @@ -139,15 +147,13 @@ public class TestPipelinedSorter { //TODO: need to support multiple partition testing later //# partition, # of keys, size per key, InitialMem, blockSize - Configuration conf = getConf(); conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5); basicTest(1, 100000, 100, (10 * 1024l * 1024l), 3 << 20); - + verifyOutputPermissions(outputContext.getUniqueIdentifier()); } @Test public void testWithoutPartitionStats() throws IOException { - Configuration conf = getConf(); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, false); //# partition, # of keys, size per key, InitialMem, blockSize basicTest(1, 0, 0, (10 * 1024l * 1024l), 3 << 20); @@ -156,7 +162,6 @@ public class TestPipelinedSorter { @Test public void 
testWithEmptyData() throws IOException { - Configuration conf = getConf(); conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5); //# partition, # of keys, size per key, InitialMem, blockSize basicTest(1, 0, 0, (10 * 1024l * 1024l), 3 << 20); @@ -166,7 +171,6 @@ public class TestPipelinedSorter { public void testEmptyDataWithPipelinedShuffle() throws IOException { this.numOutputs = 1; this.initialAvailableMem = 1 *1024 * 1024; - Configuration conf = getConf(); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false); conf.setInt(TezRuntimeConfiguration .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1); @@ -207,7 +211,6 @@ public class TestPipelinedSorter { int partitions = 50; this.numOutputs = partitions; this.initialAvailableMem = 1 *1024 * 1024; - Configuration conf = getConf(); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED, sendEmptyPartitionDetails); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); conf.setInt(TezRuntimeConfiguration @@ -222,6 +225,7 @@ public class TestPipelinedSorter { assertTrue(sorter.getNumSpills() == numKeys + 1); } verifyCounters(sorter, outputContext); + verifyOutputPermissions(outputContext.getUniqueIdentifier()); Path indexFile = sorter.getFinalIndexFile(); TezSpillRecord spillRecord = new TezSpillRecord(indexFile, conf); for (int i = 0; i < partitions; i++) { @@ -264,7 +268,6 @@ public class TestPipelinedSorter { @Test public void testExceedsKVWithMultiplePartitions() throws IOException { - Configuration conf = getConf(); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); this.numOutputs = 5; this.initialAvailableMem = 1 * 1024 * 1024; @@ -275,13 +278,13 @@ public class TestPipelinedSorter { writeData(sorter, 100, 1<<20); verifyCounters(sorter, outputContext); + verifyOutputPermissions(outputContext.getUniqueIdentifier()); } @Test public void 
testExceedsKVWithPipelinedShuffle() throws IOException { this.numOutputs = 1; this.initialAvailableMem = 1 *1024 * 1024; - Configuration conf = getConf(); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false); conf.setInt(TezRuntimeConfiguration .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1); @@ -301,7 +304,6 @@ public class TestPipelinedSorter { public void test_TEZ_2602_50mb() throws IOException { this.numOutputs = 1; this.initialAvailableMem = 1 *1024 * 1024; - Configuration conf = getConf(); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); conf.setInt(TezRuntimeConfiguration .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1); @@ -318,13 +320,13 @@ public class TestPipelinedSorter { sorter.flush(); sorter.close(); + verifyOutputPermissions(outputContext.getUniqueIdentifier()); } //@Test public void testLargeDataWithMixedKV() throws IOException { this.numOutputs = 1; this.initialAvailableMem = 48 *1024 * 1024; - Configuration conf = getConf(); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, initialAvailableMem); @@ -346,6 +348,7 @@ public class TestPipelinedSorter { sorter.flush(); sorter.close(); + verifyOutputPermissions(outputContext.getUniqueIdentifier()); } @@ -382,7 +385,6 @@ public class TestPipelinedSorter { @Test public void testWithCustomComparator() throws IOException { //Test with custom comparator - Configuration conf = getConf(); conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, CustomComparator.class.getName()); basicTest(1, 100000, 100, (10 * 1024l * 1024l), 3 << 20); @@ -392,7 +394,6 @@ public class TestPipelinedSorter { public void testWithPipelinedShuffle() throws IOException { this.numOutputs = 1; this.initialAvailableMem = 5 *1024 * 1024; - Configuration conf = getConf(); 
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false); conf.setInt(TezRuntimeConfiguration @@ -411,7 +412,6 @@ public class TestPipelinedSorter { @Test public void testCountersWithMultiplePartitions() throws IOException { - Configuration conf = getConf(); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); this.numOutputs = 5; this.initialAvailableMem = 5 * 1024 * 1024; @@ -422,11 +422,11 @@ public class TestPipelinedSorter { writeData(sorter, 10000, 100); verifyCounters(sorter, outputContext); + verifyOutputPermissions(outputContext.getUniqueIdentifier()); } @Test public void testMultipleSpills() throws IOException { - Configuration conf = getConf(); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); this.numOutputs = 5; this.initialAvailableMem = 5 * 1024 * 1024; @@ -438,11 +438,11 @@ public class TestPipelinedSorter { writeData(sorter, 25000, 1000); assertFalse("Expecting needsRLE to be false", sorter.needsRLE()); verifyCounters(sorter, outputContext); + verifyOutputPermissions(outputContext.getUniqueIdentifier()); } @Test public void testWithCombiner() throws IOException { - Configuration conf = getConf(); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS, DummyCombiner.class.getName()); this.numOutputs = 5; @@ -461,6 +461,7 @@ public class TestPipelinedSorter { reader.close(); verifyCounters(sorter, outputContext); + verifyOutputPermissions(outputContext.getUniqueIdentifier()); } // for testWithCombiner @@ -479,7 +480,6 @@ public class TestPipelinedSorter { @Test public void testMultipleSpills_WithRLE() throws IOException { - Configuration conf = getConf(); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); this.numOutputs = 5; this.initialAvailableMem = 
5 * 1024 * 1024; @@ -491,12 +491,12 @@ public class TestPipelinedSorter { writeSimilarKeys(sorter, 25000, 1000, true); assertTrue("Expecting needsRLE to be true", sorter.needsRLE()); verifyCounters(sorter, outputContext); + verifyOutputPermissions(outputContext.getUniqueIdentifier()); } public void basicTest2(int partitions, int[] numkeys, int[] keysize, long initialAvailableMem, int blockSize) throws IOException { this.numOutputs = partitions; // single output - Configuration conf = getConf(); conf.setInt(TezRuntimeConfiguration .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 100); PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, @@ -520,12 +520,12 @@ public class TestPipelinedSorter { } sorter.flush(); sorter.close(); + verifyOutputPermissions(outputContext.getUniqueIdentifier()); } public void basicTest(int partitions, int numKeys, int keySize, long initialAvailableMem, int minBlockSize) throws IOException { this.numOutputs = partitions; // single output - Configuration conf = getConf(); conf.setInt(TezRuntimeConfiguration .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, minBlockSize >> 20); PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, @@ -543,6 +543,7 @@ public class TestPipelinedSorter { } verifyCounters(sorter, outputContext); + verifyOutputPermissions(outputContext.getUniqueIdentifier()); Path outputFile = sorter.finalOutputFile; FileSystem fs = outputFile.getFileSystem(conf); TezCounter finalOutputBytes = @@ -596,7 +597,6 @@ public class TestPipelinedSorter { //Its not possible to allocate > 2 GB in test environment. Carry out basic checks here. 
public void memTest() throws IOException { //Verify if > 2 GB can be set via config - Configuration conf = getConf(); conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 3076); long size = ExternalSorter.getInitialMemoryRequirement(conf, 4096 * 1024 * 1024l); Assert.assertTrue(size == (3076l << 20)); @@ -681,7 +681,6 @@ public class TestPipelinedSorter { //Intentionally not having timeout public void test_without_lazyMemAllocation() throws IOException { this.numOutputs = 10; - Configuration conf = getConf(); //128 MB. Pre-allocate. Request for default block size. Get 1 buffer conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 128); @@ -722,7 +721,6 @@ public class TestPipelinedSorter { //Intentionally not having timeout public void test_with_lazyMemAllocation() throws IOException { this.numOutputs = 10; - Configuration conf = getConf(); //128 MB. Do not pre-allocate. // Get 32 MB buffer first and the another buffer with 96 on filling up @@ -782,7 +780,6 @@ public class TestPipelinedSorter { //Intentionally not having timeout public void testLazyAllocateMem() throws IOException { this.numOutputs = 10; - Configuration conf = getConf(); conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 128); conf.setBoolean(TezRuntimeConfiguration .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, false); @@ -830,6 +827,17 @@ public class TestPipelinedSorter { basicTest(1, 5, (2 << 20), (48 * 1024l * 1024l), 16 << 20); } + private void verifyOutputPermissions(String spillId) throws IOException { + String subpath = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + "/" + spillId + + "/" + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING; + Path outputPath = dirAllocator.getLocalPathToRead(subpath, conf); + Path indexPath = dirAllocator.getLocalPathToRead(subpath + Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING, conf); + Assert.assertEquals("Incorrect output permissions", (short)0640, + localFs.getFileStatus(outputPath).getPermission().toShort()); + 
Assert.assertEquals("Incorrect index permissions", (short)0640, + localFs.getFileStatus(indexPath).getPermission().toShort()); + } + private void writeData(ExternalSorter sorter, int numKeys, int keyLen) throws IOException { writeData(sorter, numKeys, keyLen, true); } @@ -880,7 +888,6 @@ public class TestPipelinedSorter { Text readValue = new Text(); DataInputBuffer keyIn = new DataInputBuffer(); DataInputBuffer valIn = new DataInputBuffer(); - Configuration conf = getConf(); SerializationFactory serializationFactory = new SerializationFactory(conf); Deserializer<Text> keyDeserializer = serializationFactory.getDeserializer(Text.class); Deserializer<Text> valDeserializer = serializationFactory.getDeserializer(Text.class); http://git-wip-us.apache.org/repos/asf/tez/blob/a1f2da8e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java index 444ebaf..aad232a 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java @@ -18,6 +18,8 @@ package org.apache.tez.runtime.library.common.sort.impl.dflt; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.tez.runtime.library.common.Constants; import org.junit.Assert; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -42,6 +44,7 @@ import com.google.protobuf.ByteString; import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; +import 
org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataOutputBuffer; @@ -84,16 +87,19 @@ import org.mockito.stubbing.Answer; public class TestDefaultSorter { - private Configuration conf; private static final int PORT = 80; private static final String UniqueID = "UUID"; private static FileSystem localFs = null; private static Path workingDir = null; + private Configuration conf; + private LocalDirAllocator dirAllocator; + @Before public void setup() throws IOException { conf = new Configuration(); + conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS, SorterImpl.LEGACY.name()); // DefaultSorter conf.set("fs.defaultFS", "file:///"); localFs = FileSystem.getLocal(conf); @@ -108,6 +114,7 @@ public class TestDefaultSorter { conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, HashPartitioner.class.getName()); conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDirs); + dirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS); } @AfterClass @@ -272,6 +279,8 @@ public class TestDefaultSorter { } catch(IOException ioe) { fail(ioe.getMessage()); } + + verifyOutputPermissions(context.getUniqueIdentifier()); } @Test(timeout = 30000) @@ -396,6 +405,7 @@ public class TestDefaultSorter { assertTrue(sorter.getNumSpills() == numKeys); } verifyCounters(sorter, context); + verifyOutputPermissions(context.getUniqueIdentifier()); if (sorter.indexCacheList.size() != 0) { for (int i = 0; i < sorter.getNumSpills(); i++) { TezSpillRecord record = sorter.indexCacheList.get(i); @@ -482,6 +492,7 @@ public class TestDefaultSorter { ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload = ShuffleUserPayloads .DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(cdme.getUserPayload())); assertTrue(shufflePayload.getPathComponent().equalsIgnoreCase(UniqueID + "_0")); + 
verifyOutputPermissions(shufflePayload.getPathComponent()); } } @@ -513,6 +524,7 @@ public class TestDefaultSorter { ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload = ShuffleUserPayloads .DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(cdme.getUserPayload())); assertTrue(shufflePayload.getPathComponent().equalsIgnoreCase(UniqueID + "_" + spillIndex)); + verifyOutputPermissions(shufflePayload.getPathComponent()); spillIndex++; } } @@ -520,6 +532,17 @@ public class TestDefaultSorter { verifyCounters(sorter, context); } + private void verifyOutputPermissions(String spillId) throws IOException { + String subpath = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + "/" + spillId + + "/" + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING; + Path outputPath = dirAllocator.getLocalPathToRead(subpath, conf); + Path indexPath = dirAllocator.getLocalPathToRead(subpath + Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING, conf); + Assert.assertEquals("Incorrect output permissions", (short)0640, + localFs.getFileStatus(outputPath).getPermission().toShort()); + Assert.assertEquals("Incorrect index permissions", (short)0640, + localFs.getFileStatus(indexPath).getPermission().toShort()); + } + private void verifyCounters(DefaultSorter sorter, OutputContext context) { TezCounter numShuffleChunks = context.getCounters().findCounter(TaskCounter.SHUFFLE_CHUNK_COUNT); TezCounter additionalSpills = context.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT); http://git-wip-us.apache.org/repos/asf/tez/blob/a1f2da8e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java index 
ae396cb..dfd807b 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java @@ -51,9 +51,11 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import com.google.protobuf.ByteString; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.events.VertexManagerEvent; +import org.apache.tez.runtime.library.common.Constants; import org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWriter.SpillInfo; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto; import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB; @@ -510,6 +512,10 @@ public class TestUnorderedPartitionedKVWriter { if (numRecordsWritten > 0) { assertTrue(localFs.exists(outputFilePath)); assertTrue(localFs.exists(spillFilePath)); + assertEquals("Incorrect output permissions", (short)0640, + localFs.getFileStatus(outputFilePath).getPermission().toShort()); + assertEquals("Incorrect index permissions", (short)0640, + localFs.getFileStatus(spillFilePath).getPermission().toShort()); } else { return; } @@ -794,8 +800,14 @@ public class TestUnorderedPartitionedKVWriter { if (numRecordsWritten > 0) { int numSpills = kvWriter.numSpills.get(); for (int i = 0; i < numSpills; i++) { - assertTrue(localFs.exists(taskOutput.getSpillFileForWrite(i, 10))); - assertTrue(localFs.exists(taskOutput.getSpillIndexFileForWrite(i, 10))); + Path outputFile = taskOutput.getSpillFileForWrite(i, 10); + Path indexFile = taskOutput.getSpillIndexFileForWrite(i, 10); + assertTrue(localFs.exists(outputFile)); + assertTrue(localFs.exists(indexFile)); + assertEquals("Incorrect output permissions", (short)0640, + 
localFs.getFileStatus(outputFile).getPermission().toShort()); + assertEquals("Incorrect index permissions", (short)0640, + localFs.getFileStatus(indexFile).getPermission().toShort()); } } else { return; @@ -1042,6 +1054,13 @@ public class TestUnorderedPartitionedKVWriter { assertEquals(2, matcher.groupCount()); assertEquals(uniqueId, matcher.group(1)); assertTrue("spill id should be present in path component", matcher.group(2) != null); + Path outputPath = new Path(outputContext.getWorkDirs()[0], + "output/" + eventProto.getPathComponent() + "/" + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING); + Path indexPath = outputPath.suffix(Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING); + assertEquals("Incorrect output permissions", (short)0640, + localFs.getFileStatus(outputPath).getPermission().toShort()); + assertEquals("Incorrect index permissions", (short)0640, + localFs.getFileStatus(indexPath).getPermission().toShort()); } else { assertEquals(0, eventProto.getSpillId()); if (outputRecordsCounter.getValue() > 0) { @@ -1341,6 +1360,7 @@ public class TestUnorderedPartitionedKVWriter { boolean shouldCompress, int maxSingleBufferSizeBytes, Class<? extends Partitioner> partitionerClass) { Configuration conf = new Configuration(false); + conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, outputContext.getWorkDirs()); conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, keyClass.getName()); conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, valClass.getName());