Here is my current implementation with the current master version of Spark:
class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {

  override def transformSchema(...) { ... }

  override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
    transformSchema(dataSet.schema, paramMap, logging = true)
    val map = this.paramMap ++ paramMap
    val deepCNNFeature = udf((v: Vector) => {
      val cnnModel = new CaffeModel
      cnnModel.transform(v)
    }: Vector)
    dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))
  }
}

where CaffeModel is a Java API to a Caffe C++ model. The problem here is that a new instance of CaffeModel is created for every row, which is inefficient since creating a new model means loading a large model file, and it transforms only a single row at a time, whereas a Caffe network can process a batch of rows efficiently. In other words, is it possible to create a UDF that operates on a whole partition, in order to minimize the number of CaffeModel creations and to take advantage of the Caffe network's batch processing? (A rough sketch of what I have in mind is at the bottom of this mail, below the quoted thread.)

On Tue, Mar 3, 2015 at 7:26 AM, Joseph Bradley <jos...@databricks.com> wrote:

> I see, thanks for clarifying!
>
> I'd recommend following existing implementations in spark.ml transformers.
> You'll need to define a UDF which operates on a single Row to compute the
> value for the new column. You can then use the DataFrame DSL to create the
> new column; the DSL provides a nice syntax for what would otherwise be a SQL
> statement like "select ... from ...". I'm recommending looking at the
> existing implementations (rather than stating it here) because this changes
> between Spark 1.2 and 1.3. In 1.3, the DSL is much improved and makes it
> easier to create a new column.
>
> Joseph
>
> On Sun, Mar 1, 2015 at 1:26 AM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:
>
>> class DeepCNNFeature extends Transformer ... {
>>
>>   override def transform(data: DataFrame, paramMap: ParamMap): DataFrame = {
>>
>>     // How can I do a mapPartitions on the underlying RDD
>>     // and then add the column?
>>
>>   }
>> }
>>
>> On Sun, Mar 1, 2015 at 10:23 AM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:
>>
>>> Hi Joseph,
>>>
>>> Thank you for the tips. I understand what I should do when my data are
>>> represented as an RDD. The thing that I can't figure out is how to do the
>>> same thing when the data are viewed as a DataFrame and I need to add the
>>> result of my pre-trained model as a new column in the DataFrame. Precisely,
>>> I want to implement the following transformer:
>>>
>>> class DeepCNNFeature extends Transformer ... {
>>>
>>> }
>>>
>>> On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley <jos...@databricks.com> wrote:
>>>
>>>> Hi Jao,
>>>>
>>>> You can use external tools and libraries if they can be called from your
>>>> Spark program or script (with appropriate conversion of data types, etc.).
>>>> The best way to apply a pre-trained model to a dataset would be to call
>>>> the model from within a closure, e.g.:
>>>>
>>>>   myRDD.map { myDatum => preTrainedModel.predict(myDatum) }
>>>>
>>>> If your data is distributed in an RDD (myRDD), then the above call will
>>>> distribute the computation of prediction using the pre-trained model. It
>>>> will require that all of your Spark workers be able to run the
>>>> preTrainedModel; that may mean installing Caffe and dependencies on all
>>>> nodes in the compute cluster.
>>>> For the second question, I would modify the above call as follows:
>>>>
>>>>   myRDD.mapPartitions { myDataOnPartition =>
>>>>     val myModel = // instantiate neural network on this partition
>>>>     myDataOnPartition.map { myDatum => myModel.predict(myDatum) }
>>>>   }
>>>>
>>>> I hope this helps!
>>>> Joseph
>>>>
>>>> On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:
>>>>
>>>>> Dear all,
>>>>>
>>>>> We mainly do large-scale computer vision tasks (image classification,
>>>>> retrieval, ...). The pipeline is really great stuff for that. We're trying
>>>>> to reproduce the tutorial given on that topic during the latest Spark
>>>>> Summit (
>>>>> http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html
>>>>> )
>>>>> using the master version of the Spark pipeline and DataFrame. The tutorial
>>>>> shows different examples of feature extraction stages before running
>>>>> machine learning algorithms. Even though the tutorial is straightforward to
>>>>> reproduce with this new API, we still have some questions:
>>>>>
>>>>> - Can one use external tools (e.g. via pipe) as a pipeline stage?
>>>>>   An example use case is extracting features learned with a convolutional
>>>>>   neural network. In our case, this corresponds to a neural network
>>>>>   pre-trained with the Caffe library (http://caffe.berkeleyvision.org/).
>>>>>
>>>>> - The second question is about the performance of the pipeline. A library
>>>>>   such as Caffe processes the data in batches, and instantiating one Caffe
>>>>>   network can be time-consuming when this network is very deep. So we can
>>>>>   gain performance if we minimize the number of Caffe network creations and
>>>>>   feed data in batches to the network. In the pipeline, this corresponds to
>>>>>   running transformers that work on a partition basis and give the whole
>>>>>   partition to a single Caffe network. How can we create such a transformer?
>>>>>
>>>>> Best,
>>>>>
>>>>> Jao
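For reference, here is a rough sketch (bottom-posted so the quoted thread stays readable) of the partition-based variant I have in mind, following the mapPartitions suggestion above. It bypasses the UDF and goes through the underlying RDD[Row]: one CaffeModel per partition, rows grouped into batches, and the DataFrame rebuilt with the extra column. The addDeepCNNFeature helper, the batchSize argument and the CaffeModel.transformBatch method are only placeholders for whatever batch entry point the Caffe Java bindings actually expose; the body of the helper is what would go inside the Transformer's transform method.

  import org.apache.spark.mllib.linalg.Vector
  import org.apache.spark.sql.{DataFrame, Row}
  import org.apache.spark.sql.types.StructType

  // Placeholder for the Caffe Java bindings; transformBatch is a made-up
  // name for a method that pushes a whole batch of vectors through the net.
  class CaffeModel {
    def transformBatch(vs: Seq[Vector]): Seq[Vector] = ???
  }

  def addDeepCNNFeature(dataSet: DataFrame,
                        inputCol: String,
                        outputCol: String,
                        batchSize: Int = 64): DataFrame = {
    val schema = dataSet.schema
    val inputIndex = schema.fieldNames.indexOf(inputCol)
    // The output column holds vectors too, so reuse the input column's type.
    val newSchema = StructType(schema.fields :+ schema(inputCol).copy(name = outputCol))

    val withFeatures = dataSet.rdd.mapPartitions { rows =>
      // One model per partition: the large model file is loaded once here,
      // not once per row.
      val cnnModel = new CaffeModel
      rows.grouped(batchSize).flatMap { batch =>
        val features = cnnModel.transformBatch(batch.map(_.getAs[Vector](inputIndex)))
        batch.zip(features).map { case (row, f) => Row.fromSeq(row.toSeq :+ f) }
      }
    }

    dataSet.sqlContext.createDataFrame(withFeatures, newSchema)
  }

The batch size would presumably become a Param of the transformer; the point is simply that the model is instantiated once per partition and that each call into Caffe sees a whole batch rather than a single row. Does this look like a reasonable way to plug a partition-level, batched stage into the pipeline API, or is there a more idiomatic approach?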