I think it would be interesting to have an equivalent of mapPartitions for 
DataFrames. There are many use cases where data is processed in batches. Another 
example is a simple linear classifier Ax = b, where A is the matrix of feature 
vectors, x the model, and b the output. Here again, the product Ax can be 
computed efficiently for a batch of data. 
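
To make the batching idea concrete, here is a minimal sketch in plain Scala (no Spark). The names `LinearModel`, `scoreBatch` and `scorePartition` are hypothetical and only illustrate the pattern: one model instance consumes the rows of a partition in fixed-size batches.

```scala
object BatchedScoring {
  // Hypothetical linear model: `weights` plays the role of x in Ax = b.
  final class LinearModel(weights: Array[Double]) {
    // Scores a whole batch (the rows of A). A real implementation would
    // likely call a BLAS matrix-vector product here; this loop just keeps
    // the sketch self-contained.
    def scoreBatch(batch: Seq[Array[Double]]): Seq[Double] =
      batch.map(row => row.zip(weights).map { case (a, w) => a * w }.sum)
  }

  // Consumes the rows of one partition in batches of `batchSize`,
  // reusing the same model instance for every batch.
  def scorePartition(
      rows: Iterator[Array[Double]],
      model: LinearModel,
      batchSize: Int): Iterator[Double] =
    rows.grouped(batchSize).flatMap(batch => model.scoreBatch(batch))
}
```

The same `scorePartition` function could be passed to `mapPartitions` on an RDD, with the model created once at the top of the closure.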

I will test the broadcast hack. But I'm wondering whether it is possible to 
append or zip an RDD as a new column of a DataFrame. The idea is to run 
mapPartitions on the RDD of the input column and then add the result as 
the output column?
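
A rough sketch of that idea, assuming the Spark 1.3+ calls `DataFrame.rdd`, `Row.fromSeq` and `sqlContext.createDataFrame(rowRDD, schema)`. Instead of zipping a separate RDD, it rebuilds each row with the extra value appended. `ExpensiveModel`, `predictBatch` and `withBatchOutput` are hypothetical names, and the input column is assumed to hold doubles to keep the example short.

```scala
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

object AppendColumnByPartition {
  // Hypothetical stand-in for a model that is costly to create
  // (e.g. a wrapper around a Caffe network) and works on batches.
  class ExpensiveModel {
    def predictBatch(xs: Seq[Double]): Seq[Double] = xs.map(_ * 2.0)
  }

  def withBatchOutput(
      df: DataFrame,
      inputCol: String,
      outputCol: String,
      batchSize: Int): DataFrame = {
    val inputIdx = df.schema.fieldNames.indexOf(inputCol)
    // Original schema plus one extra column for the model output.
    val newSchema = StructType(
      df.schema.fields :+ StructField(outputCol, DoubleType, nullable = false))

    val newRows = df.rdd.mapPartitions { rows =>
      val model = new ExpensiveModel // created once per partition
      rows.grouped(batchSize).flatMap { batch =>
        val outputs = model.predictBatch(batch.map(_.getDouble(inputIdx)))
        // Append each output value to the row it was computed from.
        batch.zip(outputs).map { case (row, out) => Row.fromSeq(row.toSeq :+ out) }
      }
    }
    df.sqlContext.createDataFrame(newRows, newSchema)
  }
}
```

Because the rows are rebuilt inside the same partition, no zip between two RDDs is needed, and row order and partitioning are left as they were. The trade-off is a round trip through `RDD[Row]`, so the DataFrame has to be converted to rows and back.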

Jao



> On Mar 3, 2015, at 22:04, Joseph Bradley <jos...@databricks.com> wrote:
> 
> I see.  I think your best bet is to create the cnnModel on the master and 
> then serialize it to send to the workers.  If it's big (1M or so), then you 
> can broadcast it and use the broadcast variable in the UDF.  There is not a 
> great way to do something equivalent to mapPartitions with UDFs right now.
> 
>> On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:
>> Here is my current implementation against the current master version of Spark:
>> 
>> class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {
>> 
>>   override def transformSchema(...) { ... }
>> 
>>   override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
>> 
>>     transformSchema(dataSet.schema, paramMap, logging = true)
>>     val map = this.paramMap ++ paramMap
>> 
>>     val deepCNNFeature = udf((v: Vector) => {
>>       val cnnModel = new CaffeModel
>>       cnnModel.transform(v)
>>     } : Vector)
>> 
>>     dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))
>>   }
>> }
>> 
>> where CaffeModel is a Java API to the Caffe C++ model.
>> 
>> The problem here is that a new instance of CaffeModel is created for every 
>> row, which is inefficient since creating a new model means loading a large 
>> model file. It also transforms only a single row at a time, whereas a Caffe 
>> network can process a batch of rows efficiently. In other words, is it 
>> possible to create a UDF that operates on a partition, in order to minimize 
>> the number of CaffeModel instantiations and to take advantage of the Caffe 
>> network's batch processing?
>> 
>> 
>> 
>>> On Tue, Mar 3, 2015 at 7:26 AM, Joseph Bradley <jos...@databricks.com> 
>>> wrote:
>>> I see, thanks for clarifying!
>>> 
>>> I'd recommend following existing implementations in spark.ml transformers.  
>>> You'll need to define a UDF which operates on a single Row to compute the 
>>> value for the new column.  You can then use the DataFrame DSL to create the 
>>> new column; the DSL provides a nice syntax for what would otherwise be a 
>>> SQL statement like "select ... from ...".  I'm recommending looking at the 
>>> existing implementation (rather than stating it here) because it changes 
>>> between Spark 1.2 and 1.3.  In 1.3, the DSL is much improved and makes it 
>>> easier to create a new column.
>>> 
>>> Joseph
>>> 
>>>> On Sun, Mar 1, 2015 at 1:26 AM, Jaonary Rabarisoa <jaon...@gmail.com> 
>>>> wrote:
>>>> class DeepCNNFeature extends Transformer ... {
>>>> 
>>>>   override def transform(data: DataFrame, paramMap: ParamMap): DataFrame = {
>>>> 
>>>>     // How can I do a mapPartitions on the underlying RDD and then add the column?
>>>> 
>>>>   }
>>>> }
>>>> 
>>>>> On Sun, Mar 1, 2015 at 10:23 AM, Jaonary Rabarisoa <jaon...@gmail.com> 
>>>>> wrote:
>>>>> Hi Joseph,
>>>>> 
>>>>> Thank you for the tips. I understand what I should do when my data is 
>>>>> represented as an RDD. What I can't figure out is how to do the same 
>>>>> thing when the data is viewed as a DataFrame and I need to add the 
>>>>> result of my pre-trained model as a new column in the DataFrame. 
>>>>> Precisely, I want to implement the following transformer:
>>>>> 
>>>>> class DeepCNNFeature extends Transformer ... {
>>>>> 
>>>>> }
>>>>> 
>>>>>> On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley <jos...@databricks.com> 
>>>>>> wrote:
>>>>>> Hi Jao,
>>>>>> 
>>>>>> You can use external tools and libraries if they can be called from your 
>>>>>> Spark program or script (with appropriate conversion of data types, 
>>>>>> etc.).  The best way to apply a pre-trained model to a dataset would be 
>>>>>> to call the model from within a closure, e.g.:
>>>>>> 
>>>>>> myRDD.map { myDatum => preTrainedModel.predict(myDatum) }
>>>>>> 
>>>>>> If your data is distributed in an RDD (myRDD), then the above call will 
>>>>>> distribute the computation of prediction using the pre-trained model.  
>>>>>> It will require that all of your Spark workers be able to run the 
>>>>>> preTrainedModel; that may mean installing Caffe and dependencies on all 
>>>>>> nodes in the compute cluster.
>>>>>> 
>>>>>> For the second question, I would modify the above call as follows:
>>>>>> 
>>>>>> myRDD.mapPartitions { myDataOnPartition =>
>>>>>>   val myModel = // instantiate neural network on this partition
>>>>>>   myDataOnPartition.map { myDatum => myModel.predict(myDatum) }
>>>>>> }
>>>>>> 
>>>>>> I hope this helps!
>>>>>> Joseph
>>>>>> 
>>>>>>> On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa <jaon...@gmail.com> 
>>>>>>> wrote:
>>>>>>> Dear all,
>>>>>>> 
>>>>>>> We mainly do large-scale computer vision tasks (image classification, 
>>>>>>> retrieval, ...). The pipeline API is really great for that. We're 
>>>>>>> trying to reproduce the tutorial given on that topic during the latest 
>>>>>>> Spark Summit ( 
>>>>>>> http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html
>>>>>>>  ) using the master version of the Spark pipeline and DataFrame APIs. 
>>>>>>> The tutorial shows different examples of feature extraction stages 
>>>>>>> before running machine learning algorithms. Even though the tutorial is 
>>>>>>> straightforward to reproduce with this new API, we still have some 
>>>>>>> questions:
>>>>>>> Can one use external tools (e.g. via pipe) as a pipeline stage? An 
>>>>>>> example use case is extracting features learned with a convolutional 
>>>>>>> neural network. In our case, this corresponds to a pre-trained neural 
>>>>>>> network built with the Caffe library (http://caffe.berkeleyvision.org/).
>>>>>>> The second question is about the performance of the pipeline. Libraries 
>>>>>>> such as Caffe process data in batches, and instantiating one Caffe 
>>>>>>> network can be time-consuming when the network is very deep. So we 
>>>>>>> can gain performance if we minimize the number of Caffe networks 
>>>>>>> created and feed data to the network in batches. In the pipeline, this 
>>>>>>> corresponds to running transformers that work on a per-partition basis 
>>>>>>> and give the whole partition to a single Caffe network. How can we 
>>>>>>> create such a transformer?
>>>>>>> 
>>>>>>> 
>>>>>>> Best,
>>>>>>> 
>>>>>>> Jao
> 
