Repository: crunch Updated Branches: refs/heads/master 41b201a9e -> 8121bdf5a
Expose combine file split file path via Hadoop config Signed-off-by: Josh Wills <jwi...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/8121bdf5 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/8121bdf5 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/8121bdf5 Branch: refs/heads/master Commit: 8121bdf5a8a292b796fb3dc07f14e96a8f06d5a7 Parents: 41b201a Author: Ben Roling <ben.rol...@cerner.com> Authored: Wed Jan 24 10:40:18 2018 -0600 Committer: Josh Wills <jwi...@apache.org> Committed: Fri Feb 2 15:10:18 2018 -0800 ---------------------------------------------------------------------- .../org/apache/crunch/io/CombineFileIT.java | 54 +++++++++++++++++++- .../crunch/impl/mr/run/CrunchRecordReader.java | 10 +++- 2 files changed, 61 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/8121bdf5/crunch-core/src/it/java/org/apache/crunch/io/CombineFileIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/io/CombineFileIT.java b/crunch-core/src/it/java/org/apache/crunch/io/CombineFileIT.java index d0d61f9..4c8189f 100644 --- a/crunch-core/src/it/java/org/apache/crunch/io/CombineFileIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/io/CombineFileIT.java @@ -18,18 +18,26 @@ package org.apache.crunch.io; import com.google.common.io.Files; +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; import org.apache.crunch.PCollection; +import org.apache.crunch.Pair; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; import org.apache.crunch.test.Tests; +import org.apache.crunch.types.avro.Avros; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.junit.Rule; import org.junit.Test; import java.io.File; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class CombineFileIT { @Rule @@ -37,7 +45,7 @@ public class CombineFileIT { @Test public void testCombine() throws Exception { - File srcFiles = tmpDir.getFile("srcs"); + final File srcFiles = tmpDir.getFile("srcs"); File outputFiles = tmpDir.getFile("out"); assertTrue(srcFiles.mkdir()); File src1 = tmpDir.copyResourceFile(Tests.resource(this, "src1.txt")); @@ -47,9 +55,51 @@ public class CombineFileIT { MRPipeline p = new MRPipeline(CombineFileIT.class, tmpDir.getDefaultConfiguration()); PCollection<String> in = p.readTextFile(srcFiles.getAbsolutePath()); - in.write(To.textFile(outputFiles.getAbsolutePath())); + PCollection<Pair<String, String>> out = in.parallelDo( + new IdentityPlusPathFn(srcFiles), Avros.pairs(Avros.strings(), Avros.strings())); + out.write(To.textFile(outputFiles.getAbsolutePath())); p.done(); assertEquals(4, outputFiles.listFiles().length); + + // verify "crunch.split.file" is being handled correctly + FileSystem fs = FileSystem.get(tmpDir.getDefaultConfiguration()); + Path qualifiedSourcePath = fs.makeQualified(new Path(srcFiles.getAbsolutePath())); + Iterable<Pair<String, String>> materialized = out.materialize(); + for (Pair<String, String> pair : materialized) { + Path path = new Path(pair.first()); + String text = pair.second(); + assertEquals(qualifiedSourcePath, path.getParent()); + String fileName = path.getName(); + + // make sure filename is correct for each record + String[] parts = text.split(","); + switch (fileName) { + case "src1.txt": + assertEquals("1", parts[1].substring(0, 1)); + break; + case "src2.txt": + assertEquals("2", parts[1].substring(0, 1)); + break; + default: + fail("unexpected filename: " + fileName); + } + } + } + private static class IdentityPlusPathFn extends DoFn<String, Pair<String, String>> { + private final File srcFiles; + + public IdentityPlusPathFn(File srcFiles) { + this.srcFiles = srcFiles; + } + + @Override + public void process(String input, Emitter<Pair<String, String>> emitter) { + String filePath = getConfiguration().get("crunch.split.file"); + assertNotNull(filePath); + + emitter.emit(Pair.of(filePath, input)); + } + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/8121bdf5/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java index d4175a6..da4bc33 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java @@ -37,6 +37,7 @@ class CrunchRecordReader<K, V> extends RecordReader<K, V> { private TaskAttemptContext context; private int idx; private long progress; + private Configuration rootConf; public CrunchRecordReader(InputSplit inputSplit, final TaskAttemptContext context) throws IOException, InterruptedException { @@ -44,6 +45,7 @@ class CrunchRecordReader<K, V> extends RecordReader<K, V> { if (crunchSplit.get() instanceof CombineFileSplit) { combineFileSplit = (CombineFileSplit) crunchSplit.get(); } + rootConf = context.getConfiguration(); crunchSplit.setConf(context.getConfiguration()); this.context = new TaskAttemptContextImpl(crunchSplit.getConf(), context.getTaskAttemptID()); initNextRecordReader(); @@ -70,7 +72,13 @@ class CrunchRecordReader<K, V> extends RecordReader<K, V> { InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils.newInstance( crunchSplit.getInputFormatClass(), crunchSplit.getConf()); - this.curReader = inputFormat.createRecordReader(getDelegateSplit(), context); + + InputSplit inputSplit = getDelegateSplit(); + if (inputSplit instanceof FileSplit) + { + rootConf.set("crunch.split.file", ((FileSplit) inputSplit).getPath().toString()); + } + this.curReader = inputFormat.createRecordReader(inputSplit, context); return true; }