Have you tried to cache? Maybe cache rddX before the collect() and the map?
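
Something like this, maybe (an untested sketch, reusing the rddX from your
snippet below):

    rddX.cache()               // keep the parsed lists in memory
    rddX.count()               // materialize the cache once
    val rows = rddX.collect()  // served from the cache, not recomputed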

> On Sep 19, 2017, at 7:20 AM, Daniel O' Shaughnessy 
> <danieljamesda...@gmail.com> wrote:
> 
> Thanks for your response, Jean.
> 
> I managed to figure this out in the end, but it's an extremely slow solution 
> and not tenable for my use case:
> 
>       val rddX = dfWithSchema.select("event_name").rdd
>         .map(_.getString(0).split(",").map(_.trim.replaceAll("[\\[\\]\"]", "")).toList)
>       //val oneRow = Converted(eventIndexer.transform(sqlContext.sparkContext.parallelize(Seq("CCB")).toDF("event_name")).select("eventIndex").first().getDouble(0))
>       rddX.take(5).foreach(println)
>       // each element spins up its own one-row DataFrame and a separate
>       // transform on the driver, i.e. one tiny Spark job per string,
>       // which is why this is so slow
>       val severalRows = rddX.collect().map(row =>
>         if (row.length == 1) {
>           eventIndexer.transform(sqlContext.sparkContext.parallelize(Seq(row(0).toString))
>             .toDF("event_name")).select("eventIndex").first().getDouble(0)
>         } else {
>           row.map(tool =>
>             eventIndexer.transform(sqlContext.sparkContext.parallelize(Seq(tool.toString))
>               .toDF("event_name")).select("eventIndex").first().getDouble(0))
>         })
> 
> Wondering if there is any better/faster way to do this?
> 
> Thanks.
> 
> 
> 
> On Fri, 15 Sep 2017 at 13:31 Jean Georges Perrin <j...@jgp.net> wrote:
> Hey Daniel, not sure this will help, but... I had a similar need where I 
> wanted the content of a dataframe to become a "cell" or a row in the parent 
> dataframe. I grouped the child dataframe, then collected it as a list into 
> the parent dataframe after a join operation. As I said, not sure it matches 
> your use case, but HIH...
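> 
> Roughly like this (a from-memory sketch, not your schema: "parent", "child",
> "parent_id" and "item" are made-up names):
> 
>   import org.apache.spark.sql.functions.collect_list
> 
>   // one row per parent id, with all child items gathered into a list
>   val grouped = child.groupBy("parent_id")
>     .agg(collect_list("item").as("items"))
> 
>   // attach that list as a "cell" on each parent row
>   val withCell = parent.join(grouped, Seq("parent_id"), "left")
> 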
> jg
> 
>> On Sep 15, 2017, at 5:42 AM, Daniel O' Shaughnessy 
>> <danieljamesda...@gmail.com> wrote:
>> 
>> Hi guys,
>> 
>> I'm having trouble implementing this scenario:
>> 
>> I have a column with a typical entry being: ['apple', 'orange', 'apple', 
>> 'pear', 'pear']
>> 
>> I need to use a StringIndexer to transform this to: [0, 2, 0, 1, 1]
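>> 
>> (On a flat, one-string-per-row column this part is straightforward; a
>> minimal sketch of the usual usage, with made-up variable names:)
>> 
>>   import org.apache.spark.ml.feature.StringIndexer
>>   import sqlContext.implicits._
>> 
>>   val flat = Seq("apple", "orange", "apple", "pear", "pear").toDF("event_name")
>>   val model = new StringIndexer()
>>     .setInputCol("event_name")
>>     .setOutputCol("eventIndex")
>>     .fit(flat)
>>   // labels are indexed by descending frequency, so here:
>>   // apple -> 0.0, pear -> 1.0, orange -> 2.0
>>   model.transform(flat).show()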
>> 
>> I'm attempting to do this, but because of the nested operation on another 
>> RDD I get an NPE (the sqlContext and the fitted indexer can't be used 
>> inside a map that runs on the executors).
>> 
>> Here's my code so far, thanks:
>> 
>>       import org.apache.spark.ml.feature.{IndexToString, StringIndexer}
>> 
>>       val dfWithSchema = sqlContext.createDataFrame(eventFeaturesRDD)
>>         .toDF("email", "event_name")
>> 
>>       // attempting
>>       import sqlContext.implicits._
>>       val event_list = dfWithSchema.select("event_name").distinct
>>       val event_listDF = event_list.toDF()
>>       val eventIndexer = new StringIndexer()
>>         .setInputCol("event_name")
>>         .setOutputCol("eventIndex")
>>         .fit(event_listDF)
>> 
>>       val eventIndexed = eventIndexer.transform(event_listDF)
>> 
>>       val converter = new IndexToString()
>>         .setInputCol("eventIndex")
>>         .setOutputCol("originalCategory")
>> 
>>       val convertedEvents = converter.transform(eventIndexed)
>> 
>>       // strip brackets/quotes and split each entry into a list of strings
>>       val rddX = dfWithSchema.select("event_name").rdd
>>         .map(_.getString(0).split(",").map(_.trim.replaceAll("[\\[\\]\"]", "")).toList)
>>       //val oneRow = Converted(eventIndexer.transform(sqlContext.sparkContext.parallelize(Seq("CCB")).toDF("event_name")).select("eventIndex").first().getDouble(0))
>> 
>>       // the NPE is thrown here: sqlContext and eventIndexer are referenced
>>       // inside a map that executes on the workers
>>       val severalRows = rddX.map(row => {
>>         // Split array into n tools
>>         println("ROW: " + row(0).toString)
>>         println(row(0).getClass)
>>         println("PRINT: " + eventIndexer.transform(sqlContext.sparkContext
>>           .parallelize(Seq(row(0))).toDF("event_name"))
>>           .select("eventIndex").first().getDouble(0))
>>         (eventIndexer.transform(sqlContext.sparkContext.parallelize(Seq(row))
>>            .toDF("event_name")).select("eventIndex").first().getDouble(0),
>>          Seq(row).toString)
>>       })
>>       // attempting
> 
