Following your suggestion, I ended up with the following implementation:
override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
  val schema = transformSchema(dataSet.schema, paramMap, logging = true)
  val map = this.paramMap ++ paramMap

  val features = dataSet.select(map(inputCol)).mapPartitions { rows =>
    Caffe.set_mode(Caffe.CPU)
    val net = CaffeUtils.floatTestNetwork(SparkFiles.get(topology), SparkFiles.get(weight))
    val inputBlobs: FloatBlobVector = net.input_blobs()
    val N: Int = 1
    val K: Int = inputBlobs.get(0).channels()
    val H: Int = inputBlobs.get(0).height()
    val W: Int = inputBlobs.get(0).width()
    inputBlobs.get(0).Reshape(N, K, H, W)
    val dataBlob = new FloatPointer(N * K * W * H)
    val inputCPUData = inputBlobs.get(0).mutable_cpu_data()

    val feat = rows.map { case Row(a: Iterable[Float]) =>
      dataBlob.put(a.toArray, 0, a.size)
      caffe_copy_float(N * K * W * H, dataBlob, inputCPUData)
      val resultBlobs: FloatBlobVector = net.ForwardPrefilled()
      val resultDim = resultBlobs.get(0).channels()
      logInfo(s"Output dimension $resultDim")
      val resultBlobData = resultBlobs.get(0).cpu_data()
      val output = new Array[Float](resultDim)
      resultBlobData.get(output)
      Vectors.dense(output.map(_.toDouble))
    }
    // net.deallocate()
    feat
  }

  val newRowData = dataSet.rdd.zip(features).map { case (old, feat) =>
    val oldSeq = old.toSeq
    Row.fromSeq(oldSeq :+ feat)
  }

  dataSet.sqlContext.createDataFrame(newRowData, schema)
}

The idea is to mapPartitions over the underlying RDD of the DataFrame and create a new DataFrame by zipping the results. It seems to work, but when I try to save the result I get the following error:

org.apache.spark.mllib.linalg.DenseVector cannot be cast to org.apache.spark.sql.Row
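One likely cause of that error is that the value stored in a Row does not match the DataType declared for the corresponding field in the schema (here the appended column carries a DenseVector). A minimal sketch of a workaround, assuming the same map(outputCol) param and the features RDD from the code above, is to declare the new column as ArrayType(DoubleType) and store a plain Seq[Double] instead of the Vector:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{ArrayType, DoubleType, StructField, StructType}

// Sketch only: declare the appended column as an array of doubles and store a
// matching Seq[Double] in each Row, so the declared type and the stored value agree.
val featureField = StructField(map(outputCol), ArrayType(DoubleType, containsNull = false), nullable = false)
val outputSchema = StructType(dataSet.schema.fields :+ featureField)

val newRowData = dataSet.rdd.zip(features).map { case (old, feat) =>
  Row.fromSeq(old.toSeq :+ feat.toArray.toSeq) // feat is the Vector built in mapPartitions
}
dataSet.sqlContext.createDataFrame(newRowData, outputSchema)

If a Vector column is needed downstream, it can be rebuilt from the array column with a UDF afterwards.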
On Mon, Mar 30, 2015 at 6:40 PM, Shivaram Venkataraman <shiva...@eecs.berkeley.edu> wrote:

> One workaround could be to convert the DataFrame into an RDD inside the transform function, use mapPartitions/broadcast to work with the JNI calls, and then convert back to a DataFrame.
>
> Thanks
> Shivaram
>
> On Mon, Mar 30, 2015 at 8:37 AM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:
>
>> Dear all,
>>
>> I'm still struggling to make a pre-trained Caffe model transformer for DataFrames work. The main problem is that creating a Caffe model inside the UDF is very slow and consumes a lot of memory.
>>
>> Some of you suggested broadcasting the model. The problem with broadcasting is that I use a JNI interface to the Caffe C++ library (via javacpp-presets) and it is not serializable.
>>
>> Another possible approach is to use a UDF that can handle a whole partition instead of just a row, in order to minimize the number of Caffe model instantiations.
>>
>> Are there any ideas for solving one of these two issues?
>>
>> Best,
>>
>> Jao
>>
>> On Tue, Mar 3, 2015 at 10:04 PM, Joseph Bradley <jos...@databricks.com> wrote:
>>
>>> I see. I think your best bet is to create the cnnModel on the master and then serialize it to send to the workers. If it's big (1M or so), then you can broadcast it and use the broadcast variable in the UDF. There is not a great way to do something equivalent to mapPartitions with UDFs right now.
>>>
>>> On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:
>>>
>>>> Here is my current implementation with the current master version of Spark:
>>>>
>>>> class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {
>>>>
>>>>   override def transformSchema(...) { ... }
>>>>
>>>>   override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
>>>>     transformSchema(dataSet.schema, paramMap, logging = true)
>>>>     val map = this.paramMap ++ paramMap
>>>>     val deepCNNFeature = udf((v: Vector) => {
>>>>       val cnnModel = new CaffeModel
>>>>       cnnModel.transform(v)
>>>>     } : Vector)
>>>>     dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))
>>>>   }
>>>> }
>>>>
>>>> where CaffeModel is a Java API to the Caffe C++ model.
>>>>
>>>> The problem here is that for every row it will create a new instance of CaffeModel, which is inefficient since creating a new model means loading a large model file, and it will transform only a single row at a time, whereas a Caffe network can process a batch of rows efficiently. In other words, is it possible to create a UDF that operates on a whole partition, in order to minimize the creation of CaffeModel instances and to take advantage of the Caffe network's batch processing?
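A rough sketch of the per-partition, batched variant asked about here might look like the following; CaffeModel.load and transformBatch are hypothetical stand-ins for the real javacpp wrapper, and topology/weight are the model file names distributed via SparkFiles as in the transform at the top of this thread:

import org.apache.spark.SparkFiles
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.Row

// Sketch only: one model instance per partition, forward passes in fixed-size batches.
val batchSize = 32
val features = dataSet.select(map(inputCol)).rdd.mapPartitions { rows =>
  // The JNI wrapper is not serializable, so only the file paths cross the closure;
  // the network is built on the worker, once per partition.
  val cnnModel = CaffeModel.load(SparkFiles.get(topology), SparkFiles.get(weight))
  rows.grouped(batchSize).flatMap { batch =>
    val inputs: Seq[Vector] = batch.map { case Row(v: Vector) => v }
    cnnModel.transformBatch(inputs) // hypothetical batch API returning one Vector per input row
  }
}

The resulting RDD keeps one output element per input row, so it can be zipped back onto the original rows exactly as in the transform above.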
>>>> On Tue, Mar 3, 2015 at 7:26 AM, Joseph Bradley <jos...@databricks.com> wrote:
>>>>
>>>>> I see, thanks for clarifying!
>>>>>
>>>>> I'd recommend following the existing implementations in spark.ml transformers. You'll need to define a UDF which operates on a single Row to compute the value for the new column. You can then use the DataFrame DSL to create the new column; the DSL provides a nice syntax for what would otherwise be a SQL statement like "select ... from ...". I'm recommending looking at the existing implementations (rather than stating it here) because they change between Spark 1.2 and 1.3. In 1.3, the DSL is much improved and makes it easier to create a new column.
>>>>>
>>>>> Joseph
>>>>>
>>>>> On Sun, Mar 1, 2015 at 1:26 AM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:
>>>>>
>>>>>> class DeepCNNFeature extends Transformer ... {
>>>>>>
>>>>>>   override def transform(data: DataFrame, paramMap: ParamMap): DataFrame = {
>>>>>>
>>>>>>     // How can I do a mapPartitions on the underlying RDD and then add the column?
>>>>>>
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> On Sun, Mar 1, 2015 at 10:23 AM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Joseph,
>>>>>>>
>>>>>>> Thank you for the tips. I understand what I should do when my data are represented as an RDD. The thing that I can't figure out is how to do the same thing when the data are viewed as a DataFrame and I need to add the result of my pre-trained model as a new column in the DataFrame. Precisely, I want to implement the following transformer:
>>>>>>>
>>>>>>> class DeepCNNFeature extends Transformer ... {
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>> On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley <jos...@databricks.com> wrote:
>>>>>>>
>>>>>>>> Hi Jao,
>>>>>>>>
>>>>>>>> You can use external tools and libraries if they can be called from your Spark program or script (with appropriate conversion of data types, etc.). The best way to apply a pre-trained model to a dataset would be to call the model from within a closure, e.g.:
>>>>>>>>
>>>>>>>> myRDD.map { myDatum => preTrainedModel.predict(myDatum) }
>>>>>>>>
>>>>>>>> If your data is distributed in an RDD (myRDD), then the above call will distribute the computation of prediction using the pre-trained model. It will require that all of your Spark workers be able to run the preTrainedModel; that may mean installing Caffe and its dependencies on all nodes in the compute cluster.
>>>>>>>>
>>>>>>>> For the second question, I would modify the above call as follows:
>>>>>>>>
>>>>>>>> myRDD.mapPartitions { myDataOnPartition =>
>>>>>>>>   val myModel = // instantiate neural network on this partition
>>>>>>>>   myDataOnPartition.map { myDatum => myModel.predict(myDatum) }
>>>>>>>> }
>>>>>>>>
>>>>>>>> I hope this helps!
>>>>>>>> Joseph
>>>>>>>>
>>>>>>>> On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Dear all,
>>>>>>>>>
>>>>>>>>> We mainly do large-scale computer vision tasks (image classification, retrieval, ...). The pipeline is really great stuff for that. We're trying to reproduce the tutorial given on that topic during the latest Spark Summit (http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html) using the master version of the Spark pipeline and DataFrame APIs. The tutorial shows different examples of feature extraction stages before running machine learning algorithms. Even though the tutorial is straightforward to reproduce with this new API, we still have some questions:
>>>>>>>>>
>>>>>>>>> - Can one use external tools (e.g. via pipe) as a pipeline stage? An example use case is extracting features learned with a convolutional neural network. In our case, this corresponds to a pre-trained neural network built with the Caffe library (http://caffe.berkeleyvision.org/).
>>>>>>>>>
>>>>>>>>> - The second question is about the performance of the pipeline. A library such as Caffe processes data in batches, and instantiating one Caffe network can be time consuming when the network is very deep. So we can gain performance if we minimize the number of Caffe network creations and feed data to the network in batches. In the pipeline, this corresponds to running transformers that work on a partition basis and give the whole partition to a single Caffe network. How can we create such a transformer?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>>
>>>>>>>>> Jao
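On the first question above (external tools via pipe), RDD.pipe is one way to plug an external extractor in as a stage. A minimal sketch, assuming the input column holds image paths as strings and a hypothetical extract_features executable is installed on every worker, reading one path per line on stdin and writing comma-separated features on stdout:

import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Sketch only: call an external, line-oriented feature extractor via RDD.pipe.
val imagePaths: RDD[String] = dataSet.select(map(inputCol)).rdd.map(_.getString(0))
val rawFeatures: RDD[String] = imagePaths.pipe("extract_features") // one output line per input line
val features: RDD[Vector] = rawFeatures.map(line => Vectors.dense(line.split(",").map(_.toDouble)))

This keeps the external binary out of the JVM entirely; the trade-off is the text-based protocol and the cost of launching one process per partition.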