Hi Guha,

Thanks a lot! This is exactly what I want, and I'll try to implement it.
MoTao

________________________________
From: ayan guha <guha.a...@gmail.com>
Sent: 8 August 2016, 18:05:37
To: MoTao
Cc: ndj...@gmail.com; user@spark.apache.org
Subject: Re: Re: how to generate a column using mapParition and then add it back to the df?

Hi,

I think you should change initModel() to getOrCreateModel() and create the model as a singleton object. You may want to refer to this link:
<http://stackoverflow.com/questions/30450763/spark-streaming-and-connection-pool-implementation>

On Mon, Aug 8, 2016 at 7:44 PM, MoTao <mo...@sensetime.com> wrote:

Hi Ndjido,

Thanks for your reply.

Broadcasting would be a good idea if the model could be broadcast. However, I'm working with a prebuilt native library (on Linux: classifier.so and classifier.h) that requires the model file to be on the local file system. As I don't have access to the library's source code, I wrote a JNI wrapper around the classifier.

The model file itself can be shipped to each executor efficiently with addFile and getFile, but initModel() is still expensive, as it loads the local file into C++ heap memory, which is not serializable. That's why I cannot broadcast the model, and why I want to avoid loading it more often than necessary.

Best

________________________________
From: ndj...@gmail.com <ndj...@gmail.com>
Sent: 8 August 2016, 17:16:27
To: MoTao
Cc: user@spark.apache.org
Subject: Re: how to generate a column using mapParition and then add it back to the df?

Hi MoTao,

What about broadcasting the model?

Cheers,
Ndjido.

> On 08 Aug 2016, at 11:00, MoTao <mo...@sensetime.com> wrote:
>
> Hi all,
>
> I'm trying to append a column to a df.
> I understand that the new column must be created by
> 1) using literals,
> 2) transforming an existing column of the df,
> or 3) applying a UDF over the df.
>
> In my case, the column to be appended is created by processing each row, like:
>
> val df = spark.createDataFrame(Seq(1.0, 2.0, 3.0)).toDF("value")
> val func = udf { v: Double =>
>   val model = initModel()
>   model.process(v)
> }
> val df2 = df.withColumn("valueWithBias", func(col("value")))
>
> This works fine. However, for performance reasons, I want to avoid calling
> initModel() for each row, so I tried mapPartitions:
>
> val df = spark.createDataFrame(Seq(1.0, 2.0, 3.0)).toDF("value")
> val df2 = df.mapPartitions { rows =>
>   val model = initModel()
>   rows.map(row => model.process(row.getAs[Double](0)))
> }
> val df3 = df.withColumn("valueWithBias", df2.col("value")) // FAIL
>
> But this is wrong: a column of df2 *CANNOT* be appended to df.
>
> The only solution I found is to force mapPartitions to return whole rows
> instead of just the new column
> (something like "row => Row.fromSeq(row.toSeq ++ Array(model.process(...)))"),
> which requires a lot of copying as well.
>
> How can I solve this problem with as little overhead as possible?
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-generate-a-column-using-mapParition-and-then-add-it-back-to-the-df-tp27493.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org

--
Best Regards,
Ayan Guha
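Following Ayan's suggestion, here is a minimal sketch of the getOrCreateModel() singleton pattern. The names Model and process are placeholders standing in for the JNI classifier wrapper described above, not a real API; the stub constructor stands in for the expensive native initModel():

```scala
import org.apache.spark.sql.functions.{col, udf}

// Placeholder for the JNI classifier wrapper (hypothetical).
class Model {
  def process(v: Double): Double = v // stand-in for the native call
}

object ModelSingleton {
  private var model: Model = _

  // Loads the native model at most once per executor JVM.
  // Scala objects are not serialized with the closure; they are
  // re-initialized by the classloader on each executor, so the
  // non-serializable C++ state never crosses the wire.
  def getOrCreateModel(): Model = synchronized {
    if (model == null) {
      model = new Model() // real code: initModel(), the expensive native load
    }
    model
  }
}

// The UDF closure captures only the object reference, so withColumn
// works as in the original snippet while the model loads once per executor.
val func = udf { v: Double => ModelSingleton.getOrCreateModel().process(v) }
val df2 = df.withColumn("valueWithBias", func(col("value")))
```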
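Alternatively, a sketch of the whole-row mapPartitions route the original post falls back to, with the missing piece (an explicit output schema) filled in. It again assumes the hypothetical initModel()/process from the thread; going through df.rdd avoids needing an Encoder for the new Row shape:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

// Output schema = input schema plus the appended column.
val newSchema = StructType(df.schema.fields :+ StructField("valueWithBias", DoubleType))

val df2 = spark.createDataFrame(
  df.rdd.mapPartitions { rows =>
    val model = initModel() // once per partition, not per row
    rows.map(row => Row.fromSeq(row.toSeq :+ model.process(row.getAs[Double](0))))
  },
  newSchema
)
```

The row copy the poster worries about is hard to avoid on this path, which is why the singleton-plus-UDF approach is usually preferable.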