2 options I can think of:

1- Can you perform a union of the DataFrames returned by the Elasticsearch queries? It would still be distributed, but I don't know if you will hit a limit on how many union operations you can chain at a time. (A rough sketch of this is below.)

2- Can you use some other Elasticsearch API method, one that does not return a DataFrame?
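For option 1, a minimal, untested sketch. It reuses the same QueryBuilders/esDF calls shown later in this thread, and assumes the bare list of city names (not the full rows) is small enough to collect to the driver, which may not hold with hundreds of thousands of cities:

import org.apache.spark.sql.DataFrame
import org.elasticsearch.spark.sql._
import org.elasticsearch.index.query.{Operator, QueryBuilders}

// Bring only the city names to the driver; each esDF call still
// returns a distributed DataFrame.
val cityNames: Array[String] = cities.collect().map(_.getString(0))

val perCity: Array[DataFrame] = cityNames.map { city =>
  val qb = QueryBuilders.matchQuery("name", city).operator(Operator.AND)
  sqlContext.esDF("cities/docs", qb.toString)
}

// Fold the per-city results into one DataFrame (unionAll on Spark 1.x).
// The logical plan grows with every union, which is the practical limit
// on how many can be chained.
val combined: DataFrame = perCity.reduce(_ union _)
combined.show()

With a very large city list this strains both the driver and the query plan, so the join approach discussed further down the thread will likely scale better.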
On Fri, Dec 28, 2018 at 10:30 PM <em...@yeikel.com> wrote:

> I could, but only if I had it beforehand. I do not know what the
> DataFrame is until I pass the query parameter and receive the resultant
> DataFrame inside the iteration.
>
> The steps are:
>
> Original DF -> iterate -> pass every element to a function that takes
> the element of the original DF and returns a new DataFrame including
> all the matching terms
>
> From: Andrew Melo <andrew.m...@gmail.com>
> Sent: Friday, December 28, 2018 8:48 PM
> To: em...@yeikel.com
> Cc: Shahab Yunus <shahab.yu...@gmail.com>; user <user@spark.apache.org>
> Subject: Re: What are the alternatives to nested DataFrames?
>
> Could you join() the DFs on a common key? (A sketch of this join
> approach appears at the end of this thread.)
>
> On Fri, Dec 28, 2018 at 18:35 <em...@yeikel.com> wrote:
>
> > Shahab, I am not sure what you are trying to say. Could you please
> > give me an example? The result of the query is a DataFrame that is
> > created after iterating, so I am not sure how I could map that to a
> > column without iterating and getting the values.
> >
> > I have a DataFrame that contains a list of cities which I would like
> > to iterate over and search in Elasticsearch. This list is stored in a
> > DataFrame because it contains hundreds of thousands of elements with
> > multiple properties that would not fit on a single machine.
> >
> > The issue is that the elasticsearch-spark connector returns a
> > DataFrame as well, which leads to a DataFrame creation within a
> > DataFrame.
> >
> > The only solution I found is to store the list of cities in a regular
> > Scala Seq and iterate over that, but as far as I know this would make
> > the Seq centralized instead of distributed (run on the driver only?).
> >
> > Full example:
> >
> > val cities = Seq("New York", "Michigan")
> > cities.foreach(r => {
> >   val qb = QueryBuilders.matchQuery("name", r).operator(Operator.AND)
> >   print(qb.toString)
> >   // Returns a DataFrame for each city
> >   val dfs = sqlContext.esDF("cities/docs", qb.toString)
> >   // Works as expected. It prints the individual DataFrame with the
> >   // result of the query
> >   dfs.show()
> > })
> >
> > val cities = Seq("New York", "Michigan").toDF()
> > cities.foreach(r => {
> >   val city = r.getString(0)
> >   val qb = QueryBuilders.matchQuery("name", city).operator(Operator.AND)
> >   print(qb.toString)
> >   val dfs = sqlContext.esDF("cities/docs", qb.toString) // null pointer
> >   dfs.show()
> > })
> >
> > From: Shahab Yunus <shahab.yu...@gmail.com>
> > Sent: Friday, December 28, 2018 12:34 PM
> > To: em...@yeikel.com
> > Cc: user <user@spark.apache.org>
> > Subject: Re: What are the alternatives to nested DataFrames?
> >
> > Can you have a DataFrame with a column which stores JSON (type
> > string)? Or you could also have a column of array type in which you
> > store all cities matching your query.
> >
> > On Fri, Dec 28, 2018 at 2:48 AM <em...@yeikel.com> wrote:
> >
> > Hi community,
> >
> > As shown in other answers online, Spark does not support the nesting
> > of DataFrames, but what are the options?
> > I have the following scenario:
> >
> > dataFrame1 = list of cities
> >
> > dataFrame2 = created after searching in Elasticsearch for each city
> > in dataFrame1
> >
> > I've tried:
> >
> > val cities = sc.parallelize(Seq("New York")).toDF()
> > cities.foreach(r => {
> >   val companyName = r.getString(0)
> >   println(companyName)
> >   // returns a DataFrame consisting of all the cities matching the
> >   // entry in cities
> >   val dfs = sqlContext.esDF("cities/docs", "?q=" + companyName)
> > })
> >
> > which triggers the expected null pointer exception:
> >
> > java.lang.NullPointerException
> >   at org.elasticsearch.spark.sql.EsSparkSQL$.esDF(EsSparkSQL.scala:53)
> >   at org.elasticsearch.spark.sql.EsSparkSQL$.esDF(EsSparkSQL.scala:51)
> >   at org.elasticsearch.spark.sql.package$SQLContextFunctions.esDF(package.scala:37)
> >   at Main$$anonfun$main$1.apply(Main.scala:43)
> >   at Main$$anonfun$main$1.apply(Main.scala:39)
> >   at scala.collection.Iterator$class.foreach(Iterator.scala:742)
> >   at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
> >   at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921)
> >   at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921)
> >   at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
> >   at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
> >   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> >   at org.apache.spark.scheduler.Task.run(Task.scala:109)
> >   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
> >   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >   at java.lang.Thread.run(Thread.java:748)
> >
> > 2018-12-28 02:01:00 ERROR TaskSetManager:70 - Task 7 in stage 0.0 failed 1 times; aborting job
> > Exception in thread "main" org.apache.spark.SparkException: Job
> > aborted due to stage failure: Task 7 in stage 0.0 failed 1 times, most
> > recent failure: Lost task 7.0 in stage 0.0 (TID 7, localhost, executor
> > driver): java.lang.NullPointerException
> >
> > What options do I have?
> >
> > Thank you.
>
> --
> It's dark in this basement.
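Regarding the join() idea quoted above: a minimal, untested sketch, combined with the earlier array-column suggestion. It reads the index once as a distributed DataFrame instead of issuing one query per city; the column names "name" and "city" are assumptions about the index mapping:

import org.apache.spark.sql.functions.collect_list
import org.elasticsearch.spark.sql._
import sqlContext.implicits._

// The city list stays a distributed DataFrame; no driver-side loop.
val cities = Seq("New York", "Michigan").toDF("city")

// One distributed read of the whole index instead of a query per city.
val docs = sqlContext.esDF("cities/docs")

// Match each document to its city on the common key.
val matches = docs.join(cities, docs("name") === cities("city"))

// Optionally gather all matching documents per city into an array
// column, per the earlier suggestion in this thread.
val grouped = matches
  .groupBy("city")
  .agg(collect_list("name").as("matches"))

grouped.show()

The connector pushes Spark SQL filters down to Elasticsearch, so if reading the full index is too heavy, applying a filter to docs before the join should narrow what is actually fetched.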