Re: spark-sql force parallel union

2018-11-20 Thread kathleen li
You might first construct the query statement with "union all", like below:

scala> val query="select * from dfv1 union all select * from dfv2 union all
select * from dfv3"
query: String = select * from dfv1 union all select * from dfv2 union all
select * from dfv3

then write a loop to register each partition as a temp view, like below:
for (i <- 1 to 3) {
  df.createOrReplaceTempView("dfv" + i)
}

scala> spark.sql(query).explain
== Physical Plan ==
Union
:- LocalTableScan [_1#0, _2#1, _3#2]
:- LocalTableScan [_1#0, _2#1, _3#2]
+- LocalTableScan [_1#0, _2#1, _3#2]
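
Putting the two steps together, a minimal sketch (the dfs sequence holding one
DataFrame per Cassandra partition is a placeholder) that registers the views and
builds the UNION ALL string for any number of inputs, so Spark plans a single
Union instead of a chain of sequential unions:

// one DataFrame per loaded Cassandra partition (placeholder)
val dfs: Seq[org.apache.spark.sql.DataFrame] = ???

// register each partition under a numbered view name: dfv1, dfv2, ...
dfs.zipWithIndex.foreach { case (df, i) =>
  df.createOrReplaceTempView(s"dfv${i + 1}")
}

// build one UNION ALL statement over all the views
val query = (1 to dfs.size)
  .map(i => s"select * from dfv$i")
  .mkString(" union all ")

// the physical plan shows a single Union node over all scans
spark.sql(query).explain()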


You can also use ROLLUP or GROUPING SETS for multiple dimensions to replace
"union" or "union all".

On Tue, Nov 20, 2018 at 8:34 PM onmstester onmstester
 wrote:

> I'm using Spark SQL to query Cassandra tables. In Cassandra, I've
> partitioned my data by a time bucket and an id, so based on the queries I need
> to union multiple partitions with Spark SQL and do the
> aggregations/group-by on the union result, something like this:
>
> for(all cassandra partitions){
> DataSet currentPartition = sqlContext.sql();
> unionResult = unionResult.union(currentPartition);
> }
>
> Increasing the input (number of loaded partitions) increases the response time
> more than linearly because the unions are done sequentially.
>
> Because there is no harm in doing the unions in parallel, and I don't know how
> to force Spark to do them in parallel, right now I'm using a ThreadPool to
> asynchronously load all partitions in my application (which may cause OOM),
> and do the sort or simple group-by in Java somehow (which makes me wonder why
> I'm using Spark at all).
>
> The short question is: how can I force Spark SQL to load Cassandra partitions
> in parallel while doing a union on them? Also, I don't want too many tasks in
> Spark; with my home-made async solution I use coalesce(1), so each task is
> fast (only the wait time on Cassandra).
>
> Sent using Zoho Mail 
>
>
>


Re: [Spark SQL] [Spark 2.4.0] v1 -> struct(v1.e) fails

2018-11-19 Thread kathleen li
How about this:

df.select(expr("transform( b, v1 -> struct(v1) )")).show()

+--------------------------------------------------------------------------------------------+
|transform(b, lambdafunction(named_struct(v1, namedlambdavariable()), namedlambdavariable()))|
+--------------------------------------------------------------------------------------------+
|                                                                                     [[[1]]]|
+--------------------------------------------------------------------------------------------+
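
If you need the field name kept, a possible workaround (untested here, so treat
it as a sketch) is to name the struct field explicitly with named_struct, since
the literal 'e' is a foldable string and avoids the NamePlaceholder problem:

import org.apache.spark.sql.functions.expr

// name the field explicitly instead of relying on struct(v1.e)
df.select(expr("transform( b, v1 -> named_struct('e', v1.e) )")).show()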


On Thu, Nov 15, 2018 at 6:47 AM François Sarradin 
wrote:

> Hi,
>
> I've this JSON document :
>
> { "b": [ { "e": 1 } ] }
>
> When I do :
>
> df.select(expr("transform( b, v1 -> struct(v1.e) )"))
>
> I get this error :
>
> cannot resolve 'named_struct(NamePlaceholder(), namedlambdavariable().e)'
> due to data type mismatch: Only foldable string expressions are allowed to
> appear at odd position, got: NamePlaceholder; line 1 pos 20; 'Project
> [unresolvedalias(transform(b#5,
> lambdafunction(named_struct(NamePlaceholder, lambda v1#7.e), lambda v1#7,
> false)), Some())] +- LogicalRDD [b#5], false
>
> org.apache.spark.sql.AnalysisException: cannot resolve 
> 'named_struct(NamePlaceholder(), namedlambdavariable().`e`)' due to data type 
> mismatch: Only foldable string expressions are allowed to appear at odd 
> position, got: NamePlaceholder; line 1 pos 20;
> 'Project [unresolvedalias(transform(b#5, 
> lambdafunction(named_struct(NamePlaceholder, lambda v1#7.e), lambda v1#7, 
> false)), Some())]
> +- LogicalRDD [b#5], false
>   at 
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$3.applyOrElse(CheckAnalysis.scala:115)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$3.applyOrElse(CheckAnalysis.scala:107)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:278)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:278)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
> ...
>
> After some investigation, it seems that this error is due to the fact that
> v1.e is seen as a NamePlaceholder and not as a Literal. This is somewhat
> understandable, as v1 is not resolved here. But isn't it possible for
> struct(v1.e) to use v1.e as the field name?
>
> regards,
>
> françois-
>
>


Re: Re: Executor hang

2018-10-07 Thread kathleen li


https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-whole-stage-codegen.html#spark.sql.codegen.wholeStage
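
To turn it off, a minimal sketch using the config key documented at that link
(the app name below is just a placeholder):

import org.apache.spark.sql.SparkSession

// disable whole-stage code generation when building the session
val spark = SparkSession.builder()
  .appName("no-wholestage-codegen")
  .config("spark.sql.codegen.wholeStage", "false")
  .getOrCreate()

// or for an already-running session:
spark.conf.set("spark.sql.codegen.wholeStage", "false")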
Sent from my iPhone

> On Oct 7, 2018, at 3:21 PM, 阎志涛  wrote:
> 
> It is not a data skew issue. The executor had been running for more than 8 hours and I 
> had to kill it. I suspect the problem is caused by Spark codegen, but it seems 
> spark.sql.codegen.wholeStage=false does not work. I hit another 
> problem with codegen which caused my Spark job to fail with the following stack 
> trace:
> 18/10/08 01:58:36 WARN TaskSetManager: Lost task 1.0 in stage 7.0 (TID 1912, 
> data1.aidata360.com, executor 4): java.util.NoSuchElementException
>at java.util.LinkedList.removeFirst(LinkedList.java:270)
>at 
> org.apache.spark.unsafe.map.BytesToBytesMap$MapIterator.hasNext(BytesToBytesMap.java:310)
>at 
> org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap$1.next(UnsafeFixedWidthAggregationMap.java:177)
>at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
>at 
> org.apache.spark.sql.Dataset$$anonfun$56$$anon$1.hasNext(Dataset.scala:2712)
>at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
>at 
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:191)
>at 
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>at org.apache.spark.scheduler.Task.run(Task.scala:99)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
>at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>at java.lang.Thread.run(Thread.java:748)
>  
> How can I disable whole-stage codegen?
>  
> Thanks and Regards,
> Tony
>  
> From: kathleen li  
> Sent: October 8, 2018, 1:04
> To: 阎志涛 
> Cc: user@spark.apache.org
> Subject: Re: Executor hang
>  
> It seemed you had a data skew issue, since the shuffle read size for executor 4 is 
> almost 2 times that of the other executors and its GC time (11s) is almost 15 to 20 times 
> that of the others.
>  
> Kathleen
> 
> Sent from my iPhone
> 
> On Oct 7, 2018, at 5:24 AM, 阎志涛  wrote:
> 
> Hi, All,
> I am running Spark 2.1 on Hadoop 2.7.2 with YARN. While executing Spark 
> tasks, some executors keep running forever without success. From the following 
> screenshot:
> 
> We can see that executor 4 has been running for 26 minutes and its shuffle read 
> size/records have stayed unchanged for those 26 minutes too. The thread dump for the 
> thread is as follows:
> 
>  
> 
>  
> The Linux version is 4.14.62-70.117.amzn2.x86_64 
> (mockbuild@ip-10-0-1-79) and the JDK version is Oracle JDK 1.8.0_181. With jstack 
> on the machine, I can see the following thread dump:
>  
> "Executor task launch worker for task 3806" #54 daemon prio=5 os_prio=0 
> tid=0x01230800 nid=0x1fc runnable [0x7fba0e60]
>java.lang.Thread.State: RUNNABLE
>at java.lang.StringCoding.encode(StringCoding.java:364)
>at java.lang.String.getBytes(String.java:941)
>at 
> org.apache.spark.unsafe.types.UTF8String.fromString(UTF8String.java:109)
>at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
>at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:243)
>at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190)
>at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$File

Re: Executor hang

2018-10-07 Thread kathleen li
It seemed you had a data skew issue, since the shuffle read size for executor 4 is 
almost 2 times that of the other executors and its GC time (11s) is almost 15 to 20 times 
that of the others.
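
One quick way to confirm the skew (a sketch; the column "key" is a placeholder
for whatever key drives your shuffle):

import org.apache.spark.sql.functions.desc

// a handful of keys with far larger counts than the rest indicates skew
df.groupBy("key").count().orderBy(desc("count")).show(20)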

Kathleen

Sent from my iPhone

> On Oct 7, 2018, at 5:24 AM, 阎志涛  wrote:
> 
> Hi, All,
> I am running Spark 2.1 on Hadoop 2.7.2 with YARN. While executing Spark 
> tasks, some executors keep running forever without success. From the following 
> screenshot:
> 
> We can see that executor 4 has been running for 26 minutes and its shuffle read 
> size/records have stayed unchanged for those 26 minutes too. The thread dump for the 
> thread is as follows:
> 
>  
> 
>  
> The Linux version is 4.14.62-70.117.amzn2.x86_64 
> (mockbuild@ip-10-0-1-79) and the JDK version is Oracle JDK 1.8.0_181. With jstack 
> on the machine, I can see the following thread dump:
>  
> "Executor task launch worker for task 3806" #54 daemon prio=5 os_prio=0 
> tid=0x01230800 nid=0x1fc runnable [0x7fba0e60]
>java.lang.Thread.State: RUNNABLE
>at java.lang.StringCoding.encode(StringCoding.java:364)
>at java.lang.String.getBytes(String.java:941)
>at 
> org.apache.spark.unsafe.types.UTF8String.fromString(UTF8String.java:109)
>at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
>at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:243)
>at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190)
>at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:188)
>at 
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
>at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:193)
>at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129)
>at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128)
>at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>at org.apache.spark.scheduler.Task.run(Task.scala:99)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
>at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>at java.lang.Thread.run(Thread.java:748)
>  
> I wonder why this happened. Is it related to my environment or to a bug in Spark?
>  
> Thanks and Regards,
> Tony
>  
> 阎志涛 (Zhitao Yan)
> VP of R&D
>  
> M  +86-139 1181 5695
> WeChat   zhitao_yan
>  
> 北京腾云天下科技有限公司 (TalkingData)
> Room 608, Building 2, No. 39 Dongzhimenwai Street, Beijing, 100027
>  
> TalkingData.com
>  


Re: How to do a broadcast join using raw Spark SQL 2.3.1 or 2.3.2?

2018-10-03 Thread kathleen li
Not sure what you mean by "raw" Spark SQL, but there is one parameter which 
determines whether the optimizer chooses a broadcast join automatically or not:

spark.sql.autoBroadcastJoinThreshold

You can read the Spark documentation about this parameter and use explain to check 
whether your join uses a broadcast or not.
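
For example, a hedged sketch of checking and adjusting the threshold and then
inspecting the plan (the table names are placeholders):

// the threshold is in bytes; tables whose statistics fall below it are broadcast
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")   // default: 10485760 (10 MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50L * 1024 * 1024)

// look for BroadcastHashJoin / BroadcastExchange in the physical plan
spark.sql("SELECT * FROM big b JOIN small s ON b.id = s.id").explain()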

Make sure you gather statistics for tables.
 
There is also a broadcast hint. Be aware that if the table being broadcast to 
all worker nodes is fairly big, broadcasting is not always a good option.
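
A sketch of the hint and of statistics gathering (table and DataFrame names are
placeholders; the SQL hint syntax assumes Spark 2.2 or later):

// broadcast hint in plain SQL
spark.sql("""
  SELECT /*+ BROADCAST(s) */ b.id, s.label
  FROM big b JOIN small s ON b.id = s.id
""").explain()

// the same request from the DataFrame API
import org.apache.spark.sql.functions.broadcast
bigDf.join(broadcast(smallDf), "id")

// gather table statistics so the size-based threshold can kick in
spark.sql("ANALYZE TABLE small COMPUTE STATISTICS")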

Kathleen

Sent from my iPhone

> On Oct 3, 2018, at 4:37 PM, kant kodali  wrote:
> 
> Hi All,
> 
> How to do a broadcast join using raw Spark SQL 2.3.1 or 2.3.2? 
> 
> Thanks
> 




Re: Text from pdf spark

2018-09-28 Thread kathleen li
The error message is "file not found".
Are you able to use the following command line to access the file, as the user 
you used to submit the job?
hdfs dfs -ls /tmp/sample.pdf
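
If the path checks out, another thing to look at (an assumption on my side,
since I haven't run your job): java.io.File can only open local paths, so
building a File from an hdfs:// URI will fail even when the file exists. A
sketch that feeds the PortableDataStream straight into PDFBox instead (imports
assume PDFBox 2.x):

import org.apache.pdfbox.pdmodel.PDDocument
import org.apache.pdfbox.text.PDFTextStripper
import org.apache.spark.input.PortableDataStream

// read the PDF through the stream Spark already gives you, not through java.io.File
def pdfText(file: (String, PortableDataStream)): String = {
  val in = file._2.open()               // DataInputStream over the HDFS file
  try {
    val document = PDDocument.load(in)  // PDDocument.load also accepts an InputStream
    try new PDFTextStripper().getText(document)
    finally document.close()
  } finally in.close()
}

val files = sparkSession.sparkContext.binaryFiles("hdfs://nnAlias:8020/tmp/sample.pdf")
files.map(pdfText).collect().foreach(println)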

Sent from my iPhone

> On Sep 28, 2018, at 12:10 PM, Joel D  wrote:
> 
> I'm trying to extract text from PDF files in HDFS using PDFBox. 
> However, it throws an error:
> 
> "Exception in thread "main" org.apache.spark.SparkException: ...
> java.io.FileNotFoundException: /nnAlias:8020/tmp/sample.pdf 
> (No such file or directory)"
> 
> 
> 
> What am I missing? Should I be working with PortableDataStream instead of the 
> string part of:
> val files: RDD[(String, PortableDataStream)]?
> def pdfRead(fileNameFromRDD: (String, PortableDataStream), sparkSession: 
> SparkSession) = {
> val file: File = new File(fileNameFromRDD._1.drop(5))
> val document = PDDocument.load(file); //It throws an error here.
> 
> if (!document.isEncrypted()) {
>   val stripper = new PDFTextStripper()
>   val text = stripper.getText(document)
>   println("Text:" + text)
> 
> }
> document.close()
> 
>   }
> 
> //This is where I call the above pdf to text converter method.
>  val files = 
> sparkSession.sparkContext.binaryFiles("hdfs://nnAlias:8020/tmp/sample.pdf")
> files.foreach(println)
> 
> files.foreach(f => println(f._1))
> 
> files.foreach(fileStream => pdfRead(fileStream, sparkSession))
> 
> Thanks.
> 
> 
> 
> 
> 
> 


Re: Given events with start and end times, how to count the number of simultaneous events using Spark?

2018-09-26 Thread kathleen li
You can use a Spark SQL window function, something like:
df.createOrReplaceTempView("dfv")
select count(eventid) over (partition by start_time, end_time order by 
start_time) from dfv
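
If that does not give exactly what you need, a hedged sketch of the running-sum
formulation discussed in the linked question (the DataFrame name "events" and
the columns start_time/end_time are assumptions about your schema):

import org.apache.spark.sql.functions._

// turn each event into a +1 marker at its start and a -1 marker at its end;
// a running sum over time then gives the number of events active at each instant
val points = events
  .select(col("start_time").as("ts"), lit(1).as("delta"))
  .union(events.select(col("end_time").as("ts"), lit(-1).as("delta")))

points.createOrReplaceTempView("points")
spark.sql("""
  SELECT ts,
         SUM(delta) OVER (ORDER BY ts
                          ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS concurrent
  FROM points
""").show()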

Sent from my iPhone

> On Sep 26, 2018, at 11:32 AM, Debajyoti Roy  wrote:
> 
> The problem statement and an approach to solve it using windows are described 
> here:
> 
> https://stackoverflow.com/questions/52509498/given-events-with-start-and-end-times-how-to-count-the-number-of-simultaneous-e
> 
> Looking for more elegant/performant solutions, if they exist. TIA !


Re: [SparkSQL] Count Distinct issue

2018-09-17 Thread kathleen li
Hi,
I can't reproduce your issue:

scala> spark.sql("select distinct * from dfv").show()
+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+---+
|   a|   b|   c|   d|   e|   f|   g|   h|   i|   j|   k|   l|   m|   n|   o|  p|
+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+---+
|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|  9|
|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null| 13|
|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|  2|
|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|  7|
|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|  8|
|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|  3|
|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|  5|
|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null| 15|
|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null| 12|
|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null| 16|
|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null| 14|
|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|  4|
|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|  6|
|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null| 10|
|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null| 11|
|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|  1|
+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+---+


scala> spark.sql("select count(distinct *) from dfv").show()
+--+
|count(DISTINCT a, b, c, d, e, f, g, h, i, j, k, l, m, n, o, p)|
+--+
|16|
+--+
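
If the goal is simply the number of distinct rows, a sketch of an alternative
worth comparing against countDistinct on your 16-column data, since it counts
rows directly and NULLs in a column do not drop a row from the count:

// counts distinct rows, NULLs included
val distinctRowCount = df.distinct().count()

// equivalent formulation
val viaDropDuplicates = df.dropDuplicates().count()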
Kathleen

On Fri, Sep 14, 2018 at 11:54 AM Daniele Foroni 
wrote:

> Hi all,
>
> I am having some trouble doing a count distinct over multiple columns.
> This is an example of my data:
> +----+----+----+---+
> |a   |b   |c   |d  |
> +----+----+----+---+
> |null|null|null|1  |
> |null|null|null|2  |
> |null|null|null|3  |
> |null|null|null|4  |
> |null|null|null|5  |
> |null|null|null|6  |
> |null|null|null|7  |
> +----+----+----+---+
> And my code:
> val df: Dataset[Row] = …
> val cols: List[Column] = df.columns.map(col).toList
> df.agg(countDistinct(cols.head, cols.tail: _*))
>
> So, in the example above, if I count the distinct “rows” I obtain 7 as the
> result, as expected (since the “d” column changes for every row).
> However, with more columns (16) in EXACTLY the same situation (one
> incremental column and 15 columns filled with nulls), the result is 0.
>
> I don’t understand why I am experiencing this problem.
> Any solution?
>
> Thanks,
> ---
> Daniele
>
>
>
>


What is the best way for Spark to read HDF5@scale?

2018-09-14 Thread kathleen li
Hi,
Is there any Spark connector for HDF5?

The following link does not seem to work anymore:

https://www.hdfgroup.org/downloads/spark-connector/

Thanks,

Kathleen