It's not really possible to convert an RDD to a Column. You can think of a Column as an expression that produces a single output given some set of input columns. A Column is resolved against the DataFrame it is applied to, which is why the AnalysisException below complains that filteredOutput#1 is missing: that attribute belongs to a different DataFrame than the one passed to withColumn(). If I understand your code correctly, I think this might be easier to express as a UDF:
    sqlContext.udf().register("stem", new UDF1<String, String>() {
      @Override
      public String call(String str) {
        return str; // TODO: stemming code here
      }
    }, DataTypes.StringType);

    DataFrame transformed = df.withColumn("filteredInput", expr("stem(rawInput)"));

On Mon, Jan 4, 2016 at 8:08 PM, Andy Davidson <a...@santacruzintegration.com> wrote:

> I am having a heck of a time writing a simple transformer in Java. I
> assume that my Transformer is supposed to append a new column to the
> DataFrame argument. Any idea why I get the following exception in Java 8
> when I try to call DataFrame.withColumn()? The JavaDoc says withColumn()
> "Returns a new DataFrame
> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrame.html>
> by adding a column or replacing the existing column that has the same
> name."
>
> Also, do transformers always run in the driver? If not, I assume workers
> do not have the sqlContext. Any idea how I can convert a JavaRDD<> to a
> Column without a sqlContext?
>
> Kind regards
>
> Andy
>
> P.S.
> I am using Spark 1.6.0.
>
> org.apache.spark.sql.AnalysisException: resolved attribute(s)
> filteredOutput#1 missing from rawInput#0 in operator !Project
> [rawInput#0,filteredOutput#1 AS filteredOutput#2];
>     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
>     at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
>     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:183)
>     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:105)
>     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
>     at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:44)
>     at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
>     at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
>     at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2165)
>     at org.apache.spark.sql.DataFrame.select(DataFrame.scala:751)
>     at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1227)
>     at com.pws.poc.ml.StemmerTransformer.transform(StemmerTransformer.java:110)
>     at com.pws.poc.ml.StemmerTransformerTest.test(StemmerTransformerTest.java:45)
>
> public class StemmerTransformer extends Transformer implements Serializable {
>     String inputCol;  // unit test sets to rawInput
>     String outputCol; // unit test sets to filteredOutput
>     …
>
>     public StemmerTransformer(SQLContext sqlContext) {
>         // will only work if transformers execute in the driver
>         this.sqlContext = sqlContext;
>     }
>
>     @Override
>     public DataFrame transform(DataFrame df) {
>         df.printSchema();
>         df.show();
>
>         JavaRDD<Row> inRowRDD = df.select(inputCol).javaRDD();
>         JavaRDD<Row>
> outRowRDD = inRowRDD.map((Row row) -> {
>             // TODO add stemming code
>             // Create a new Row
>             Row ret = RowFactory.create("TODO");
>             return ret;
>         });
>
>         // can we create a Col from a JavaRDD<Row>?
>         List<StructField> fields = new ArrayList<StructField>();
>         boolean nullable = true;
>         fields.add(DataTypes.createStructField(outputCol, DataTypes.StringType, nullable));
>
>         StructType schema = DataTypes.createStructType(fields);
>         DataFrame outputDF = sqlContext.createDataFrame(outRowRDD, schema);
>         outputDF.printSchema();
>         outputDF.show();
>         Column newCol = outputDF.col(outputCol);
>
>         return df.withColumn(outputCol, newCol);
>     }
>
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
> SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
> WARN 03:58:46 main o.a.h.u.NativeCodeLoader <clinit> line:62 Unable to
> load native-hadoop library for your platform... using builtin-java classes
> where applicable
>
> root
>  |-- rawInput: array (nullable = false)
>  |    |-- element: string (containsNull = true)
>
> +--------------------+
> |            rawInput|
> +--------------------+
> |[I, saw, the, red...|
> |[Mary, had, a, li...|
> |[greet, greeting,...|
> +--------------------+
>
> root
>  |-- filteredOutput: string (nullable = true)
>
> +--------------+
> |filteredOutput|
> +--------------+
> |          TODO|
> |          TODO|
> |          TODO|
> +--------------+
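As for the TODO in the UDF body: a real stemming library (e.g. Lucene's Porter stemmer) is the usual choice, but to make the snippet self-contained, here is a hypothetical naive suffix-stripping stemmer in plain Java. The class name NaiveStemmer and its suffix list are my own invention for illustration, not part of Spark or your code:

```java
// A minimal, hypothetical suffix-stripping stemmer, just enough to give
// the UDF body something concrete to call. For real stemming, plug in a
// proper library such as Lucene's Porter stemmer instead.
public class NaiveStemmer {
    private static final String[] SUFFIXES = {"ings", "ing", "ed", "s"};

    public static String stem(String word) {
        for (String suffix : SUFFIXES) {
            // strip the first matching suffix, keeping at least 3 chars of stem
            if (word.endsWith(suffix) && word.length() - suffix.length() >= 3) {
                return word.substring(0, word.length() - suffix.length());
            }
        }
        return word;
    }

    public static void main(String[] args) {
        System.out.println(NaiveStemmer.stem("greeting"));  // greet
        System.out.println(NaiveStemmer.stem("greetings")); // greet
        System.out.println(NaiveStemmer.stem("saw"));       // saw
    }
}
```

With something like this on the classpath, the UDF body above becomes `return NaiveStemmer.stem(str);`, and because the logic is a plain static method it serializes to the workers without needing the sqlContext.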