How big is the list of fruits in your example? Can you broadcast it?

On Tue, 19 Sep 2017 at 9:21 pm, Daniel O' Shaughnessy <danieljamesda...@gmail.com> wrote:
> Thanks for your response Jean.
>
> I managed to figure this out in the end but it's an extremely slow
> solution and not tenable for my use case:
>
> val rddX = dfWithSchema.select("event_name").rdd
>   .map(_.getString(0).split(",").map(_.trim.replaceAll("[\\[\\]\"]", "")).toList)
> //val oneRow = Converted(eventIndexer.transform(sqlContext.sparkContext
> //  .parallelize(Seq("CCB")).toDF("event_name")).select("eventIndex").first().getDouble(0))
> rddX.take(5).foreach(println)
> val severalRows = rddX.collect().map(row =>
>   if (row.length == 1) {
>     eventIndexer.transform(sqlContext.sparkContext.parallelize(Seq(row(0).toString))
>       .toDF("event_name")).select("eventIndex").first().getDouble(0)
>   } else {
>     row.map(tool =>
>       eventIndexer.transform(sqlContext.sparkContext.parallelize(Seq(tool.toString))
>         .toDF("event_name")).select("eventIndex").first().getDouble(0)
>     )
>   })
>
> Wondering if there is any better/faster way to do this?
>
> Thanks.
>
> On Fri, 15 Sep 2017 at 13:31 Jean Georges Perrin <j...@jgp.net> wrote:
>
>> Hey Daniel, not sure this will help, but... I had a similar need where I
>> wanted the content of a dataframe to become a "cell" or a row in the parent
>> dataframe. I grouped the child dataframe, then collected it as a list in
>> the parent dataframe after a join operation. As I said, not sure it matches
>> your use case, but HIH...
>> jg
>>
>> On Sep 15, 2017, at 5:42 AM, Daniel O' Shaughnessy <danieljamesda...@gmail.com> wrote:
>>
>> Hi guys,
>>
>> I'm having trouble implementing this scenario:
>>
>> I have a column with a typical entry being: ['apple', 'orange', 'apple', 'pear', 'pear']
>>
>> I need to use a StringIndexer to transform this to: [0, 2, 0, 1, 1]
>>
>> I'm attempting to do this, but because of the nested operation on another
>> RDD I get the NPE.
>>
>> Here's my code so far, thanks:
>>
>> val dfWithSchema = sqlContext.createDataFrame(eventFeaturesRDD)
>>   .toDF("email", "event_name")
>>
>> // attempting
>> import sqlContext.implicits._
>> val event_list = dfWithSchema.select("event_name").distinct
>> val event_listDF = event_list.toDF()
>> val eventIndexer = new StringIndexer()
>>   .setInputCol("event_name")
>>   .setOutputCol("eventIndex")
>>   .fit(event_listDF)
>>
>> val eventIndexed = eventIndexer.transform(event_listDF)
>>
>> val converter = new IndexToString()
>>   .setInputCol("eventIndex")
>>   .setOutputCol("originalCategory")
>>
>> val convertedEvents = converter.transform(eventIndexed)
>> val rddX = dfWithSchema.select("event_name").rdd
>>   .map(_.getString(0).split(",").map(_.trim.replaceAll("[\\[\\]\"]", "")).toList)
>> //val oneRow = Converted(eventIndexer.transform(sqlContext.sparkContext
>> //  .parallelize(Seq("CCB")).toDF("event_name")).select("eventIndex").first().getDouble(0))
>>
>> val severalRows = rddX.map { row =>
>>   // Split array into n tools
>>   println("ROW: " + row(0).toString)
>>   println(row(0).getClass)
>>   println("PRINT: " + eventIndexer.transform(sqlContext.sparkContext
>>     .parallelize(Seq(row(0))).toDF("event_name")).select("eventIndex").first().getDouble(0))
>>   (eventIndexer.transform(sqlContext.sparkContext.parallelize(Seq(row))
>>     .toDF("event_name")).select("eventIndex").first().getDouble(0), Seq(row).toString)
>> }
>> // attempting

--
Best Regards,
Ayan Guha
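[Editor's note] A minimal sketch of the broadcast idea Ayan raises: a fitted StringIndexerModel exposes its labels via `.labels` (ordered by descending frequency), so a plain label-to-index Map can be built once on the driver and used inside `rddX.map`, replacing the per-element `transform`/`parallelize` calls that make the posted solution slow and that trigger the NPE (nested use of the SparkContext inside a closure). The label ordering below assumes the indexer was fit on the full column rather than the distinct list; `sc` in the comments stands for the application's SparkContext.

```scala
// Labels as a hypothetical fitted StringIndexerModel.labels would return them
// for the example column: apple and pear occur twice, orange once.
val labels = Array("apple", "pear", "orange")

// Build the label -> index map once on the driver...
val labelToIndex: Map[String, Double] =
  labels.zipWithIndex.map { case (label, i) => label -> i.toDouble }.toMap

// ...then, in Spark, wrap it with sc.broadcast(labelToIndex) and read
// broadcastVar.value inside rddX.map — one pass, no nested transform() calls.
val row = List("apple", "orange", "apple", "pear", "pear")
val indexed = row.map(labelToIndex)
println(indexed)  // List(0.0, 2.0, 0.0, 1.0, 1.0)
```

Because the map is broadcast rather than captured per task, each executor receives one copy regardless of how many rows it processes, which is exactly why Ayan asks how big the list of fruits is.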