Re: trouble implementing complex transformer in java that can be used with Pipeline. Scala to Java porting problem
Very nice! Many thanks, Kevin. I wish I had found this out a couple of weeks ago.

Andy

From: Kevin Mellott
Date: Wednesday, January 20, 2016 at 4:34 PM
To: Andrew Davidson
Cc: "user @spark"
Re: trouble implementing complex transformer in java that can be used with Pipeline. Scala to Java porting problem
Hi Andy,

According to the API documentation for DataFrame <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame>, you should have access to *sqlContext* as a property of the DataFrame instance. In your example, you could then do something like:

    df.sqlContext.udf.register(...)

Thanks,
Kevin

On Wed, Jan 20, 2016 at 6:15 PM, Andy Davidson <a...@santacruzintegration.com> wrote:
Re: trouble implementing complex transformer in java that can be used with Pipeline. Scala to Java porting problem
For clarity, callUDF() is not defined on DataFrame. It is defined on org.apache.spark.sql.functions. Strangely, the class name starts with a lowercase letter. I have not figured out how to use the functions class.

http://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html

Andy

From: Andrew Davidson
Date: Wednesday, January 20, 2016 at 4:05 PM
To: "user @spark"
trouble implementing complex transformer in java that can be used with Pipeline. Scala to Java porting problem
I am using Spark 1.6.0. I am having trouble implementing a custom transformer derived from org.apache.spark.ml.Transformer in Java that I can use in a Pipeline.

So far, the only way I have figured out to implement any kind of complex functionality and have it applied to a DataFrame is to implement a UDF. For example:

    class StemmerUDF implements UDF1<String, List<String>>, Serializable {
        private static final long serialVersionUID = 1L;

        @Override
        public List<String> call(String text) throws Exception {
            List<String> ret = stemText(text); // call org.apache.lucene
            return ret;
        }
    }

Before I can use the UDF, it needs to be registered. This requires the sqlContext. The problem is that sqlContext is not available during pipeline.load().

    void registerUDF(SQLContext sqlContext) {
        if (udf == null) {
            udf = new StemmerUDF();
            DataType returnType = DataTypes.createArrayType(DataTypes.StringType);
            sqlContext.udf().register(udfName, udf, returnType);
        }
    }

Our transformer needs to implement transform(). For it to be able to use the registered UDF, we need the sqlContext. The problem is that the sqlContext is not part of the signature of transform(). My current hack is to pass the sqlContext to the constructor and not use pipelines.

    @Override
    public DataFrame transform(DataFrame df) {
        String fmt = "%s(%s) as %s";
        String stmt = String.format(fmt, udfName, inputCol, outputCol);
        logger.info("\nstmt: {}", stmt);
        DataFrame ret = df.selectExpr("*", stmt);
        return ret;
    }

Is there a way to do something like df.callUDF(myUDF)?

The following Scala code looks close to what I need. I have not been able to figure out how to do something like this in Java 8. callUDF does not seem to be available.

spark/spark-1.6.0/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala

    @DeveloperApi
    abstract class UnaryTransformer[IN, OUT, T <: UnaryTransformer[IN, OUT, T]]
      extends Transformer with HasInputCol with HasOutputCol with Logging {

      . . .

      override def transform(dataset: DataFrame): DataFrame = {
        transformSchema(dataset.schema, logging = true)
        dataset.withColumn($(outputCol),
          callUDF(this.createTransformFunc, outputDataType, dataset($(inputCol))))
      }

spark/spark-1.6.0/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala

    class Tokenizer(override val uid: String)
      extends UnaryTransformer[String, Seq[String], Tokenizer] with DefaultParamsWritable {

      . . .
      override protected def createTransformFunc: String => Seq[String] = {
        _.toLowerCase.split("\\s")
      }
      . . .
    }

Kind regards,

Andy
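For anyone porting the Tokenizer function quoted above, here is a minimal sketch of what a Java 8 counterpart of createTransformFunc (String => Seq[String]) might look like. It uses only the JDK and does not touch Spark. The class name TokenizerFunc and the constant TOKENIZE are made up for illustration and are not part of any Spark API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical holder class; the name is an assumption for this sketch.
final class TokenizerFunc {

    // Java 8 lambda mirroring the Scala body `_.toLowerCase.split("\\s")`.
    // Like the Scala version, it splits on every single whitespace character,
    // so consecutive spaces produce empty tokens.
    static final Function<String, List<String>> TOKENIZE =
        text -> Arrays.asList(text.toLowerCase().split("\\s"));

    public static void main(String[] args) {
        // Prints the lower-cased tokens of the input string.
        System.out.println(TOKENIZE.apply("Kind Regards Andy"));
    }
}
```

A function like this could presumably serve as the body of a UDF's call() method, in the same way stemText(text) is used in StemmerUDF above. How it gets registered and applied to a DataFrame column is a separate question.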