Re: Some questions after playing a little with the new ml.Pipeline.

2015-03-31 Thread Jaonary Rabarisoa
In my transformSchema I do specify that the output column type is a VectorUDT :

override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = {
  val map = this.paramMap ++ paramMap
  checkInputColumn(schema, map(inputCol), ArrayType(FloatType, false))
  addOutputColumn(schema, map(outputCol), new VectorUDT)
}


The output of printSchema is as follows :

|-- cnnFeature: vecto (nullable = false)
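
For reference, a minimal sketch (my own sketch against the Spark 1.3-era API, with
cnnFeature and features standing in for the real output column name and the zipped
RDD[Vector]) of how the schema handed to createDataFrame can declare the new column
as a vector type:

import org.apache.spark.mllib.linalg.VectorUDT
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType}

// Append an explicit VectorUDT field for the new column to the input schema.
val outputField = StructField("cnnFeature", new VectorUDT, nullable = false)
val outputSchema = StructType(dataSet.schema.fields :+ outputField)

// features is the RDD[Vector] produced by the mapPartitions pass shown below.
val newRowData = dataSet.rdd.zip(features).map { case (old, feat) =>
  Row.fromSeq(old.toSeq :+ feat)
}
val withFeatures = dataSet.sqlContext.createDataFrame(newRowData, outputSchema)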



On Tue, Mar 31, 2015 at 9:55 AM, Shivaram Venkataraman <
shiva...@eecs.berkeley.edu> wrote:

> My guess is that the `createDataFrame` call is failing here.  Can you
> check if the schema being passed to it includes the column name and type
> for the newly being zipped `features` ?
>
> Joseph probably knows this better, but AFAIK the DenseVector here will
> need to be marked as a VectorUDT while creating a DataFrame column
>
> Thanks
> Shivaram
>
> On Tue, Mar 31, 2015 at 12:50 AM, Jaonary Rabarisoa 
> wrote:
>
>> Following your suggestion, I end up with the following implementation :
>>
>> override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
>>   val schema = transformSchema(dataSet.schema, paramMap, logging = true)
>>   val map = this.paramMap ++ paramMap
>>
>>   val features = dataSet.select(map(inputCol)).mapPartitions { rows =>
>>     Caffe.set_mode(Caffe.CPU)
>>     val net = CaffeUtils.floatTestNetwork(SparkFiles.get(topology), SparkFiles.get(weight))
>>     val inputBlobs: FloatBlobVector = net.input_blobs()
>>     val N: Int = 1
>>     val K: Int = inputBlobs.get(0).channels()
>>     val H: Int = inputBlobs.get(0).height()
>>     val W: Int = inputBlobs.get(0).width()
>>     inputBlobs.get(0).Reshape(N, K, H, W)
>>     val dataBlob = new FloatPointer(N*K*W*H)
>>     val inputCPUData = inputBlobs.get(0).mutable_cpu_data()
>>
>>     val feat = rows.map { case Row(a: Iterable[Float]) =>
>>       dataBlob.put(a.toArray, 0, a.size)
>>       caffe_copy_float(N*K*W*H, dataBlob, inputCPUData)
>>       val resultBlobs: FloatBlobVector = net.ForwardPrefilled()
>>       val resultDim = resultBlobs.get(0).channels()
>>       logInfo(s"Output dimension $resultDim")
>>       val resultBlobData = resultBlobs.get(0).cpu_data()
>>       val output = new Array[Float](resultDim)
>>       resultBlobData.get(output)
>>       Vectors.dense(output.map(_.toDouble))
>>     }
>>     // net.deallocate()
>>     feat
>>   }
>>
>>   val newRowData = dataSet.rdd.zip(features).map { case (old, feat) =>
>>     val oldSeq = old.toSeq
>>     Row.fromSeq(oldSeq :+ feat)
>>   }
>>   dataSet.sqlContext.createDataFrame(newRowData, schema)
>> }
>>
>>
>> The idea is to mapPartitions of the underlying RDD of the DataFrame and
>> create a new DataFrame by zipping the results. It seems to work but when I
>> try to save the RDD I got the following error :
>>
>> org.apache.spark.mllib.linalg.DenseVector cannot be cast to
>> org.apache.spark.sql.Row
>>
>>
>> On Mon, Mar 30, 2015 at 6:40 PM, Shivaram Venkataraman <
>> shiva...@eecs.berkeley.edu> wrote:
>>
>>> One workaround could be to convert a DataFrame into a RDD inside the
>>> transform function and then use mapPartitions/broadcast to work with the
>>> JNI calls and then convert back to RDD.
>>>
>>> Thanks
>>> Shivaram
>>>
>>> On Mon, Mar 30, 2015 at 8:37 AM, Jaonary Rabarisoa 
>>> wrote:
>>>
 Dear all,

 I'm still struggling to make a pre-trained caffe model transformer for
 dataframe works. The main problem is that creating a caffe model inside the
 UDF is very slow and consumes memories.

 Some of you suggest to broadcast the model. The problem with
 broadcasting is that I use a JNI interface to caffe C++ with javacpp-preset
  and it is not serializable.

 Another possible approach is to use a UDF that can handle a whole
 partitions instead of just a row in order to minimize the caffe model
 instantiation.

 Is there any ideas to solve one of these two issues ?



 Best,

 Jao

 On Tue, Mar 3, 2015 at 10:04 PM, Joseph Bradley 
 wrote:

> I see.  I think your best bet is to create the cnnModel on the master
> and then serialize it to send to the workers.  If it's big (1M or so), 
> then
> you can broadcast it and use the broadcast variable in the UDF.  There is
> not a great way to do something equivalent to mapPartitions with UDFs 
> right
> now.
>
> On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa 
> wrote:
>
>> Here is my current implementation with current master version of
>> spark
>>
>> class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {
>>
>>   override def transformSchema(...) { ... }
>>
>>   override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
>>
>>     transformSchema(dataSet.schema, par

Re: Some questions after playing a little with the new ml.Pipeline.

2015-03-31 Thread Shivaram Venkataraman
My guess is that the `createDataFrame` call is failing here.  Can you check
if the schema being passed to it includes the column name and type for the
newly zipped `features` column ?

Joseph probably knows this better, but AFAIK the DenseVector here will need
to be marked as a VectorUDT while creating a DataFrame column

Thanks
Shivaram

On Tue, Mar 31, 2015 at 12:50 AM, Jaonary Rabarisoa 
wrote:

> Following your suggestion, I end up with the following implementation :
>
> override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
>   val schema = transformSchema(dataSet.schema, paramMap, logging = true)
>   val map = this.paramMap ++ paramMap
>
>   val features = dataSet.select(map(inputCol)).mapPartitions { rows =>
>     Caffe.set_mode(Caffe.CPU)
>     val net = CaffeUtils.floatTestNetwork(SparkFiles.get(topology), SparkFiles.get(weight))
>     val inputBlobs: FloatBlobVector = net.input_blobs()
>     val N: Int = 1
>     val K: Int = inputBlobs.get(0).channels()
>     val H: Int = inputBlobs.get(0).height()
>     val W: Int = inputBlobs.get(0).width()
>     inputBlobs.get(0).Reshape(N, K, H, W)
>     val dataBlob = new FloatPointer(N*K*W*H)
>     val inputCPUData = inputBlobs.get(0).mutable_cpu_data()
>
>     val feat = rows.map { case Row(a: Iterable[Float]) =>
>       dataBlob.put(a.toArray, 0, a.size)
>       caffe_copy_float(N*K*W*H, dataBlob, inputCPUData)
>       val resultBlobs: FloatBlobVector = net.ForwardPrefilled()
>       val resultDim = resultBlobs.get(0).channels()
>       logInfo(s"Output dimension $resultDim")
>       val resultBlobData = resultBlobs.get(0).cpu_data()
>       val output = new Array[Float](resultDim)
>       resultBlobData.get(output)
>       Vectors.dense(output.map(_.toDouble))
>     }
>     // net.deallocate()
>     feat
>   }
>
>   val newRowData = dataSet.rdd.zip(features).map { case (old, feat) =>
>     val oldSeq = old.toSeq
>     Row.fromSeq(oldSeq :+ feat)
>   }
>   dataSet.sqlContext.createDataFrame(newRowData, schema)
> }
>
>
> The idea is to mapPartitions of the underlying RDD of the DataFrame and
> create a new DataFrame by zipping the results. It seems to work but when I
> try to save the RDD I got the following error :
>
> org.apache.spark.mllib.linalg.DenseVector cannot be cast to
> org.apache.spark.sql.Row
>
>
> On Mon, Mar 30, 2015 at 6:40 PM, Shivaram Venkataraman <
> shiva...@eecs.berkeley.edu> wrote:
>
>> One workaround could be to convert a DataFrame into a RDD inside the
>> transform function and then use mapPartitions/broadcast to work with the
>> JNI calls and then convert back to RDD.
>>
>> Thanks
>> Shivaram
>>
>> On Mon, Mar 30, 2015 at 8:37 AM, Jaonary Rabarisoa 
>> wrote:
>>
>>> Dear all,
>>>
>>> I'm still struggling to make a pre-trained caffe model transformer for
>>> dataframe works. The main problem is that creating a caffe model inside the
>>> UDF is very slow and consumes memories.
>>>
>>> Some of you suggest to broadcast the model. The problem with
>>> broadcasting is that I use a JNI interface to caffe C++ with javacpp-preset
>>>  and it is not serializable.
>>>
>>> Another possible approach is to use a UDF that can handle a whole
>>> partitions instead of just a row in order to minimize the caffe model
>>> instantiation.
>>>
>>> Is there any ideas to solve one of these two issues ?
>>>
>>>
>>>
>>> Best,
>>>
>>> Jao
>>>
>>> On Tue, Mar 3, 2015 at 10:04 PM, Joseph Bradley 
>>> wrote:
>>>
 I see.  I think your best bet is to create the cnnModel on the master
 and then serialize it to send to the workers.  If it's big (1M or so), then
 you can broadcast it and use the broadcast variable in the UDF.  There is
 not a great way to do something equivalent to mapPartitions with UDFs right
 now.

 On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa 
 wrote:

> Here is my current implementation with current master version of spark
>
> class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {
>
>   override def transformSchema(...) { ... }
>
>   override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
>     transformSchema(dataSet.schema, paramMap, logging = true)
>     val map = this.paramMap ++ paramMap
>
>     val deepCNNFeature = udf((v: Vector) => {
>       val cnnModel = new CaffeModel
>       cnnModel.transform(v)
>     } : Vector)
>
>     dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))
>   }
> }
>
> where CaffeModel is a java api to Caffe C++ model.
>
> The problem here is that for every row it will create a new instance
> of CaffeModel which is inefficient s

Re: Some questions after playing a little with the new ml.Pipeline.

2015-03-31 Thread Jaonary Rabarisoa
Following your suggestion, I end up with the following implementation :

override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
  val schema = transformSchema(dataSet.schema, paramMap, logging = true)
  val map = this.paramMap ++ paramMap

  val features = dataSet.select(map(inputCol)).mapPartitions { rows =>
    Caffe.set_mode(Caffe.CPU)
    val net = CaffeUtils.floatTestNetwork(SparkFiles.get(topology), SparkFiles.get(weight))
    val inputBlobs: FloatBlobVector = net.input_blobs()
    val N: Int = 1
    val K: Int = inputBlobs.get(0).channels()
    val H: Int = inputBlobs.get(0).height()
    val W: Int = inputBlobs.get(0).width()
    inputBlobs.get(0).Reshape(N, K, H, W)
    val dataBlob = new FloatPointer(N*K*W*H)
    val inputCPUData = inputBlobs.get(0).mutable_cpu_data()

    val feat = rows.map { case Row(a: Iterable[Float]) =>
      dataBlob.put(a.toArray, 0, a.size)
      caffe_copy_float(N*K*W*H, dataBlob, inputCPUData)
      val resultBlobs: FloatBlobVector = net.ForwardPrefilled()
      val resultDim = resultBlobs.get(0).channels()
      logInfo(s"Output dimension $resultDim")
      val resultBlobData = resultBlobs.get(0).cpu_data()
      val output = new Array[Float](resultDim)
      resultBlobData.get(output)
      Vectors.dense(output.map(_.toDouble))
    }
    // net.deallocate()
    feat
  }

  val newRowData = dataSet.rdd.zip(features).map { case (old, feat) =>
    val oldSeq = old.toSeq
    Row.fromSeq(oldSeq :+ feat)
  }
  dataSet.sqlContext.createDataFrame(newRowData, schema)
}


The idea is to run mapPartitions over the underlying RDD of the DataFrame and
create a new DataFrame by zipping the results. It seems to work, but when I
try to save the RDD I get the following error :

org.apache.spark.mllib.linalg.DenseVector cannot be cast to
org.apache.spark.sql.Row
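
One variant I may try in order to sidestep the zip entirely (just a sketch, not
tested): run mapPartitions over the full rows so the new Row is built in the same
pass; the per-partition Caffe setup stays exactly as above, and computeFeature below
is a hypothetical helper wrapping that forward pass:

val inputIdx = dataSet.schema.fieldNames.indexOf(map(inputCol))

val newRowData = dataSet.rdd.mapPartitions { rows =>
  // ... same per-partition network setup as above ...
  rows.map { row =>
    val a = row(inputIdx).asInstanceOf[Iterable[Float]]  // raw input column value
    val feat = computeFeature(a)  // hypothetical: the Caffe forward pass from above
    Row.fromSeq(row.toSeq :+ feat)
  }
}
dataSet.sqlContext.createDataFrame(newRowData, schema)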


On Mon, Mar 30, 2015 at 6:40 PM, Shivaram Venkataraman <
shiva...@eecs.berkeley.edu> wrote:

> One workaround could be to convert a DataFrame into a RDD inside the
> transform function and then use mapPartitions/broadcast to work with the
> JNI calls and then convert back to RDD.
>
> Thanks
> Shivaram
>
> On Mon, Mar 30, 2015 at 8:37 AM, Jaonary Rabarisoa 
> wrote:
>
>> Dear all,
>>
>> I'm still struggling to make a pre-trained caffe model transformer for
>> dataframe works. The main problem is that creating a caffe model inside the
>> UDF is very slow and consumes memories.
>>
>> Some of you suggest to broadcast the model. The problem with broadcasting
>> is that I use a JNI interface to caffe C++ with javacpp-preset  and it is
>> not serializable.
>>
>> Another possible approach is to use a UDF that can handle a whole
>> partitions instead of just a row in order to minimize the caffe model
>> instantiation.
>>
>> Is there any ideas to solve one of these two issues ?
>>
>>
>>
>> Best,
>>
>> Jao
>>
>> On Tue, Mar 3, 2015 at 10:04 PM, Joseph Bradley 
>> wrote:
>>
>>> I see.  I think your best bet is to create the cnnModel on the master
>>> and then serialize it to send to the workers.  If it's big (1M or so), then
>>> you can broadcast it and use the broadcast variable in the UDF.  There is
>>> not a great way to do something equivalent to mapPartitions with UDFs right
>>> now.
>>>
>>> On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa 
>>> wrote:
>>>
 Here is my current implementation with current master version of spark

 class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {

   override def transformSchema(...) { ... }

   override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
     transformSchema(dataSet.schema, paramMap, logging = true)
     val map = this.paramMap ++ paramMap

     val deepCNNFeature = udf((v: Vector) => {
       val cnnModel = new CaffeModel
       cnnModel.transform(v)
     } : Vector)

     dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))
   }
 }

 where CaffeModel is a java api to Caffe C++ model.

 The problem here is that for every row it will create a new instance of
 CaffeModel which is inefficient since creating a new model
 means loading a large model file. And it will transform only a single
 row at a time. Or a Caffe network can process a batch of rows efficiently.
 In other words, is it possible to create an UDF that can operatats on a
 partition in order to minimize the creation of a CaffeModel and
 to take advantage of the Caffe network batch processing ?



 On Tue, Mar 3, 2015 at 7:26 AM, Joseph Bradley 
 wrote:

> I see, thanks for clarifying!
>
> I'd recommend following existing implementations in spark.ml
> transformers.  You'll need to define a

Re: Some questions after playing a little with the new ml.Pipeline.

2015-03-30 Thread Shivaram Venkataraman
One workaround could be to convert a DataFrame into an RDD inside the
transform function, use mapPartitions/broadcast to work with the
JNI calls, and then convert back to a DataFrame.

Thanks
Shivaram

On Mon, Mar 30, 2015 at 8:37 AM, Jaonary Rabarisoa 
wrote:

> Dear all,
>
> I'm still struggling to make a pre-trained caffe model transformer for
> dataframe works. The main problem is that creating a caffe model inside the
> UDF is very slow and consumes memories.
>
> Some of you suggest to broadcast the model. The problem with broadcasting
> is that I use a JNI interface to caffe C++ with javacpp-preset  and it is
> not serializable.
>
> Another possible approach is to use a UDF that can handle a whole
> partitions instead of just a row in order to minimize the caffe model
> instantiation.
>
> Is there any ideas to solve one of these two issues ?
>
>
>
> Best,
>
> Jao
>
> On Tue, Mar 3, 2015 at 10:04 PM, Joseph Bradley 
> wrote:
>
>> I see.  I think your best bet is to create the cnnModel on the master and
>> then serialize it to send to the workers.  If it's big (1M or so), then you
>> can broadcast it and use the broadcast variable in the UDF.  There is not a
>> great way to do something equivalent to mapPartitions with UDFs right now.
>>
>> On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa 
>> wrote:
>>
>>> Here is my current implementation with current master version of spark
>>>
>>> class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {
>>>
>>>   override def transformSchema(...) { ... }
>>>
>>>   override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
>>>     transformSchema(dataSet.schema, paramMap, logging = true)
>>>     val map = this.paramMap ++ paramMap
>>>
>>>     val deepCNNFeature = udf((v: Vector) => {
>>>       val cnnModel = new CaffeModel
>>>       cnnModel.transform(v)
>>>     } : Vector)
>>>
>>>     dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))
>>>   }
>>> }
>>>
>>> where CaffeModel is a java api to Caffe C++ model.
>>>
>>> The problem here is that for every row it will create a new instance of
>>> CaffeModel which is inefficient since creating a new model
>>> means loading a large model file. And it will transform only a single
>>> row at a time. Or a Caffe network can process a batch of rows efficiently.
>>> In other words, is it possible to create an UDF that can operatats on a
>>> partition in order to minimize the creation of a CaffeModel and
>>> to take advantage of the Caffe network batch processing ?
>>>
>>>
>>>
>>> On Tue, Mar 3, 2015 at 7:26 AM, Joseph Bradley 
>>> wrote:
>>>
 I see, thanks for clarifying!

 I'd recommend following existing implementations in spark.ml
 transformers.  You'll need to define a UDF which operates on a single Row
 to compute the value for the new column.  You can then use the DataFrame
 DSL to create the new column; the DSL provides a nice syntax for what would
 otherwise be a SQL statement like "select ... from ...".  I'm recommending
 looking at the existing implementation (rather than stating it here)
 because it changes between Spark 1.2 and 1.3.  In 1.3, the DSL is much
 improved and makes it easier to create a new column.

 Joseph

 On Sun, Mar 1, 2015 at 1:26 AM, Jaonary Rabarisoa 
 wrote:

> class DeepCNNFeature extends Transformer ... {
>
> override def transform(data: DataFrame, paramMap: ParamMap):
> DataFrame = {
>
>
>  // How can I do a map partition on the underlying RDD
> and then add the column ?
>
>  }
> }
>
> On Sun, Mar 1, 2015 at 10:23 AM, Jaonary Rabarisoa 
> wrote:
>
>> Hi Joseph,
>>
>> Thank your for the tips. I understand what should I do when my data
>> are represented as a RDD. The thing that I can't figure out is how to do
>> the same thing when the data is view as a DataFrame and I need to add the
>> result of my pretrained model as a new column in the DataFrame. 
>> Preciselly,
>> I want to implement the following transformer :
>>
>> class DeepCNNFeature extends Transformer ... {
>>
>> }
>>
>> On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley > > wrote:
>>
>>> Hi Jao,
>>>
>>> You can use external tools and libraries if they can be called from
>>> your Spark program or script (with appropriate conversion of data types,
>>> etc.).  The best way to apply a pre-trained model to a dataset would be 
>>> to
>>> call the model from within a closure, e.g.:
>>>
>>> myRDD.map { myDatum => preTrainedModel.predict(myDatum) }
>>>
>>> If your data is distributed in an RDD (myRDD), then the above call
>>> will distribut

Re: Some questions after playing a little with the new ml.Pipeline.

2015-03-30 Thread Jaonary Rabarisoa
Dear all,

I'm still struggling to make a pre-trained Caffe model transformer for
DataFrames work. The main problem is that creating a Caffe model inside the
UDF is very slow and consumes a lot of memory.

Some of you suggested broadcasting the model. The problem with broadcasting
is that I use a JNI interface to the Caffe C++ library via javacpp-presets, and
it is not serializable.

Another possible approach is to use a UDF that can handle a whole partition
instead of just a row, in order to minimize the number of Caffe model
instantiations.

Are there any ideas to solve either of these two issues ?



Best,

Jao

On Tue, Mar 3, 2015 at 10:04 PM, Joseph Bradley 
wrote:

> I see.  I think your best bet is to create the cnnModel on the master and
> then serialize it to send to the workers.  If it's big (1M or so), then you
> can broadcast it and use the broadcast variable in the UDF.  There is not a
> great way to do something equivalent to mapPartitions with UDFs right now.
>
> On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa 
> wrote:
>
>> Here is my current implementation with current master version of spark
>>
>> class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {
>>
>>   override def transformSchema(...) { ... }
>>
>>   override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
>>     transformSchema(dataSet.schema, paramMap, logging = true)
>>     val map = this.paramMap ++ paramMap
>>
>>     val deepCNNFeature = udf((v: Vector) => {
>>       val cnnModel = new CaffeModel
>>       cnnModel.transform(v)
>>     } : Vector)
>>
>>     dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))
>>   }
>> }
>>
>> where CaffeModel is a java api to Caffe C++ model.
>>
>> The problem here is that for every row it will create a new instance of
>> CaffeModel which is inefficient since creating a new model
>> means loading a large model file. And it will transform only a single row
>> at a time. Or a Caffe network can process a batch of rows efficiently.
>> In other words, is it possible to create an UDF that can operatats on a
>> partition in order to minimize the creation of a CaffeModel and
>> to take advantage of the Caffe network batch processing ?
>>
>>
>>
>> On Tue, Mar 3, 2015 at 7:26 AM, Joseph Bradley 
>> wrote:
>>
>>> I see, thanks for clarifying!
>>>
>>> I'd recommend following existing implementations in spark.ml
>>> transformers.  You'll need to define a UDF which operates on a single Row
>>> to compute the value for the new column.  You can then use the DataFrame
>>> DSL to create the new column; the DSL provides a nice syntax for what would
>>> otherwise be a SQL statement like "select ... from ...".  I'm recommending
>>> looking at the existing implementation (rather than stating it here)
>>> because it changes between Spark 1.2 and 1.3.  In 1.3, the DSL is much
>>> improved and makes it easier to create a new column.
>>>
>>> Joseph
>>>
>>> On Sun, Mar 1, 2015 at 1:26 AM, Jaonary Rabarisoa 
>>> wrote:
>>>
 class DeepCNNFeature extends Transformer ... {

 override def transform(data: DataFrame, paramMap: ParamMap):
 DataFrame = {


  // How can I do a map partition on the underlying RDD
 and then add the column ?

  }
 }

 On Sun, Mar 1, 2015 at 10:23 AM, Jaonary Rabarisoa 
 wrote:

> Hi Joseph,
>
> Thank your for the tips. I understand what should I do when my data
> are represented as a RDD. The thing that I can't figure out is how to do
> the same thing when the data is view as a DataFrame and I need to add the
> result of my pretrained model as a new column in the DataFrame. 
> Preciselly,
> I want to implement the following transformer :
>
> class DeepCNNFeature extends Transformer ... {
>
> }
>
> On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley 
> wrote:
>
>> Hi Jao,
>>
>> You can use external tools and libraries if they can be called from
>> your Spark program or script (with appropriate conversion of data types,
>> etc.).  The best way to apply a pre-trained model to a dataset would be 
>> to
>> call the model from within a closure, e.g.:
>>
>> myRDD.map { myDatum => preTrainedModel.predict(myDatum) }
>>
>> If your data is distributed in an RDD (myRDD), then the above call
>> will distribute the computation of prediction using the pre-trained 
>> model.
>> It will require that all of your Spark workers be able to run the
>> preTrainedModel; that may mean installing Caffe and dependencies on all
>> nodes in the compute cluster.
>>
>> For the second question, I would modify the above call as follows:
>>
>> myRDD.mapPartitions { myDataOnPartition =>
>>   val myModel =

Re: Some questions after playing a little with the new ml.Pipeline.

2015-03-03 Thread Jaonary
I think it would be interesting to have the equivalent of mapPartitions for 
DataFrames. There are many use cases where data are processed in batches. Another 
example is a simple linear classifier Ax = b, where A is the matrix of feature 
vectors, x the model and b the output. Here again the product Ax can be done 
efficiently for a batch of data. 

I will test the broadcast hack. But I'm wondering whether it is possible to 
append or zip an RDD as a new column of a DataFrame. The idea is to do 
mapPartitions on the RDD of the input column and then add the result as the 
output column ?
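
For the record, here is a rough sketch of that zip idea (my own sketch against the
Spark 1.3-era API, with scoreBatch as a placeholder for the per-partition batch
scoring; zip also assumes the two RDDs keep the same partitioning and ordering):

import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{StructField, StructType}

def addFeatureColumn(df: DataFrame, inputCol: String, outputCol: String)
                    (scoreBatch: Seq[Row] => Seq[Vector]): DataFrame = {
  // One batch per partition: materialize the partition and score it in one call.
  val features: RDD[Vector] = df.select(inputCol).mapPartitions { rows =>
    scoreBatch(rows.toSeq).iterator
  }
  // Zip the new values back onto the original rows and rebuild the DataFrame.
  val newRows = df.rdd.zip(features).map { case (row, feat) =>
    Row.fromSeq(row.toSeq :+ feat)
  }
  val newSchema = StructType(df.schema.fields :+ StructField(outputCol, new VectorUDT, false))
  df.sqlContext.createDataFrame(newRows, newSchema)
}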

Jao



> On Mar 3, 2015, at 22:04, Joseph Bradley  wrote:
> 
> I see.  I think your best bet is to create the cnnModel on the master and 
> then serialize it to send to the workers.  If it's big (1M or so), then you 
> can broadcast it and use the broadcast variable in the UDF.  There is not a 
> great way to do something equivalent to mapPartitions with UDFs right now.
> 
>> On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa  wrote:
>> Here is my current implementation with current master version of spark 
>> 
>> class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol 
>> ... {
>> 
>> 
>>override def transformSchema(...) { ... }
>> 
>> override def transform(dataSet: DataFrame, paramMap: ParamMap): 
>> DataFrame = {
>> 
>>   transformSchema(dataSet.schema, paramMap, logging = true)
>>   val map = this.paramMap ++ paramMap
>> 
>>   val deepCNNFeature = udf((v: Vector)=> {
>>   val cnnModel = new CaffeModel 
>>   cnnModel.transform(v)
>>   } : Vector )
>>  
>> 
>>  dataSet.withColumn(map(outputCol), 
>> deepCNNFeature(col(map(inputCol
>> 
>>  }
>> }
>> 
>> where CaffeModel is a java api to Caffe C++ model.
>> 
>> The problem here is that for every row it will create a new instance of 
>> CaffeModel which is inefficient since creating a new model
>> means loading a large model file. And it will transform only a single row at 
>> a time. Or a Caffe network can process a batch of rows efficiently. In other 
>> words, is it possible to create an UDF that can operatats on a partition in 
>> order to minimize the creation of a CaffeModel and 
>> to take advantage of the Caffe network batch processing ?
>> 
>> 
>> 
>>> On Tue, Mar 3, 2015 at 7:26 AM, Joseph Bradley  
>>> wrote:
>>> I see, thanks for clarifying!
>>> 
>>> I'd recommend following existing implementations in spark.ml transformers.  
>>> You'll need to define a UDF which operates on a single Row to compute the 
>>> value for the new column.  You can then use the DataFrame DSL to create the 
>>> new column; the DSL provides a nice syntax for what would otherwise be a 
>>> SQL statement like "select ... from ...".  I'm recommending looking at the 
>>> existing implementation (rather than stating it here) because it changes 
>>> between Spark 1.2 and 1.3.  In 1.3, the DSL is much improved and makes it 
>>> easier to create a new column.
>>> 
>>> Joseph
>>> 
 On Sun, Mar 1, 2015 at 1:26 AM, Jaonary Rabarisoa  
 wrote:
 class DeepCNNFeature extends Transformer ... {
 
 override def transform(data: DataFrame, paramMap: ParamMap): DataFrame 
 = {
 
   
  // How can I do a map partition on the underlying RDD and 
 then add the column ?
 
  }
 }
 
> On Sun, Mar 1, 2015 at 10:23 AM, Jaonary Rabarisoa  
> wrote:
> Hi Joseph,
> 
> Thank your for the tips. I understand what should I do when my data are 
> represented as a RDD. The thing that I can't figure out is how to do the 
> same thing when the data is view as a DataFrame and I need to add the 
> result of my pretrained model as a new column in the DataFrame. 
> Preciselly, I want to implement the following transformer :
> 
> class DeepCNNFeature extends Transformer ... {
> 
> }
> 
>> On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley  
>> wrote:
>> Hi Jao,
>> 
>> You can use external tools and libraries if they can be called from your 
>> Spark program or script (with appropriate conversion of data types, 
>> etc.).  The best way to apply a pre-trained model to a dataset would be 
>> to call the model from within a closure, e.g.:
>> 
>> myRDD.map { myDatum => preTrainedModel.predict(myDatum) }
>> 
>> If your data is distributed in an RDD (myRDD), then the above call will 
>> distribute the computation of prediction using the pre-trained model.  
>> It will require that all of your Spark workers be able to run the 
>> preTrainedModel; that may mean installing Caffe and dependencies on all 
>> nodes in the compute cluster.
>> 
>> For the second question, I would modify the above call as follows:
>> 
>> myRDD.mapPartitions { myDataOnPartition 

Re: Some questions after playing a little with the new ml.Pipeline.

2015-03-03 Thread Joseph Bradley
I see.  I think your best bet is to create the cnnModel on the master and
then serialize it to send to the workers.  If it's big (1M or so), then you
can broadcast it and use the broadcast variable in the UDF.  There is not a
great way to do something equivalent to mapPartitions with UDFs right now.
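
A rough illustration of that broadcast-plus-UDF pattern (just a sketch; it assumes
cnnModel is serializable and has a transform(Vector): Vector method, which may not
hold for the JNI-backed model discussed here):

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.functions.{col, udf}

// Broadcast the (serializable) model once; every task reuses the same copy.
val modelBc = dataSet.sqlContext.sparkContext.broadcast(cnnModel)

val deepCNNFeature = udf((v: Vector) => modelBc.value.transform(v) : Vector)

dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))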

On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa  wrote:

> Here is my current implementation with current master version of spark
>
> class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {
>
>   override def transformSchema(...) { ... }
>
>   override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
>     transformSchema(dataSet.schema, paramMap, logging = true)
>     val map = this.paramMap ++ paramMap
>
>     val deepCNNFeature = udf((v: Vector) => {
>       val cnnModel = new CaffeModel
>       cnnModel.transform(v)
>     } : Vector)
>
>     dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))
>   }
> }
>
> where CaffeModel is a java api to Caffe C++ model.
>
> The problem here is that for every row it will create a new instance of
> CaffeModel which is inefficient since creating a new model
> means loading a large model file. And it will transform only a single row
> at a time. Or a Caffe network can process a batch of rows efficiently. In
> other words, is it possible to create an UDF that can operatats on a
> partition in order to minimize the creation of a CaffeModel and
> to take advantage of the Caffe network batch processing ?
>
>
>
> On Tue, Mar 3, 2015 at 7:26 AM, Joseph Bradley 
> wrote:
>
>> I see, thanks for clarifying!
>>
>> I'd recommend following existing implementations in spark.ml
>> transformers.  You'll need to define a UDF which operates on a single Row
>> to compute the value for the new column.  You can then use the DataFrame
>> DSL to create the new column; the DSL provides a nice syntax for what would
>> otherwise be a SQL statement like "select ... from ...".  I'm recommending
>> looking at the existing implementation (rather than stating it here)
>> because it changes between Spark 1.2 and 1.3.  In 1.3, the DSL is much
>> improved and makes it easier to create a new column.
>>
>> Joseph
>>
>> On Sun, Mar 1, 2015 at 1:26 AM, Jaonary Rabarisoa 
>> wrote:
>>
>>> class DeepCNNFeature extends Transformer ... {
>>>
>>> override def transform(data: DataFrame, paramMap: ParamMap):
>>> DataFrame = {
>>>
>>>
>>>  // How can I do a map partition on the underlying RDD
>>> and then add the column ?
>>>
>>>  }
>>> }
>>>
>>> On Sun, Mar 1, 2015 at 10:23 AM, Jaonary Rabarisoa 
>>> wrote:
>>>
 Hi Joseph,

 Thank your for the tips. I understand what should I do when my data are
 represented as a RDD. The thing that I can't figure out is how to do the
 same thing when the data is view as a DataFrame and I need to add the
 result of my pretrained model as a new column in the DataFrame. Preciselly,
 I want to implement the following transformer :

 class DeepCNNFeature extends Transformer ... {

 }

 On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley 
 wrote:

> Hi Jao,
>
> You can use external tools and libraries if they can be called from
> your Spark program or script (with appropriate conversion of data types,
> etc.).  The best way to apply a pre-trained model to a dataset would be to
> call the model from within a closure, e.g.:
>
> myRDD.map { myDatum => preTrainedModel.predict(myDatum) }
>
> If your data is distributed in an RDD (myRDD), then the above call
> will distribute the computation of prediction using the pre-trained model.
> It will require that all of your Spark workers be able to run the
> preTrainedModel; that may mean installing Caffe and dependencies on all
> nodes in the compute cluster.
>
> For the second question, I would modify the above call as follows:
>
> myRDD.mapPartitions { myDataOnPartition =>
>   val myModel = // instantiate neural network on this partition
>   myDataOnPartition.map { myDatum => myModel.predict(myDatum) }
> }
>
> I hope this helps!
> Joseph
>
> On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa  > wrote:
>
>> Dear all,
>>
>>
>> We mainly do large scale computer vision task (image classification,
>> retrieval, ...). The pipeline is really great stuff for that. We're 
>> trying
>> to reproduce the tutorial given on that topic during the latest spark
>> summit (
>> http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html
>>  )
>> using the master version of spark pipeline and dataframe. The tutorial
>> shows different examples of feature extraction stages before run

Re: Some questions after playing a little with the new ml.Pipeline.

2015-03-03 Thread Jaonary Rabarisoa
Here is my current implementation with current master version of spark

class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {

  override def transformSchema(...) { ... }

  override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
    transformSchema(dataSet.schema, paramMap, logging = true)
    val map = this.paramMap ++ paramMap

    val deepCNNFeature = udf((v: Vector) => {
      val cnnModel = new CaffeModel
      cnnModel.transform(v)
    } : Vector)

    dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))
  }
}

where CaffeModel is a Java API to the Caffe C++ model.

The problem here is that for every row it will create a new instance of
CaffeModel, which is inefficient since creating a new model
means loading a large model file. And it will transform only a single row
at a time, whereas a Caffe network can process a batch of rows efficiently. In
other words, is it possible to create a UDF that operates on a
partition, in order to minimize the creation of CaffeModel instances and
to take advantage of the Caffe network's batch processing ?
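
To make the batching idea concrete, something along these lines is what I have in
mind (only a sketch: loadCaffeNetwork and batchForward are hypothetical wrappers
around the javacpp calls shown above, and the batch size is arbitrary):

val batchSize = 32

val features = dataSet.select(map(inputCol)).mapPartitions { rows =>
  // One network instance per partition, fed in fixed-size batches.
  val net = loadCaffeNetwork()  // hypothetical wrapper around the setup code above
  rows.grouped(batchSize).flatMap { batch =>
    val inputs: Array[Array[Float]] = batch.map {
      case Row(a: Iterable[Float]) => a.toArray
    }.toArray
    // hypothetical: one forward pass over the whole batch, one output vector per row
    val outputs: Array[Array[Float]] = batchForward(net, inputs)
    outputs.map(o => Vectors.dense(o.map(_.toDouble)))
  }
}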



On Tue, Mar 3, 2015 at 7:26 AM, Joseph Bradley 
wrote:

> I see, thanks for clarifying!
>
> I'd recommend following existing implementations in spark.ml
> transformers.  You'll need to define a UDF which operates on a single Row
> to compute the value for the new column.  You can then use the DataFrame
> DSL to create the new column; the DSL provides a nice syntax for what would
> otherwise be a SQL statement like "select ... from ...".  I'm recommending
> looking at the existing implementation (rather than stating it here)
> because it changes between Spark 1.2 and 1.3.  In 1.3, the DSL is much
> improved and makes it easier to create a new column.
>
> Joseph
>
> On Sun, Mar 1, 2015 at 1:26 AM, Jaonary Rabarisoa 
> wrote:
>
>> class DeepCNNFeature extends Transformer ... {
>>
>> override def transform(data: DataFrame, paramMap: ParamMap):
>> DataFrame = {
>>
>>
>>  // How can I do a map partition on the underlying RDD
>> and then add the column ?
>>
>>  }
>> }
>>
>> On Sun, Mar 1, 2015 at 10:23 AM, Jaonary Rabarisoa 
>> wrote:
>>
>>> Hi Joseph,
>>>
>>> Thank your for the tips. I understand what should I do when my data are
>>> represented as a RDD. The thing that I can't figure out is how to do the
>>> same thing when the data is view as a DataFrame and I need to add the
>>> result of my pretrained model as a new column in the DataFrame. Preciselly,
>>> I want to implement the following transformer :
>>>
>>> class DeepCNNFeature extends Transformer ... {
>>>
>>> }
>>>
>>> On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley 
>>> wrote:
>>>
 Hi Jao,

 You can use external tools and libraries if they can be called from
 your Spark program or script (with appropriate conversion of data types,
 etc.).  The best way to apply a pre-trained model to a dataset would be to
 call the model from within a closure, e.g.:

 myRDD.map { myDatum => preTrainedModel.predict(myDatum) }

 If your data is distributed in an RDD (myRDD), then the above call will
 distribute the computation of prediction using the pre-trained model.  It
 will require that all of your Spark workers be able to run the
 preTrainedModel; that may mean installing Caffe and dependencies on all
 nodes in the compute cluster.

 For the second question, I would modify the above call as follows:

 myRDD.mapPartitions { myDataOnPartition =>
   val myModel = // instantiate neural network on this partition
   myDataOnPartition.map { myDatum => myModel.predict(myDatum) }
 }

 I hope this helps!
 Joseph

 On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa 
 wrote:

> Dear all,
>
>
> We mainly do large scale computer vision task (image classification,
> retrieval, ...). The pipeline is really great stuff for that. We're trying
> to reproduce the tutorial given on that topic during the latest spark
> summit (
> http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html
>  )
> using the master version of spark pipeline and dataframe. The tutorial
> shows different examples of feature extraction stages before running
> machine learning algorithms. Even the tutorial is straightforward to
> reproduce with this new API, we still have some questions :
>
>- Can one use external tools (e.g via pipe) as a pipeline stage ?
>An example of use case is to extract feature learned with convolutional
>neural network. In our case, this corresponds to a pre-trained neural
>network with Caffe library (http://caffe.berkeleyvision.org/) .
>
>
>- The second question is about the performance of the pipeline.
>Libr

Re: Some questions after playing a little with the new ml.Pipeline.

2015-03-02 Thread Joseph Bradley
I see, thanks for clarifying!

I'd recommend following existing implementations in spark.ml transformers.
You'll need to define a UDF which operates on a single Row to compute the
value for the new column.  You can then use the DataFrame DSL to create the
new column; the DSL provides a nice syntax for what would otherwise be a
SQL statement like "select ... from ...".  I'm recommending looking at the
existing implementation (rather than stating it here) because it changes
between Spark 1.2 and 1.3.  In 1.3, the DSL is much improved and makes it
easier to create a new column.
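
As a bare-bones illustration of that DSL pattern (my sketch against the 1.3 API,
with a trivial placeholder computation instead of the real feature extractor):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, udf}

def withDerivedColumn(df: DataFrame, inputCol: String, outputCol: String): DataFrame = {
  // The per-row computation lives in a UDF; the DSL appends it as a new column.
  val derive = udf((xs: Seq[Double]) => xs.sum)  // placeholder for the real per-row logic
  df.withColumn(outputCol, derive(col(inputCol)))
}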

Joseph

On Sun, Mar 1, 2015 at 1:26 AM, Jaonary Rabarisoa  wrote:

> class DeepCNNFeature extends Transformer ... {
>
> override def transform(data: DataFrame, paramMap: ParamMap): DataFrame
> = {
>
>
>  // How can I do a map partition on the underlying RDD and
> then add the column ?
>
>  }
> }
>
> On Sun, Mar 1, 2015 at 10:23 AM, Jaonary Rabarisoa 
> wrote:
>
>> Hi Joseph,
>>
>> Thank your for the tips. I understand what should I do when my data are
>> represented as a RDD. The thing that I can't figure out is how to do the
>> same thing when the data is view as a DataFrame and I need to add the
>> result of my pretrained model as a new column in the DataFrame. Preciselly,
>> I want to implement the following transformer :
>>
>> class DeepCNNFeature extends Transformer ... {
>>
>> }
>>
>> On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley 
>> wrote:
>>
>>> Hi Jao,
>>>
>>> You can use external tools and libraries if they can be called from your
>>> Spark program or script (with appropriate conversion of data types, etc.).
>>> The best way to apply a pre-trained model to a dataset would be to call the
>>> model from within a closure, e.g.:
>>>
>>> myRDD.map { myDatum => preTrainedModel.predict(myDatum) }
>>>
>>> If your data is distributed in an RDD (myRDD), then the above call will
>>> distribute the computation of prediction using the pre-trained model.  It
>>> will require that all of your Spark workers be able to run the
>>> preTrainedModel; that may mean installing Caffe and dependencies on all
>>> nodes in the compute cluster.
>>>
>>> For the second question, I would modify the above call as follows:
>>>
>>> myRDD.mapPartitions { myDataOnPartition =>
>>>   val myModel = // instantiate neural network on this partition
>>>   myDataOnPartition.map { myDatum => myModel.predict(myDatum) }
>>> }
>>>
>>> I hope this helps!
>>> Joseph
>>>
>>> On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa 
>>> wrote:
>>>
 Dear all,


 We mainly do large scale computer vision task (image classification,
 retrieval, ...). The pipeline is really great stuff for that. We're trying
 to reproduce the tutorial given on that topic during the latest spark
 summit (
 http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html
  )
 using the master version of spark pipeline and dataframe. The tutorial
 shows different examples of feature extraction stages before running
 machine learning algorithms. Even the tutorial is straightforward to
 reproduce with this new API, we still have some questions :

- Can one use external tools (e.g via pipe) as a pipeline stage ?
An example of use case is to extract feature learned with convolutional
neural network. In our case, this corresponds to a pre-trained neural
network with Caffe library (http://caffe.berkeleyvision.org/) .


- The second question is about the performance of the pipeline.
Library such as Caffe processes the data in batch and instancing one 
 Caffe
network can be time consuming when this network is very deep. So, we can
gain performance if we minimize the number of Caffe network creation and
give data in batch to the network. In the pipeline, this corresponds to 
 run
transformers that work on a partition basis and give the whole 
 partition to
a single caffe network. How can we create such a transformer ?



 Best,

 Jao

>>>
>>>
>>
>


Re: Some questions after playing a little with the new ml.Pipeline.

2015-03-01 Thread Jaonary Rabarisoa
class DeepCNNFeature extends Transformer ... {

  override def transform(data: DataFrame, paramMap: ParamMap): DataFrame = {

    // How can I do a map partition on the underlying RDD and then add the column ?

  }
}

On Sun, Mar 1, 2015 at 10:23 AM, Jaonary Rabarisoa 
wrote:

> Hi Joseph,
>
> Thank your for the tips. I understand what should I do when my data are
> represented as a RDD. The thing that I can't figure out is how to do the
> same thing when the data is view as a DataFrame and I need to add the
> result of my pretrained model as a new column in the DataFrame. Preciselly,
> I want to implement the following transformer :
>
> class DeepCNNFeature extends Transformer ... {
>
> }
>
> On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley 
> wrote:
>
>> Hi Jao,
>>
>> You can use external tools and libraries if they can be called from your
>> Spark program or script (with appropriate conversion of data types, etc.).
>> The best way to apply a pre-trained model to a dataset would be to call the
>> model from within a closure, e.g.:
>>
>> myRDD.map { myDatum => preTrainedModel.predict(myDatum) }
>>
>> If your data is distributed in an RDD (myRDD), then the above call will
>> distribute the computation of prediction using the pre-trained model.  It
>> will require that all of your Spark workers be able to run the
>> preTrainedModel; that may mean installing Caffe and dependencies on all
>> nodes in the compute cluster.
>>
>> For the second question, I would modify the above call as follows:
>>
>> myRDD.mapPartitions { myDataOnPartition =>
>>   val myModel = // instantiate neural network on this partition
>>   myDataOnPartition.map { myDatum => myModel.predict(myDatum) }
>> }
>>
>> I hope this helps!
>> Joseph
>>
>> On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa 
>> wrote:
>>
>>> Dear all,
>>>
>>>
>>> We mainly do large scale computer vision task (image classification,
>>> retrieval, ...). The pipeline is really great stuff for that. We're trying
>>> to reproduce the tutorial given on that topic during the latest spark
>>> summit (
>>> http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html
>>>  )
>>> using the master version of spark pipeline and dataframe. The tutorial
>>> shows different examples of feature extraction stages before running
>>> machine learning algorithms. Even the tutorial is straightforward to
>>> reproduce with this new API, we still have some questions :
>>>
>>>- Can one use external tools (e.g via pipe) as a pipeline stage ? An
>>>example of use case is to extract feature learned with convolutional 
>>> neural
>>>network. In our case, this corresponds to a pre-trained neural network 
>>> with
>>>Caffe library (http://caffe.berkeleyvision.org/) .
>>>
>>>
>>>- The second question is about the performance of the pipeline.
>>>Library such as Caffe processes the data in batch and instancing one 
>>> Caffe
>>>network can be time consuming when this network is very deep. So, we can
>>>gain performance if we minimize the number of Caffe network creation and
>>>give data in batch to the network. In the pipeline, this corresponds to 
>>> run
>>>transformers that work on a partition basis and give the whole partition 
>>> to
>>>a single caffe network. How can we create such a transformer ?
>>>
>>>
>>>
>>> Best,
>>>
>>> Jao
>>>
>>
>>
>


Re: Some questions after playing a little with the new ml.Pipeline.

2015-03-01 Thread Jaonary Rabarisoa
Hi Joseph,

Thank you for the tips. I understand what I should do when my data are
represented as an RDD. The thing that I can't figure out is how to do the
same thing when the data are viewed as a DataFrame and I need to add the
result of my pretrained model as a new column in the DataFrame. Precisely,
I want to implement the following transformer :

class DeepCNNFeature extends Transformer ... {

}

On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley 
wrote:

> Hi Jao,
>
> You can use external tools and libraries if they can be called from your
> Spark program or script (with appropriate conversion of data types, etc.).
> The best way to apply a pre-trained model to a dataset would be to call the
> model from within a closure, e.g.:
>
> myRDD.map { myDatum => preTrainedModel.predict(myDatum) }
>
> If your data is distributed in an RDD (myRDD), then the above call will
> distribute the computation of prediction using the pre-trained model.  It
> will require that all of your Spark workers be able to run the
> preTrainedModel; that may mean installing Caffe and dependencies on all
> nodes in the compute cluster.
>
> For the second question, I would modify the above call as follows:
>
> myRDD.mapPartitions { myDataOnPartition =>
>   val myModel = // instantiate neural network on this partition
>   myDataOnPartition.map { myDatum => myModel.predict(myDatum) }
> }
>
> I hope this helps!
> Joseph
>
> On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa 
> wrote:
>
>> Dear all,
>>
>>
>> We mainly do large scale computer vision task (image classification,
>> retrieval, ...). The pipeline is really great stuff for that. We're trying
>> to reproduce the tutorial given on that topic during the latest spark
>> summit (
>> http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html
>>  )
>> using the master version of spark pipeline and dataframe. The tutorial
>> shows different examples of feature extraction stages before running
>> machine learning algorithms. Even the tutorial is straightforward to
>> reproduce with this new API, we still have some questions :
>>
>>- Can one use external tools (e.g via pipe) as a pipeline stage ? An
>>example of use case is to extract feature learned with convolutional 
>> neural
>>network. In our case, this corresponds to a pre-trained neural network 
>> with
>>Caffe library (http://caffe.berkeleyvision.org/) .
>>
>>
>>- The second question is about the performance of the pipeline.
>>Library such as Caffe processes the data in batch and instancing one Caffe
>>network can be time consuming when this network is very deep. So, we can
>>gain performance if we minimize the number of Caffe network creation and
>>give data in batch to the network. In the pipeline, this corresponds to 
>> run
>>transformers that work on a partition basis and give the whole partition 
>> to
>>a single caffe network. How can we create such a transformer ?
>>
>>
>>
>> Best,
>>
>> Jao
>>
>
>


Re: Some questions after playing a little with the new ml.Pipeline.

2015-02-28 Thread Joseph Bradley
Hi Jao,

You can use external tools and libraries if they can be called from your
Spark program or script (with appropriate conversion of data types, etc.).
The best way to apply a pre-trained model to a dataset would be to call the
model from within a closure, e.g.:

myRDD.map { myDatum => preTrainedModel.predict(myDatum) }

If your data is distributed in an RDD (myRDD), then the above call will
distribute the computation of prediction using the pre-trained model.  It
will require that all of your Spark workers be able to run the
preTrainedModel; that may mean installing Caffe and dependencies on all
nodes in the compute cluster.

For the second question, I would modify the above call as follows:

myRDD.mapPartitions { myDataOnPartition =>
  val myModel = // instantiate neural network on this partition
  myDataOnPartition.map { myDatum => myModel.predict(myDatum) }
}
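
Filled in for a model that cannot be serialized (e.g. a JNI-backed one), that could
look like the following sketch, where loadModel is a hypothetical constructor reading
weight files shipped to the workers via SparkContext.addFile / SparkFiles (the file
names are placeholders):

import org.apache.spark.SparkFiles

myRDD.mapPartitions { myDataOnPartition =>
  // Instantiated on the worker, once per partition, so nothing needs to be serialized.
  val myModel = loadModel(SparkFiles.get("topology.prototxt"),
                          SparkFiles.get("weights.caffemodel"))
  myDataOnPartition.map { myDatum => myModel.predict(myDatum) }
}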

I hope this helps!
Joseph

On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa 
wrote:

> Dear all,
>
>
> We mainly do large scale computer vision tasks (image classification,
> retrieval, ...). The pipeline is really great stuff for that. We're trying
> to reproduce the tutorial given on that topic during the latest Spark
> Summit (
> http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html
>  )
> using the master version of the Spark pipeline and DataFrame. The tutorial
> shows different examples of feature extraction stages before running
> machine learning algorithms. Even though the tutorial is straightforward to
> reproduce with this new API, we still have some questions :
>
>    - Can one use external tools (e.g. via pipe) as a pipeline stage ? An
>    example of a use case is to extract features learned with a convolutional
>    neural network. In our case, this corresponds to a pre-trained neural
>    network built with the Caffe library (http://caffe.berkeleyvision.org/) .
>
>    - The second question is about the performance of the pipeline.
>    Libraries such as Caffe process the data in batches, and instantiating one
>    Caffe network can be time consuming when this network is very deep. So, we
>    can gain performance if we minimize the number of Caffe network creations
>    and give data in batches to the network. In the pipeline, this corresponds
>    to running transformers that work on a partition basis and give the whole
>    partition to a single Caffe network. How can we create such a transformer ?
>
>
>
> Best,
>
> Jao
>