From 0f5d3d575cd84e37f58244c19dc9b7eedbbbe9cb Mon Sep 17 00:00:00 2001
From: tzolov <christian.tzolov@gmail.com>
Date: Mon, 23 Jun 2014 01:22:07 +0200
Subject: [PATCH] Implement the Converter#applyPTypeTransforms() semantics to
 Crunch-Spark runtime

---
 .../src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java    | 6 ++++--
 .../java/org/apache/crunch/impl/spark/collect/InputCollection.java  | 3 ++-
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
index 22375ee..d2f6969 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
@@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.AbstractFuture;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.CombineFn;
@@ -29,6 +30,7 @@ import org.apache.crunch.PipelineExecution;
 import org.apache.crunch.PipelineResult;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.Target;
+import org.apache.crunch.fn.IdentityFn;
 import org.apache.crunch.impl.dist.collect.PCollectionImpl;
 import org.apache.crunch.impl.spark.fn.MapFunction;
 import org.apache.crunch.impl.spark.fn.OutputConverterFunction;
@@ -236,11 +238,11 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
             JavaPairRDD<?, ?> outRDD;
             if (rdd instanceof JavaRDD) {
               outRDD = ((JavaRDD) rdd)
-                  .map(new MapFunction(ptype.getOutputMapFn(), ctxt))
+                  .map(new MapFunction((c.applyPTypeTransforms()?ptype.getOutputMapFn():IdentityFn.getInstance()), ctxt))
                   .map(new OutputConverterFunction(c));
             } else {
               outRDD = ((JavaPairRDD) rdd)
-                  .map(new PairMapFunction(ptype.getOutputMapFn(), ctxt))
+                  .map(new PairMapFunction((c.applyPTypeTransforms()?ptype.getOutputMapFn():IdentityFn.getInstance()), ctxt))
                   .map(new OutputConverterFunction(c));
             }
             try {
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java
index 0d1d5e0..9393ede 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java
@@ -19,6 +19,7 @@ package org.apache.crunch.impl.spark.collect;
 
 import org.apache.crunch.Source;
 import org.apache.crunch.SourceTarget;
+import org.apache.crunch.fn.IdentityFn;
 import org.apache.crunch.impl.dist.DistributedPipeline;
 import org.apache.crunch.impl.dist.collect.BaseInputCollection;
 import org.apache.crunch.impl.mr.run.CrunchInputFormat;
@@ -53,7 +54,7 @@ public class InputCollection<S> extends BaseInputCollection<S> implements SparkC
       input.rdd().setName(source.toString());
       return input
           .map(new InputConverterFunction(source.getConverter()))
-          .map(new MapFunction(source.getType().getInputMapFn(), runtime.getRuntimeContext()));
+          .map(new MapFunction(source.getConverter().applyPTypeTransforms()?source.getType().getInputMapFn():IdentityFn.getInstance(), runtime.getRuntimeContext()));
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
-- 
1.8.3.4 (Apple Git-47)

