It's not really possible to convert an RDD to a Column.  You can think of a
Column as an expression that produces a single output given some set of
input columns.  If I understand your code correctly, I think this might be
easier to express as a UDF:

// assumes: import org.apache.spark.sql.api.java.UDF1;
//          import org.apache.spark.sql.types.DataTypes;
sqlContext.udf().register("stem", new UDF1<String, String>() {
  @Override
  public String call(String str) {
    return str; // TODO: replace with real stemming code
  }
}, DataTypes.StringType);

DataFrame transformed = df.withColumn("filteredInput", expr("stem(rawInput)"));
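
(expr lives in org.apache.spark.sql.functions.)  If you'd rather skip the SQL
string, callUDF from the same class does the equivalent with Column arguments,
and your transform() collapses to a one-liner.  A rough sketch, assuming the
"stem" UDF above has been registered before the transformer runs:

import static org.apache.spark.sql.functions.callUDF;

@Override
public DataFrame transform(DataFrame df) {
  // The registered UDF is serialized with the query plan and runs on the
  // executors, so no SQLContext and no RDD round-trip is needed here.
  return df.withColumn(outputCol, callUDF("stem", df.col(inputCol)));
}

Since you never build a second DataFrame this way, the workers never need a
sqlContext, which answers your driver question as well.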


On Mon, Jan 4, 2016 at 8:08 PM, Andy Davidson <a...@santacruzintegration.com> wrote:

> I am having a heck of a time writing a simple transformer in Java. I
> assume that my Transformer is supposed to append a new column to the
> DataFrame argument. Any idea why I get the following exception in Java 8
> when I try to call DataFrame withColumn()? The JavaDoc
> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrame.html>
> says withColumn() "Returns a new DataFrame by adding a column or
> replacing the existing column that has the same name."
>
>
> Also, do transformers always run in the driver? If not, I assume workers do
> not have the sqlContext. Any idea how I can convert a JavaRDD<> to a
> Column without a sqlContext?
>
> Kind regards
>
> Andy
>
> P.S. I am using Spark 1.6.0
>
> org.apache.spark.sql.AnalysisException: resolved attribute(s) filteredOutput#1 missing from rawInput#0 in operator !Project [rawInput#0,filteredOutput#1 AS filteredOutput#2];
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
> at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:183)
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
> at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:105)
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
> at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:44)
> at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
> at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
> at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2165)
> at org.apache.spark.sql.DataFrame.select(DataFrame.scala:751)
> at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1227)
> at com.pws.poc.ml.StemmerTransformer.transform(StemmerTransformer.java:110)
> at com.pws.poc.ml.StemmerTransformerTest.test(StemmerTransformerTest.java:45)
>
>
>
> public class StemmerTransformer extends Transformer implements Serializable {
>     String inputCol;  // unit test sets this to rawInput
>     String outputCol; // unit test sets this to filteredOutput
>
>     …
>
>     public StemmerTransformer(SQLContext sqlContext) {
>         // will only work if transformers execute in the driver
>         this.sqlContext = sqlContext;
>     }
>
>     @Override
>     public DataFrame transform(DataFrame df) {
>         df.printSchema();
>         df.show();
>
>         JavaRDD<Row> inRowRDD = df.select(inputCol).javaRDD();
>         JavaRDD<Row> outRowRDD = inRowRDD.map((Row row) -> {
>             // TODO add stemming code
>             // Create a new Row
>             Row ret = RowFactory.create("TODO");
>             return ret;
>         });
>
>         // can we create a Col from a JavaRDD<Row>?
>         List<StructField> fields = new ArrayList<StructField>();
>         boolean nullable = true;
>         fields.add(DataTypes.createStructField(outputCol, DataTypes.StringType, nullable));
>
>         StructType schema = DataTypes.createStructType(fields);
>         DataFrame outputDF = sqlContext.createDataFrame(outRowRDD, schema);
>         outputDF.printSchema();
>         outputDF.show();
>
>         Column newCol = outputDF.col(outputCol);
>         return df.withColumn(outputCol, newCol);
>     }
> }
>
>
> root
>  |-- rawInput: array (nullable = false)
>  |    |-- element: string (containsNull = true)
>
> +--------------------+
> |            rawInput|
> +--------------------+
> |[I, saw, the, red...|
> |[Mary, had, a, li...|
> |[greet, greeting,...|
> +--------------------+
>
> root
>  |-- filteredOutput: string (nullable = true)
>
> +--------------+
> |filteredOutput|
> +--------------+
> |          TODO|
> |          TODO|
> |          TODO|
> +--------------+
