Following your suggestion, I ended up with the following implementation:
override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
  val schema = transformSchema(dataSet.schema, paramMap, logging = true)
  val map = this.paramMap ++ paramMap

  val features = dataSet.select(map(inputCol)).mapPartitions { rows =>
    Caffe.set_mode(Caffe.CPU)
    val net = CaffeUtils.floatTestNetwork(
      SparkFiles.get(topology), SparkFiles.get(weight))
    val inputBlobs: FloatBlobVector = net.input_blobs()
    val N: Int = 1
    val K: Int = inputBlobs.get(0).channels()
    val H: Int = inputBlobs.get(0).height()
    val W: Int = inputBlobs.get(0).width()
    inputBlobs.get(0).Reshape(N, K, H, W)
    val dataBlob = new FloatPointer(N * K * W * H)
    val inputCPUData = inputBlobs.get(0).mutable_cpu_data()

    val feat = rows.map { case Row(a: Iterable[Float]) =>
      dataBlob.put(a.toArray, 0, a.size)
      caffe_copy_float(N * K * W * H, dataBlob, inputCPUData)
      val resultBlobs: FloatBlobVector = net.ForwardPrefilled()
      val resultDim = resultBlobs.get(0).channels()
      logInfo(s"Output dimension $resultDim")
      val resultBlobData = resultBlobs.get(0).cpu_data()
      val output = new Array[Float](resultDim)
      resultBlobData.get(output)
      Vectors.dense(output.map(_.toDouble))
    }
    // net.deallocate()
    feat
  }

  val newRowData = dataSet.rdd.zip(features).map { case (old, feat) =>
    val oldSeq = old.toSeq
    Row.fromSeq(oldSeq :+ feat)
  }
  dataSet.sqlContext.createDataFrame(newRowData, schema)
}
The idea is to call mapPartitions on the underlying RDD of the DataFrame and
create a new DataFrame by zipping the results back in. It seems to work, but
when I try to save the RDD I get the following error:
org.apache.spark.mllib.linalg.DenseVector cannot be cast to
org.apache.spark.sql.Row
On Mon, Mar 30, 2015 at 6:40 PM, Shivaram Venkataraman <
[email protected]> wrote:
> One workaround could be to convert the DataFrame into an RDD inside the
> transform function, use mapPartitions/broadcast to work with the JNI calls,
> and then convert back to a DataFrame.
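>
> A rough sketch of what I mean (untested; loadNativeModel() and predict() are
> placeholders for however you wrap the JNI side):
>
> import org.apache.spark.rdd.RDD
> import org.apache.spark.sql.{DataFrame, Row}
> import org.apache.spark.mllib.linalg.Vector
>
> def computeFeatures(df: DataFrame): RDD[Row] =
>   df.select("features").rdd.mapPartitions { rows =>
>     val model = loadNativeModel() // created once per partition, never serialized
>     rows.map { case Row(v: Vector) => Row(model.predict(v)) }
>   }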
>
> Thanks
> Shivaram
>
> On Mon, Mar 30, 2015 at 8:37 AM, Jaonary Rabarisoa <[email protected]>
> wrote:
>
>> Dear all,
>>
>> I'm still struggling to make a pre-trained Caffe model transformer for
>> DataFrames work. The main problem is that creating a Caffe model inside the
>> UDF is very slow and consumes a lot of memory.
>>
>> Some of you suggested broadcasting the model. The problem with broadcasting
>> is that I use a JNI interface to the Caffe C++ library via javacpp-presets,
>> and it is not serializable.
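>>
>> One pattern I have seen suggested (a sketch only, not tried here) is to hide
>> the native handle behind a per-JVM lazy singleton, so nothing needs to be
>> serialized at all:
>>
>> object CaffeHolder {
>>   // loadModel() is hypothetical; the lazy val runs once per executor JVM.
>>   lazy val model = loadModel()
>> }
>>
>> and then reference CaffeHolder.model from inside the UDF or closure.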
>>
>> Another possible approach is to use a UDF that can handle a whole partition
>> instead of just a row, in order to minimize the number of Caffe model
>> instantiations.
>>
>> Are there any ideas for solving either of these two issues?
>>
>>
>>
>> Best,
>>
>> Jao
>>
>> On Tue, Mar 3, 2015 at 10:04 PM, Joseph Bradley <[email protected]>
>> wrote:
>>
>>> I see. I think your best bet is to create the cnnModel on the master
>>> and then serialize it to send to the workers. If it's big (1M or so), then
>>> you can broadcast it and use the broadcast variable in the UDF. There is
>>> not a great way to do something equivalent to mapPartitions with UDFs right
>>> now.
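>>>
>>> As a rough sketch (assuming cnnModel is serializable, which is the
>>> prerequisite here; the column names are made up):
>>>
>>> import org.apache.spark.sql.functions.{col, udf}
>>> import org.apache.spark.mllib.linalg.Vector
>>>
>>> val modelBc = dataSet.sqlContext.sparkContext.broadcast(cnnModel)
>>> val extract = udf { (v: Vector) => modelBc.value.transform(v) }
>>> val result = dataSet.withColumn("cnnFeature", extract(col("image")))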
>>>
>>> On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa <[email protected]>
>>> wrote:
>>>
>>>> Here is my implementation with the current master version of Spark:
>>>>
>>>>
>>>>
>>>>
>>>> class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {
>>>>
>>>>   override def transformSchema(...) { ... }
>>>>
>>>>   override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
>>>>     transformSchema(dataSet.schema, paramMap, logging = true)
>>>>     val map = this.paramMap ++ paramMap
>>>>     val deepCNNFeature = udf((v: Vector) => {
>>>>       val cnnModel = new CaffeModel
>>>>       cnnModel.transform(v)
>>>>     }: Vector)
>>>>     dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))
>>>>   }
>>>> }
>>>>
>>>> where CaffeModel is a Java API to the Caffe C++ model.
>>>>
>>>> The problem here is that for every row it will create a new instance of
>>>> CaffeModel, which is inefficient since creating a new model means loading a
>>>> large model file. And it will transform only a single row at a time, whereas
>>>> a Caffe network can process a batch of rows efficiently. In other words, is
>>>> it possible to create a UDF that operates on a partition, in order to
>>>> minimize the creation of CaffeModel instances and to take advantage of the
>>>> Caffe network's batch processing?
>>>>
>>>>
>>>>
>>>> On Tue, Mar 3, 2015 at 7:26 AM, Joseph Bradley <[email protected]>
>>>> wrote:
>>>>
>>>>> I see, thanks for clarifying!
>>>>>
>>>>> I'd recommend following existing implementations in spark.ml
>>>>> transformers. You'll need to define a UDF which operates on a single Row
>>>>> to compute the value for the new column. You can then use the DataFrame
>>>>> DSL to create the new column; the DSL provides a nice syntax for what
>>>>> would
>>>>> otherwise be a SQL statement like "select ... from ...". I'm recommending
>>>>> looking at the existing implementation (rather than stating it here)
>>>>> because it changes between Spark 1.2 and 1.3. In 1.3, the DSL is much
>>>>> improved and makes it easier to create a new column.
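>>>>>
>>>>> In 1.3 the shape of it is roughly (a sketch; substitute your own function
>>>>> and column names):
>>>>>
>>>>> import org.apache.spark.sql.functions.{col, udf}
>>>>>
>>>>> val myFeature = udf { (x: Double) => x * 2.0 } // any per-value function
>>>>> val result = df.withColumn("newCol", myFeature(col("inputCol")))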
>>>>>
>>>>> Joseph
>>>>>
>>>>> On Sun, Mar 1, 2015 at 1:26 AM, Jaonary Rabarisoa <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> class DeepCNNFeature extends Transformer ... {
>>>>>>
>>>>>>   override def transform(data: DataFrame, paramMap: ParamMap): DataFrame = {
>>>>>>     // How can I do a mapPartitions on the underlying RDD
>>>>>>     // and then add the column?
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> On Sun, Mar 1, 2015 at 10:23 AM, Jaonary Rabarisoa <[email protected]
>>>>>> > wrote:
>>>>>>
>>>>>>> Hi Joseph,
>>>>>>>
>>>>>>> Thank you for the tips. I understand what I should do when my data are
>>>>>>> represented as an RDD. The thing that I can't figure out is how to do the
>>>>>>> same thing when the data is viewed as a DataFrame and I need to add the
>>>>>>> result of my pre-trained model as a new column in the DataFrame. Precisely,
>>>>>>> I want to implement the following transformer:
>>>>>>>
>>>>>>> class DeepCNNFeature extends Transformer ... {
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>> On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Hi Jao,
>>>>>>>>
>>>>>>>> You can use external tools and libraries if they can be called from
>>>>>>>> your Spark program or script (with appropriate conversion of data
>>>>>>>> types,
>>>>>>>> etc.). The best way to apply a pre-trained model to a dataset would
>>>>>>>> be to
>>>>>>>> call the model from within a closure, e.g.:
>>>>>>>>
>>>>>>>> myRDD.map { myDatum => preTrainedModel.predict(myDatum) }
>>>>>>>>
>>>>>>>> If your data is distributed in an RDD (myRDD), then the above call
>>>>>>>> will distribute the computation of prediction using the pre-trained
>>>>>>>> model.
>>>>>>>> It will require that all of your Spark workers be able to run the
>>>>>>>> preTrainedModel; that may mean installing Caffe and dependencies on all
>>>>>>>> nodes in the compute cluster.
>>>>>>>>
>>>>>>>> For the second question, I would modify the above call as follows:
>>>>>>>>
>>>>>>>> myRDD.mapPartitions { myDataOnPartition =>
>>>>>>>> val myModel = // instantiate neural network on this partition
>>>>>>>> myDataOnPartition.map { myDatum => myModel.predict(myDatum) }
>>>>>>>> }
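>>>>>>>>
>>>>>>>> And if the model can score batches, you could additionally group the
>>>>>>>> iterator (a sketch; predictBatch is a hypothetical batched call):
>>>>>>>>
>>>>>>>> myRDD.mapPartitions { myDataOnPartition =>
>>>>>>>>   val myModel = // instantiate neural network on this partition
>>>>>>>>   myDataOnPartition.grouped(64).flatMap { batch =>
>>>>>>>>     myModel.predictBatch(batch) // hypothetical: one result per datum
>>>>>>>>   }
>>>>>>>> }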
>>>>>>>>
>>>>>>>> I hope this helps!
>>>>>>>> Joseph
>>>>>>>>
>>>>>>>> On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Dear all,
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> We mainly do large-scale computer vision tasks (image
>>>>>>>>> classification, retrieval, ...). The pipeline is really great stuff for
>>>>>>>>> that. We're trying to reproduce the tutorial given on that topic during
>>>>>>>>> the latest Spark Summit (
>>>>>>>>> http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html
>>>>>>>>> )
>>>>>>>>> using the master version of the Spark pipeline and DataFrame APIs. The
>>>>>>>>> tutorial shows different examples of feature extraction stages before
>>>>>>>>> running machine learning algorithms. Even though the tutorial is
>>>>>>>>> straightforward to reproduce with this new API, we still have some
>>>>>>>>> questions:
>>>>>>>>>
>>>>>>>>> - Can one use external tools (e.g. via pipe) as a pipeline stage? An
>>>>>>>>> example use case is extracting features learned with a convolutional
>>>>>>>>> neural network. In our case, this corresponds to a pre-trained neural
>>>>>>>>> network built with the Caffe library (http://caffe.berkeleyvision.org/);
>>>>>>>>> see the rough pipe sketch after these questions.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> - The second question is about the performance of the pipeline.
>>>>>>>>> Libraries such as Caffe process the data in batches, and instantiating
>>>>>>>>> one Caffe network can be time consuming when the network is very deep.
>>>>>>>>> So we can gain performance if we minimize the number of Caffe network
>>>>>>>>> creations and feed data to the network in batches. In the pipeline, this
>>>>>>>>> corresponds to running transformers that work on a per-partition basis
>>>>>>>>> and give the whole partition to a single Caffe network. How can we
>>>>>>>>> create such a transformer?
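>>>>>>>>>
>>>>>>>>> For the first question, the pipe sketch we have in mind is roughly the
>>>>>>>>> following (extract_features is a hypothetical executable reading one
>>>>>>>>> record per line on stdin and writing one comma-separated feature line
>>>>>>>>> per record; imagePathsRDD is an RDD[String] of image paths):
>>>>>>>>>
>>>>>>>>> val featureLines = imagePathsRDD.pipe("extract_features")
>>>>>>>>> val features = featureLines.map(_.split(",").map(_.toDouble))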
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>>
>>>>>>>>> Jao
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>