Have you tried caching? Maybe after the collect() and before the map?
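
Another thing that might be worth trying is to avoid calling transform() once per element: fit the StringIndexer once on the exploded values, turn its labels into a plain Map, and apply that Map to each list inside a UDF. This is only a rough sketch, not something I have run against your data. The sample rows, the object name IndexEventLists, and the helpers parseEvents and labelIndex are made up for illustration, and I am assuming event_name holds strings shaped like ["apple", "orange"]. I use model.labels as in Spark 2.x; newer releases also expose labelsArray.

```scala
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode, udf}

object IndexEventLists {
  // Hypothetical helper: turn a raw cell such as ["apple", "orange"]
  // into Seq("apple", "orange"). Assumes brackets and quotes are just noise.
  def parseEvents(raw: String): Seq[String] =
    raw.split(",").map(_.trim.replaceAll("[\\[\\]\"']", "")).filter(_.nonEmpty).toSeq

  // Hypothetical helper: map each label to its position in the labels array,
  // which is how a fitted StringIndexerModel numbers them.
  def labelIndex(labels: Seq[String]): Map[String, Double] =
    labels.zipWithIndex.map { case (label, i) => label -> i.toDouble }.toMap

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("index-event-lists")
      .getOrCreate()
    import spark.implicits._

    // Made-up sample data standing in for dfWithSchema.
    val dfWithSchema = Seq(
      ("a@example.com", "[\"apple\", \"orange\", \"apple\", \"pear\", \"pear\"]"),
      ("b@example.com", "[\"pear\"]")
    ).toDF("email", "event_name")

    // Parse the string column into an array column.
    val parse = udf(parseEvents _)
    val withLists = dfWithSchema.withColumn("events", parse(col("event_name")))

    // Fit once, on one event per row, so the indexer sees every distinct value.
    val model = new StringIndexer()
      .setInputCol("event")
      .setOutputCol("eventIndex")
      .fit(withLists.select(explode(col("events")).as("event")))

    // A plain Map is serializable, so it can be used inside a UDF on the
    // executors, unlike a DataFrame or the SparkContext.
    val lookup = labelIndex(model.labels.toSeq)
    // Throws for a label the indexer never saw; fine here because the
    // indexer was fitted on the same data.
    val toIndices = udf((events: Seq[String]) => events.map(lookup))

    withLists.withColumn("eventIndices", toIndices(col("events"))).show(false)

    spark.stop()
  }
}
```

Everything stays distributed this way: there is no collect(), and no DataFrame gets created inside a map, which is what I would expect to cause the NPE in the first version.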
> On Sep 19, 2017, at 7:20 AM, Daniel O' Shaughnessy
> <danieljamesda...@gmail.com> wrote:
>
> Thanks for your response Jean.
>
> I managed to figure this out in the end but it's an extremely slow solution
> and not tenable for my use-case:
>
> val rddX = dfWithSchema.select("event_name").rdd
>   .map(_.getString(0).split(",").map(_.trim.replaceAll("[\\[\\]\"]", "")).toList)
> // val oneRow = Converted(eventIndexer.transform(sqlContext.sparkContext.parallelize(Seq("CCB")).toDF("event_name")).select("eventIndex").first().getDouble(0))
> rddX.take(5).foreach(println)
> val severalRows = rddX.collect().map(row =>
>   if (row.length == 1) {
>     eventIndexer.transform(sqlContext.sparkContext.parallelize(Seq(row(0).toString)).toDF("event_name")).select("eventIndex").first().getDouble(0)
>   } else {
>     row.map(tool => {
>       eventIndexer.transform(sqlContext.sparkContext.parallelize(Seq(tool.toString)).toDF("event_name")).select("eventIndex").first().getDouble(0)
>     })
>   })
>
> Wondering if there is any better/faster way to do this?
>
> Thanks.
>
>
>
> On Fri, 15 Sep 2017 at 13:31 Jean Georges Perrin <j...@jgp.net> wrote:
> Hey Daniel, not sure this will help, but... I had a similar need where I
> wanted the content of a dataframe to become a "cell" or a row in the parent
> dataframe. I grouped by the child dataframe, then collected it as a list in the
> parent dataframe after a join operation. As I said, not sure it matches your
> use case, but HIH...
> jg
>
>> On Sep 15, 2017, at 5:42 AM, Daniel O' Shaughnessy
>> <danieljamesda...@gmail.com> wrote:
>>
>> Hi guys,
>>
>> I'm having trouble implementing this scenario:
>>
>> I have a column with a typical entry being: ['apple', 'orange', 'apple',
>> 'pear', 'pear']
>>
>> I need to use a StringIndexer to transform this to: [0, 2, 0, 1, 1]
>>
>> I'm attempting to do this, but because of the nested operation on another RDD
>> I get a NullPointerException (NPE).
>>
>> Here's my code so far, thanks:
>>
>> val dfWithSchema =
>> sqlContext.createDataFrame(eventFeaturesRDD).toDF("email", "event_name")
>>
>> // attempting
>> import sqlContext.implicits._
>> val event_list = dfWithSchema.select("event_name").distinct
>> val event_listDF = event_list.toDF()
>> val eventIndexer = new StringIndexer()
>> .setInputCol("event_name")
>> .setOutputCol("eventIndex")
>> .fit(event_listDF)
>>
>> val eventIndexed = eventIndexer.transform(event_listDF)
>>
>> val converter = new IndexToString()
>> .setInputCol("eventIndex")
>> .setOutputCol("originalCategory")
>>
>> val convertedEvents = converter.transform(eventIndexed)
>> val rddX = dfWithSchema.select("event_name").rdd
>>   .map(_.getString(0).split(",").map(_.trim.replaceAll("[\\[\\]\"]", "")).toList)
>> // val oneRow = Converted(eventIndexer.transform(sqlContext.sparkContext.parallelize(Seq("CCB")).toDF("event_name")).select("eventIndex").first().getDouble(0))
>>
>> val severalRows = rddX.map(row => {
>>   // Split array into n tools
>>   println("ROW: " + row(0).toString)
>>   println(row(0).getClass)
>>   println("PRINT: " +
>>     eventIndexer.transform(sqlContext.sparkContext.parallelize(Seq(row(0))).toDF("event_name")).select("eventIndex").first().getDouble(0))
>>
>>   (eventIndexer.transform(sqlContext.sparkContext.parallelize(Seq(row)).toDF("event_name")).select("eventIndex").first().getDouble(0),
>>     Seq(row).toString)
>> })
>> // attempting
>