Re: trouble implementing complex transformer in java that can be used with Pipeline. Scala to Java porting problem

2016-01-20 Thread Andy Davidson
Very nice! Many thanks, Kevin. I wish I had found this out a couple of weeks
ago.

Andy





Re: trouble implementing complex transformer in java that can be used with Pipeline. Scala to Java porting problem

2016-01-20 Thread Kevin Mellott
Hi Andy,

According to the API documentation for DataFrame
<http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame>,
you should have access to *sqlContext* as a property off of the DataFrame
instance. In your example, you could then do something like:

df.sqlContext.udf.register(...)
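
In Java the same call would look something like this sketch; udfName and the
return type are the ones from your snippets:

   // Register the UDF through the SQLContext carried by the DataFrame
   // itself, so no SQLContext needs to be passed in from outside.
   // (DataTypes is org.apache.spark.sql.types.DataTypes.)
   df.sqlContext().udf().register(udfName, new StemmerUDF(),
           DataTypes.createArrayType(DataTypes.StringType));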

Thanks,
Kevin



Re: trouble implementing complex transformer in java that can be used with Pipeline. Scala to Java porting problem

2016-01-20 Thread Andy Davidson
For clarity, callUDF() is not defined on DataFrame. It is defined on
org.apache.spark.sql.functions. (Strange that the class name starts with a
lower-case letter.) I have not figured out how to use the functions class.

http://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html
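
It looks like the key is a static import. A minimal sketch, assuming a UDF has
already been registered under the placeholder name "stemmer":

   import static org.apache.spark.sql.functions.callUDF;

   // Apply the registered UDF to the "text" column and add the result as a
   // new "words" column; the column names here are placeholders.
   DataFrame ret = df.withColumn("words", callUDF("stemmer", df.col("text")));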

Andy





trouble implementing complex transformer in java that can be used with Pipeline. Scala to Java porting problem

2016-01-20 Thread Andy Davidson
I am using Spark 1.6.0. I am having trouble implementing a custom transformer
derived from org.apache.spark.ml.Transformer in Java that I can use in a
Pipeline.

So far the only way I have figured out how to implement any kind of complex
functionality and have it applied to a DataFrame is to implement a UDF. For
example:


   class StemmerUDF implements UDF1>, Serializable {

private static final long serialVersionUID = 1L;



@Override

public List call(String text) throws Exception {

List ret = stemText(text); //call org.apache.lucene

return ret;

}

}



Before I can use the UDF it needs to be registered. This requires the
sqlContext. The problem is that the sqlContext is not available during
pipeline.load().


   void registerUDF(SQLContext sqlContext) {

if (udf == null) {

udf = new StemmerUDF();

DataType returnType =
DataTypes.createArrayType(DataTypes.StringType);

sqlContext.udf().register(udfName, udf, returnType);

}

}
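
For illustration, the calling sequence this imposes might look like the
following; StemmerTransformer is a hypothetical name for the class these
methods live in:

   // Registration has to be driven explicitly before the first transform(),
   // because nothing supplies an SQLContext during pipeline.load().
   StemmerTransformer stemmer = new StemmerTransformer();
   stemmer.registerUDF(sqlContext);
   DataFrame stemmed = stemmer.transform(df);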


Our transformer needs to implement transform(). For it to be able to use the
registered UDF we need the sqlContext. The problem is that the sqlContext is
not part of the signature of transform(). My current hack is to pass the
sqlContext to the constructor and not to use pipelines.
  @Override

public DataFrame transform(DataFrame df) {

  String fmt = "%s(%s) as %s";

String stmt = String.format(fmt, udfName, inputCol, outputCol);

logger.info("\nstmt: {}", stmt);

DataFrame ret = df.selectExpr("*", stmt);

return ret;

}
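
A possible way to avoid the constructor hack, along the lines of the
df.sqlContext suggestion earlier in this thread, would be to register the UDF
lazily inside transform(), where a DataFrame (and therefore its SQLContext) is
always in hand. A sketch, using the same fields as the snippets above:

   @Override
   public DataFrame transform(DataFrame df) {
       if (udf == null) {
           // First use: register through the DataFrame's own SQLContext,
           // which is available even after pipeline.load().
           udf = new StemmerUDF();
           df.sqlContext().udf().register(udfName, udf,
                   DataTypes.createArrayType(DataTypes.StringType));
       }
       String stmt = String.format("%s(%s) as %s", udfName, inputCol, outputCol);
       return df.selectExpr("*", stmt);
   }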



Is there a way to do something like df.callUDF(myUDF)?



The following Scala code looks like it is close to what I need. I have not
been able to figure out how to do something like this in Java 8. callUDF does
not seem to be available.



spark/spark-1.6.0/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala

@DeveloperApi
abstract class UnaryTransformer[IN, OUT, T <: UnaryTransformer[IN, OUT, T]]
  extends Transformer with HasInputCol with HasOutputCol with Logging {

  . . .

  override def transform(dataset: DataFrame): DataFrame = {
    transformSchema(dataset.schema, logging = true)
    dataset.withColumn($(outputCol),
      callUDF(this.createTransformFunc, outputDataType, dataset($(inputCol))))
  }



spark/spark-1.6.0/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala



class Tokenizer(override val uid: String)
  extends UnaryTransformer[String, Seq[String], Tokenizer] with DefaultParamsWritable {

  . . .

  override protected def createTransformFunc: String => Seq[String] = {
    _.toLowerCase.split("\\s")
  }

  . . .
}



Kind regards



Andy