Repository: crunch
Updated Branches:
  refs/heads/master 41b201a9e -> 8121bdf5a


Expose combine file split file path via Hadoop config

Signed-off-by: Josh Wills <jwi...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/8121bdf5
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/8121bdf5
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/8121bdf5

Branch: refs/heads/master
Commit: 8121bdf5a8a292b796fb3dc07f14e96a8f06d5a7
Parents: 41b201a
Author: Ben Roling <ben.rol...@cerner.com>
Authored: Wed Jan 24 10:40:18 2018 -0600
Committer: Josh Wills <jwi...@apache.org>
Committed: Fri Feb 2 15:10:18 2018 -0800

----------------------------------------------------------------------
 .../org/apache/crunch/io/CombineFileIT.java     | 54 +++++++++++++++++++-
 .../crunch/impl/mr/run/CrunchRecordReader.java  | 10 +++-
 2 files changed, 61 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/8121bdf5/crunch-core/src/it/java/org/apache/crunch/io/CombineFileIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/CombineFileIT.java b/crunch-core/src/it/java/org/apache/crunch/io/CombineFileIT.java
index d0d61f9..4c8189f 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/CombineFileIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/CombineFileIT.java
@@ -18,18 +18,26 @@
 package org.apache.crunch.io;
 
 import com.google.common.io.Files;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
 import org.apache.crunch.PCollection;
+import org.apache.crunch.Pair;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
 import org.apache.crunch.test.Tests;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.File;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class CombineFileIT {
   @Rule
@@ -37,7 +45,7 @@ public class CombineFileIT {
 
   @Test
   public void testCombine() throws Exception {
-    File srcFiles = tmpDir.getFile("srcs");
+    final File srcFiles = tmpDir.getFile("srcs");
     File outputFiles = tmpDir.getFile("out");
     assertTrue(srcFiles.mkdir());
     File src1 = tmpDir.copyResourceFile(Tests.resource(this, "src1.txt"));
@@ -47,9 +55,51 @@ public class CombineFileIT {
 
     MRPipeline p = new MRPipeline(CombineFileIT.class, tmpDir.getDefaultConfiguration());
     PCollection<String> in = p.readTextFile(srcFiles.getAbsolutePath());
-    in.write(To.textFile(outputFiles.getAbsolutePath()));
+    PCollection<Pair<String, String>> out = in.parallelDo(
+            new IdentityPlusPathFn(srcFiles), Avros.pairs(Avros.strings(), Avros.strings()));
+    out.write(To.textFile(outputFiles.getAbsolutePath()));
     p.done();
     assertEquals(4, outputFiles.listFiles().length);
+
+    // verify "crunch.split.file" is being handled correctly
+    FileSystem fs = FileSystem.get(tmpDir.getDefaultConfiguration());
+    Path qualifiedSourcePath = fs.makeQualified(new Path(srcFiles.getAbsolutePath()));
+    Iterable<Pair<String, String>> materialized = out.materialize();
+    for (Pair<String, String> pair : materialized) {
+      Path path = new Path(pair.first());
+      String text = pair.second();
+      assertEquals(qualifiedSourcePath, path.getParent());
+      String fileName = path.getName();
+
+      // make sure filename is correct for each record
+      String[] parts = text.split(",");
+      switch (fileName) {
+        case "src1.txt":
+          assertEquals("1", parts[1].substring(0, 1));
+          break;
+        case "src2.txt":
+          assertEquals("2", parts[1].substring(0, 1));
+          break;
+        default:
+          fail("unexpected filename: " + fileName);
+      }
+    }
+
   }
 
+  private static class IdentityPlusPathFn extends DoFn<String, Pair<String, String>> {
+    private final File srcFiles;
+
+    public IdentityPlusPathFn(File srcFiles) {
+      this.srcFiles = srcFiles;
+    }
+
+    @Override
+    public void process(String input, Emitter<Pair<String, String>> emitter) {
+      String filePath = getConfiguration().get("crunch.split.file");
+      assertNotNull(filePath);
+
+      emitter.emit(Pair.of(filePath, input));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/8121bdf5/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
index d4175a6..da4bc33 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
@@ -37,6 +37,7 @@ class CrunchRecordReader<K, V> extends RecordReader<K, V> {
   private TaskAttemptContext context;
   private int idx;
   private long progress;
+  private Configuration rootConf;
 
   public CrunchRecordReader(InputSplit inputSplit, final TaskAttemptContext context) throws IOException,
       InterruptedException {
@@ -44,6 +45,7 @@ class CrunchRecordReader<K, V> extends RecordReader<K, V> {
     if (crunchSplit.get() instanceof CombineFileSplit) {
       combineFileSplit = (CombineFileSplit) crunchSplit.get();
     }
+    rootConf = context.getConfiguration();
     crunchSplit.setConf(context.getConfiguration());
     this.context = new TaskAttemptContextImpl(crunchSplit.getConf(), context.getTaskAttemptID());
     initNextRecordReader();
@@ -70,7 +72,13 @@ class CrunchRecordReader<K, V> extends RecordReader<K, V> {
     InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils.newInstance(
         crunchSplit.getInputFormatClass(),
         crunchSplit.getConf());
-    this.curReader = inputFormat.createRecordReader(getDelegateSplit(), context);
+
+    InputSplit inputSplit = getDelegateSplit();
+    if (inputSplit instanceof FileSplit)
+    {
+      rootConf.set("crunch.split.file", ((FileSplit) inputSplit).getPath().toString());
+    }
+    this.curReader = inputFormat.createRecordReader(inputSplit, context);
     return true;
   }
 

Reply via email to