I am having a heck of a time writing a simple Transformer in Java. I assume
that my Transformer is supposed to append a new column to the DataFrame
argument. Any idea why I get the following exception in Java 8 when I try to
call DataFrame.withColumn()? The JavaDoc
(http://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrame.html)
says withColumn() "Returns a new DataFrame by adding a column or replacing
the existing column that has the same name."

Also, do Transformers always run in the driver? If not, I assume the workers
do not have the SQLContext. Any idea how I can convert a JavaRDD<Row> to a
Column without a SQLContext?
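(For reference, the UDF-based alternative I have been reading about would look
roughly like the sketch below. I have not verified it; "stem" is a name I made
up, and I am assuming Spark 1.6's functions.callUDF and UDF1. My real input
column is array<string>, so the UDF signature would need adjusting.)

```java
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
import static org.apache.spark.sql.functions.callUDF;

// Sketch only: a UDF keeps the derived column attached to the same
// DataFrame, so withColumn() should be able to resolve it, and the
// workers never need a SQLContext at transform time.
public class StemUdfSketch {
    public static DataFrame addStemmed(SQLContext sqlContext, DataFrame df,
                                       String inputCol, String outputCol) {
        // "stem" is a hypothetical UDF name; the body is a placeholder
        sqlContext.udf().register("stem", (UDF1<String, String>) word -> {
            return word.toLowerCase(); // TODO real stemming
        }, DataTypes.StringType);
        return df.withColumn(outputCol, callUDF("stem", df.col(inputCol)));
    }
}
```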
Kind regards
Andy
P.S. I am using Spark 1.6.0
org.apache.spark.sql.AnalysisException: resolved attribute(s) filteredOutput#1 missing from rawInput#0 in operator !Project [rawInput#0,filteredOutput#1 AS filteredOutput#2];
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:183)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:105)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:44)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
	at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
	at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2165)
	at org.apache.spark.sql.DataFrame.select(DataFrame.scala:751)
	at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1227)
	at com.pws.poc.ml.StemmerTransformer.transform(StemmerTransformer.java:110)
	at com.pws.poc.ml.StemmerTransformerTest.test(StemmerTransformerTest.java:45)
public class StemmerTransformer extends Transformer implements Serializable {
    String inputCol;  // unit test sets to rawInput
    String outputCol; // unit test sets to filteredOutput
    SQLContext sqlContext;

    public StemmerTransformer(SQLContext sqlContext) {
        // will only work if transformers execute in the driver
        this.sqlContext = sqlContext;
    }

    @Override
    public DataFrame transform(DataFrame df) {
        df.printSchema();
        df.show();

        JavaRDD<Row> inRowRDD = df.select(inputCol).javaRDD();
        JavaRDD<Row> outRowRDD = inRowRDD.map((Row row) -> {
            // TODO add stemming code
            // Create a new Row
            Row ret = RowFactory.create("TODO");
            return ret;
        });

        // can we create a Column from a JavaRDD<Row>?
        List<StructField> fields = new ArrayList<StructField>();
        boolean nullable = true;
        fields.add(DataTypes.createStructField(outputCol, DataTypes.StringType,
                nullable));
        StructType schema = DataTypes.createStructType(fields);
        DataFrame outputDF = sqlContext.createDataFrame(outRowRDD, schema);
        outputDF.printSchema();
        outputDF.show();

        Column newCol = outputDF.col(outputCol);
        return df.withColumn(outputCol, newCol); // throws the AnalysisException above
    }
}
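(For the "TODO add stemming code" above, this is the kind of naive
suffix-stripping placeholder I had in mind. Plain Java, no Spark; the suffix
list is purely illustrative, not a real algorithm like the Porter stemmer.)

```java
import java.util.Arrays;
import java.util.List;

// Naive suffix-stripping stemmer, a placeholder for the TODO in
// StemmerTransformer.transform(). Illustrative only.
public class NaiveStemmer {
    // Longest suffixes first so "ings" wins over "ing" and "s"
    private static final List<String> SUFFIXES =
            Arrays.asList("ings", "ing", "ed", "s");

    public static String stem(String word) {
        String lower = word.toLowerCase();
        for (String suffix : SUFFIXES) {
            // only strip when a non-trivial stem remains
            if (lower.endsWith(suffix) && lower.length() > suffix.length() + 1) {
                return lower.substring(0, lower.length() - suffix.length());
            }
        }
        return lower;
    }
}
```

This would run per word inside the map() over the rows, e.g.
NaiveStemmer.stem("greetings") and NaiveStemmer.stem("greeting") both
reduce to "greet", while short words like "red" pass through unchanged.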
root
|-- rawInput: array (nullable = false)
| |-- element: string (containsNull = true)
+--------------------+
| rawInput|
+--------------------+
|[I, saw, the, red...|
|[Mary, had, a, li...|
|[greet, greeting,...|
+--------------------+
root
|-- filteredOutput: string (nullable = true)
+--------------+
|filteredOutput|
+--------------+
| TODO|
| TODO|
| TODO|
+--------------+