In my transformSchema I do specify that the output column type is a VectorUDT:

    override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = {
      val map = this.paramMap ++ paramMap
      checkInputColumn(schema, map(inputCol), ArrayType(FloatType, false))
      addOutputColumn(schema, map(outputCol), new VectorUDT)
    }

The output of printSchema is as follows:

    |-- cnnFeature: vecto (nullable = false)

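
For reference, the per-partition idea behind the transform quoted below can be sketched without Spark or Caffe. This is only an illustration under stated assumptions: `FakeNet`, `featurize` and `batchSize` are made-up names standing in for the JNI-backed network, and plain in-memory sequences stand in for RDD partitions. The point is that the function has the same Iterator-in, Iterator-out shape that `mapPartitions` expects, builds the network once per partition, and feeds it rows in batches.

```scala
object PartitionSketch {
  // Hypothetical stand-in for the JNI-backed Caffe network: costly to build,
  // not serializable, and able to score several inputs per forward pass.
  final class FakeNet {
    FakeNet.created += 1
    def forwardBatch(batch: Seq[Array[Float]]): Seq[Array[Double]] =
      batch.map(_.map(_.toDouble))
  }
  object FakeNet { var created = 0 }

  // Same shape as the function passed to mapPartitions:
  // an Iterator of inputs in, an Iterator of outputs out.
  def featurize(rows: Iterator[Array[Float]], batchSize: Int): Iterator[Array[Double]] = {
    val net = new FakeNet // built once per partition, not once per row
    // grouped() cuts the partition into batches; each batch is one network call.
    rows.grouped(batchSize).flatMap(batch => net.forwardBatch(batch))
  }

  // Two in-memory "partitions" stand in for the RDD.
  val partitions: Seq[Seq[Array[Float]]] = Seq(
    Seq(Array(1f, 2f), Array(3f, 4f), Array(5f, 6f)),
    Seq(Array(7f, 8f))
  )

  // toVector forces the lazy iterator, as an action would on an RDD.
  val features: Seq[Vector[Array[Double]]] =
    partitions.map(p => featurize(p.iterator, batchSize = 2).toVector)
}
```

Because the returned iterator is lazy, any native cleanup (such as the commented-out `net.deallocate()` below) would have to wait until the iterator has been fully consumed.
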
On Tue, Mar 31, 2015 at 9:55 AM, Shivaram Venkataraman <
[email protected]> wrote:
> My guess is that the `createDataFrame` call is failing here. Can you
> check whether the schema being passed to it includes the column name and
> type for the newly zipped `features` column?
>
> Joseph probably knows this better, but AFAIK the DenseVector here will
> need to be marked as a VectorUDT while creating a DataFrame column.
>
> Thanks
> Shivaram
>
> On Tue, Mar 31, 2015 at 12:50 AM, Jaonary Rabarisoa <[email protected]>
> wrote:
>
>> Following your suggestion, I ended up with the following implementation:
>>
>>   override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
>>     val schema = transformSchema(dataSet.schema, paramMap, logging = true)
>>     val map = this.paramMap ++ paramMap
>>
>>     val features = dataSet.select(map(inputCol)).mapPartitions { rows =>
>>       Caffe.set_mode(Caffe.CPU)
>>       val net = CaffeUtils.floatTestNetwork(SparkFiles.get(topology), SparkFiles.get(weight))
>>       val inputBlobs: FloatBlobVector = net.input_blobs()
>>       val N: Int = 1
>>       val K: Int = inputBlobs.get(0).channels()
>>       val H: Int = inputBlobs.get(0).height()
>>       val W: Int = inputBlobs.get(0).width()
>>       inputBlobs.get(0).Reshape(N, K, H, W)
>>       val dataBlob = new FloatPointer(N*K*W*H)
>>       val inputCPUData = inputBlobs.get(0).mutable_cpu_data()
>>
>>       val feat = rows.map { case Row(a: Iterable[Float]) =>
>>         dataBlob.put(a.toArray, 0, a.size)
>>         caffe_copy_float(N*K*W*H, dataBlob, inputCPUData)
>>         val resultBlobs: FloatBlobVector = net.ForwardPrefilled()
>>         val resultDim = resultBlobs.get(0).channels()
>>         logInfo(s"Output dimension $resultDim")
>>         val resultBlobData = resultBlobs.get(0).cpu_data()
>>         val output = new Array[Float](resultDim)
>>         resultBlobData.get(output)
>>         Vectors.dense(output.map(_.toDouble))
>>       }
>>       //net.deallocate()
>>       feat
>>     }
>>
>>     val newRowData = dataSet.rdd.zip(features).map { case (old, feat) =>
>>       val oldSeq = old.toSeq
>>       Row.fromSeq(oldSeq :+ feat)
>>     }
>>
>>     dataSet.sqlContext.createDataFrame(newRowData, schema)
>>   }
>>
>>
>> The idea is to call mapPartitions on the underlying RDD of the DataFrame
>> and create a new DataFrame by zipping the results. It seems to work, but
>> when I try to save the result I get the following error:
>>
>> org.apache.spark.mllib.linalg.DenseVector cannot be cast to
>> org.apache.spark.sql.Row
>>
>>
>> On Mon, Mar 30, 2015 at 6:40 PM, Shivaram Venkataraman <
>> [email protected]> wrote:
>>
>>> One workaround could be to convert the DataFrame into an RDD inside the
>>> transform function, use mapPartitions/broadcast to work with the JNI
>>> calls, and then convert back to a DataFrame.
>>>
>>> Thanks
>>> Shivaram
>>>
>>> On Mon, Mar 30, 2015 at 8:37 AM, Jaonary Rabarisoa <[email protected]>
>>> wrote:
>>>
>>>> Dear all,
>>>>
>>>> I'm still struggling to make a pre-trained Caffe model transformer for
>>>> DataFrames work. The main problem is that creating a Caffe model inside
>>>> the UDF is very slow and consumes a lot of memory.
>>>>
>>>> Some of you suggested broadcasting the model. The problem with
>>>> broadcasting is that I use a JNI interface to Caffe C++ with
>>>> javacpp-preset, and it is not serializable.
>>>>
>>>> Another possible approach is to use a UDF that can handle a whole
>>>> partition instead of just a row, in order to minimize the number of
>>>> Caffe model instantiations.
>>>>
>>>> Are there any ideas for solving either of these two issues?
>>>>
>>>>
>>>>
>>>> Best,
>>>>
>>>> Jao
>>>>
>>>> On Tue, Mar 3, 2015 at 10:04 PM, Joseph Bradley <[email protected]>
>>>> wrote:
>>>>
>>>>> I see. I think your best bet is to create the cnnModel on the master
>>>>> and then serialize it to send to the workers. If it's big (1M or so),
>>>>> then you can broadcast it and use the broadcast variable in the UDF.
>>>>> There is not a great way to do something equivalent to mapPartitions
>>>>> with UDFs right now.
>>>>>
>>>>> On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Here is my current implementation with the current master version of
>>>>>> Spark:
>>>>>>
>>>>>>   class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {
>>>>>>
>>>>>>     override def transformSchema(...) { ... }
>>>>>>
>>>>>>     override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
>>>>>>       transformSchema(dataSet.schema, paramMap, logging = true)
>>>>>>       val map = this.paramMap ++ paramMap
>>>>>>
>>>>>>       val deepCNNFeature = udf((v: Vector) => {
>>>>>>         val cnnModel = new CaffeModel
>>>>>>         cnnModel.transform(v)
>>>>>>       } : Vector)
>>>>>>
>>>>>>       dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))
>>>>>>     }
>>>>>>   }
>>>>>>
>>>>>> where CaffeModel is a Java API to the Caffe C++ model.
>>>>>>
>>>>>> The problem here is that for every row it will create a new instance
>>>>>> of CaffeModel, which is inefficient since creating a new model
>>>>>> means loading a large model file. It will also transform only a single
>>>>>> row at a time, whereas a Caffe network can process a batch of rows
>>>>>> efficiently.
>>>>>> In other words, is it possible to create a UDF that operates on a
>>>>>> partition, in order to minimize the creation of CaffeModel instances
>>>>>> and to take advantage of the Caffe network's batch processing?
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Mar 3, 2015 at 7:26 AM, Joseph Bradley <[email protected]> wrote:
>>>>>>
>>>>>>> I see, thanks for clarifying!
>>>>>>>
>>>>>>> I'd recommend following existing implementations in spark.ml
>>>>>>> transformers. You'll need to define a UDF which operates on a single
>>>>>>> Row to compute the value for the new column. You can then use the
>>>>>>> DataFrame DSL to create the new column; the DSL provides a nice syntax
>>>>>>> for what would otherwise be a SQL statement like "select ... from ...".
>>>>>>> I'm recommending looking at the existing implementation (rather than
>>>>>>> stating it here) because it changes between Spark 1.2 and 1.3. In 1.3,
>>>>>>> the DSL is much improved and makes it easier to create a new column.
>>>>>>>
>>>>>>> Joseph
>>>>>>>
>>>>>>> On Sun, Mar 1, 2015 at 1:26 AM, Jaonary Rabarisoa <[email protected]> wrote:
>>>>>>>
>>>>>>>> class DeepCNNFeature extends Transformer ... {
>>>>>>>>
>>>>>>>> override def transform(data: DataFrame, paramMap: ParamMap):
>>>>>>>> DataFrame = {
>>>>>>>>
>>>>>>>>
>>>>>>>>         // How can I do a map partition on the underlying RDD
>>>>>>>>         // and then add the column?
>>>>>>>>
>>>>>>>> }
>>>>>>>> }
>>>>>>>>
>>>>>>>> On Sun, Mar 1, 2015 at 10:23 AM, Jaonary Rabarisoa <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi Joseph,
>>>>>>>>>
>>>>>>>>> Thank you for the tips. I understand what I should do when my
>>>>>>>>> data is represented as an RDD. The thing that I can't figure out is
>>>>>>>>> how to do the same thing when the data is viewed as a DataFrame and I
>>>>>>>>> need to add the result of my pre-trained model as a new column in the
>>>>>>>>> DataFrame. Precisely, I want to implement the following transformer:
>>>>>>>>>
>>>>>>>>> class DeepCNNFeature extends Transformer ... {
>>>>>>>>>
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Jao,
>>>>>>>>>>
>>>>>>>>>> You can use external tools and libraries if they can be called
>>>>>>>>>> from your Spark program or script (with appropriate conversion of
>>>>>>>>>> data types, etc.). The best way to apply a pre-trained model to a
>>>>>>>>>> dataset would be to call the model from within a closure, e.g.:
>>>>>>>>>>
>>>>>>>>>> myRDD.map { myDatum => preTrainedModel.predict(myDatum) }
>>>>>>>>>>
>>>>>>>>>> If your data is distributed in an RDD (myRDD), then the above
>>>>>>>>>> call will distribute the computation of prediction using the
>>>>>>>>>> pre-trained model. It will require that all of your Spark workers be
>>>>>>>>>> able to run the preTrainedModel; that may mean installing Caffe and
>>>>>>>>>> its dependencies on all nodes in the compute cluster.
>>>>>>>>>>
>>>>>>>>>> For the second question, I would modify the above call as follows:
>>>>>>>>>>
>>>>>>>>>> myRDD.mapPartitions { myDataOnPartition =>
>>>>>>>>>> val myModel = // instantiate neural network on this partition
>>>>>>>>>> myDataOnPartition.map { myDatum => myModel.predict(myDatum) }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> I hope this helps!
>>>>>>>>>> Joseph
>>>>>>>>>>
>>>>>>>>>> On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Dear all,
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> We mainly do large-scale computer vision tasks (image
>>>>>>>>>>> classification, retrieval, ...). The pipeline is really great for
>>>>>>>>>>> that. We're trying to reproduce the tutorial given on that topic
>>>>>>>>>>> during the latest Spark Summit (
>>>>>>>>>>> http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html
>>>>>>>>>>> ) using the master version of the Spark pipeline and DataFrame APIs.
>>>>>>>>>>> The tutorial shows different examples of feature extraction stages
>>>>>>>>>>> before running machine learning algorithms. Even though the tutorial
>>>>>>>>>>> is straightforward to reproduce with this new API, we still have
>>>>>>>>>>> some questions:
>>>>>>>>>>>
>>>>>>>>>>>    - Can one use external tools (e.g. via pipe) as a pipeline
>>>>>>>>>>>    stage? An example use case is extracting features learned with a
>>>>>>>>>>>    convolutional neural network. In our case, this corresponds to a
>>>>>>>>>>>    pre-trained neural network built with the Caffe library (
>>>>>>>>>>>    http://caffe.berkeleyvision.org/).
>>>>>>>>>>>
>>>>>>>>>>>    - The second question is about the performance of the
>>>>>>>>>>>    pipeline. A library such as Caffe processes data in batches, and
>>>>>>>>>>>    instantiating one Caffe network can be time-consuming when the
>>>>>>>>>>>    network is very deep. So we can gain performance if we minimize
>>>>>>>>>>>    the number of Caffe networks created and feed data to the network
>>>>>>>>>>>    in batches. In the pipeline, this corresponds to running
>>>>>>>>>>>    transformers that work on a per-partition basis and give the
>>>>>>>>>>>    whole partition to a single Caffe network. How can we create such
>>>>>>>>>>>    a transformer?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>>
>>>>>>>>>>> Jao
>>>>>>>>>>>