Could you join() the DFs on a common key?

On Fri, Dec 28, 2018 at 18:35 <em...@yeikel.com> wrote:

> Shabad , I am not sure what you are trying to say. Could you please give
> me an example? The result of the Query is a Dataframe that is created after
> iterating, so I am not sure how could I map that to a column without
> iterating and getting the values.
>
>
>
> I have a Dataframe that contains a list of cities for which I would like
> to iterate over and search in Elasticsearch.  This list is stored in
> Dataframe because it contains hundreds of thousands of elements with
> multiple properties that would not fit in a single machine.
>
>
>
> The issue is that the elastic-spark connector returns a Dataframe as well
> which leads to a dataframe creation within a Dataframe
>
>
>
> The only solution I found is to store the list of cities in a a regular
> scala Seq and iterate over that, but as far as I know this would make Seq
> centralized instead of distributed (run at the executor only?)
>
>
>
> Full example :
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> *val cities    = Seq("New York","Michigan")cities.foreach(r => {  val qb =
> QueryBuilders.matchQuery("name", r).operator(Operator.AND)
> print(qb.toString)  val dfs = sqlContext.esDF("cities/docs", qb.toString)
> // Returns a dataframe for each city  dfs.show() // Works as expected. It
> prints the individual dataframe with the result of the query})*
>
>
>
>
>
> *val cities = Seq("New York","Michigan").toDF()*
>
>
>
> *    cities.foreach(r => {*
>
>
>
> *      val city  = r.getString(0)*
>
>
>
> *      val qb = QueryBuilders.matchQuery("name",
> city).operator(Operator.AND)*
>
> *      print(qb.toString)*
>
>
>
> *      val dfs = sqlContext.esDF("cities/docs", qb.toString) // null
> pointer*
>
>
>
> *      dfs.show()*
>
>
>
> *    })*
>
>
>
>
>
> *From:* Shahab Yunus <shahab.yu...@gmail.com>
> *Sent:* Friday, December 28, 2018 12:34 PM
> *To:* em...@yeikel.com
> *Cc:* user <user@spark.apache.org>
> *Subject:* Re: What are the alternatives to nested DataFrames?
>
>
>
> Can you have a dataframe with a column which stores json (type string)? Or
> you can also have a column of array type in which you store all cities
> matching your query.
>
>
>
>
>
>
>
> On Fri, Dec 28, 2018 at 2:48 AM <em...@yeikel.com> wrote:
>
> Hi community ,
>
>
>
> As shown in other answers online , Spark does not support the nesting of
> DataFrames , but what are the options?
>
>
>
> I have the following scenario :
>
>
>
> dataFrame1 = List of Cities
>
>
>
> dataFrame2 = Created after searching in ElasticSearch for each city in
> dataFrame1
>
>
>
> I've tried :
>
>
>
>  val cities    = sc.parallelize(Seq("New York")).toDF()
>
>    cities.foreach(r => {
>
>     val companyName = r.getString(0)
>
>     println(companyName)
>
>     val dfs = sqlContext.esDF("cities/docs", "?q=" + companyName)
>  //returns a DataFrame consisting of all the cities matching the entry in
> cities
>
>     })
>
>
>
> Which triggers the expected null pointer exception
>
>
>
> java.lang.NullPointerException
>
>     at org.elasticsearch.spark.sql.EsSparkSQL$.esDF(EsSparkSQL.scala:53)
>
>     at org.elasticsearch.spark.sql.EsSparkSQL$.esDF(EsSparkSQL.scala:51)
>
>     at
> org.elasticsearch.spark.sql.package$SQLContextFunctions.esDF(package.scala:37)
>
>     at Main$$anonfun$main$1.apply(Main.scala:43)
>
>     at Main$$anonfun$main$1.apply(Main.scala:39)
>
>     at scala.collection.Iterator$class.foreach(Iterator.scala:742)
>
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
>
>     at
> org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921)
>
>     at
> org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921)
>
>     at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
>
>     at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
>
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>
>     at org.apache.spark.scheduler.Task.run(Task.scala:109)
>
>     at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
>     at java.lang.Thread.run(Thread.java:748)
>
> 2018-12-28 02:01:00 ERROR TaskSetManager:70 - Task 7 in stage 0.0 failed 1
> times; aborting job
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 7 in stage 0.0 failed 1 times, most recent
> failure: Lost task 7.0 in stage 0.0 (TID 7, localhost, executor driver):
> java.lang.NullPointerException
>
>
>
> What options do I have?
>
>
>
> Thank you.
>
> --
It's dark in this basement.

Reply via email to