[spark context / spark sql] unexpected disk IO activity after spark job finished but spark context has not

2019-03-22 Thread Chenghao
Hi, 

I have a SparkSQL workload and ran it as a batch job in two cases. In the
first case, I execute the workload and stop the batch job right after
`.show()` finishes. In the second case, I execute the same workload but call
a 1-minute sleep, `Thread.sleep(60 * 1000)`, before stopping the spark
context and the batch job. The time costs of the workload are similar in the
two cases, but in the second case I detected an unexpected DISKBUSY spike on
the spark local disk using the system monitor tool "nmon", as shown in my
second figure.

Could anyone help explain the reason for the disk spike and how to avoid it?
Does the spark context have some periodic async IO activity that leads to
the spikes?
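
For reference on the measurement: DISKBUSY is nmon's per-disk busy
percentage, and a typical recording invocation (the interval and sample
count below are illustrative, not the exact values used here) would be:

nmon -f -s 1 -c 600   # -f: log samples to a .nmon file, -s: seconds per sample, -c: number of samples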

System Background: 
1. Spark 2.3.1, Hadoop 2.9.1, Hive 2.3.4 for metadata storage. 
2. One master and two worker nodes. Each node has plenty of available
resources (32 cores, 750G memory, and eight 8T disks, disk1 to disk8).
3. HDFS is deployed on disk8; disk1 is used as the spark local directory for
shuffle writes.
4. I use Yarn client mode for resource management.
5. There is no other big application running in the background.

Current Analysis:
1. The spike is not caused by the disk itself or by other background
processes. I tried disk2, disk3, disk4, and disk8 for the yarn local storage
to test whether the spike is tied to the program, and it is: the same spike
appears every time I execute case 2, regardless of which disk is used.
2. The spike is caused by Spark itself, not by Yarn. I tried the standalone
deploy mode and the spike still appears.
3. It seems related to the shuffle size. The total shuffle write size of the
target batch job is close to 2GB. I also tried workloads with shuffle write
sizes close to 1MB, 250MB, and 1GB. The DISKBUSY spike is negligible for the
batch job with a 1MB shuffle write and reaches up to 80% for the batch job
with a 250MB total shuffle write.
4. The disk spike might be some kind of swap-like activity. I traced the
size of the local storage directory: when the spike appears, writes to the
disk are detected but the directory size does not increase -- so it might be
doing some disk swap?

Case 1: [figure: nmon DISKBUSY trace for case 1]

Case 2: [figure: nmon DISKBUSY trace for case 2, where the spike appears]

To clarify the figures: "worker1 node local" and "worker2 node local" stand
for disk1 on worker1 and worker2 respectively; "worker1 node dfs" and
"worker2 node dfs" stand for disk8 on worker1 and worker2 respectively,
which is where HDFS resides. The left y-axis is the DISKBUSY percentage
(0% to 100%) reported by nmon, and the right y-axis is the size of the HDFS
directory on disk8 (which can be ignored for this problem).

Here is the code for the workload.

import org.apache.spark.sql.SparkSession

object Q16 {
  def main(args: Array[String]): Unit = {
    val db = s"bigbench_sf_100"

    val spark = SparkSession
      .builder()
      .enableHiveSupport()
      .getOrCreate()
    val sc = spark.sparkContext

    spark.sql(s"use $db")

    val t1 = System.currentTimeMillis()
    spark.sql(
      s"""
         |SELECT w_state, i_item_id,
         |  SUM(
         |    CASE WHEN (unix_timestamp(d_date,'yyyy-MM-dd') < unix_timestamp('2001-03-16','yyyy-MM-dd'))
         |    THEN ws_sales_price - COALESCE(wr_refunded_cash,0)
         |    ELSE 0.0 END
         |  ) AS sales_before,
         |  SUM(
         |    CASE WHEN (unix_timestamp(d_date,'yyyy-MM-dd') >= unix_timestamp('2001-03-16','yyyy-MM-dd'))
         |    THEN ws_sales_price - COALESCE(wr_refunded_cash,0)
         |    ELSE 0.0 END
         |  ) AS sales_after
         |FROM (
         |  SELECT *
         |  FROM web_sales ws
         |  LEFT OUTER JOIN web_returns wr ON (ws.ws_order_number = wr.wr_order_number
         |    AND ws.ws_item_sk = wr.wr_item_sk)
         |) a1
         |JOIN item i ON a1.ws_item_sk = i.i_item_sk
         |JOIN warehouse w ON a1.ws_warehouse_sk = w.w_warehouse_sk
         |JOIN date_dim d ON a1.ws_sold_date_sk = d.d_date_sk
         |  AND unix_timestamp(d.d_date, 'yyyy-MM-dd') >= unix_timestamp('2001-03-16', 'yyyy-MM-dd') - 30*24*60*60 -- subtract 30 days in seconds
         |  AND unix_timestamp(d.d_date, 'yyyy-MM-dd') <= unix_timestamp('2001-03-16', 'yyyy-MM-dd') + 30*24*60*60 -- add 30 days in seconds
         |GROUP BY w_state, i_item_id
         |-- original was ORDER BY w_state, i_item_id, but CLUSTER BY is hive's cluster-scale counterpart
         |ORDER BY w_state, i_item_id
         |LIMIT 100
       """.stripMargin).show
    val t2 = System.currentTimeMillis()

    // For case 2: sleep one minute before stopping the spark context
    // Thread.sleep(60 * 1000)

    spark.stop()
  }
}






How to control batch size while reading from hdfs files?

2019-03-22 Thread kant kodali
Hi All,

What determines the batch size when reading files from HDFS?

I am trying to read files from HDFS and ingest them into Kafka using Spark
Structured Streaming 2.3.1. I get an error saying the Kafka batch size is
too big and that I need to increase max.request.size. Sure, I can increase
it, but I would like to know what other parameters I can change so that I
don't have to change the default max.request.size.

The Kafka producer docs say the default max.request.size is 1MB, and each
file I have in HDFS is < 12MB.
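
A minimal sketch of such a pipeline, in case it helps frame suggestions
(this assumes the plain text file source and the built-in Kafka sink; the
paths, broker address, and topic are placeholders, and the option values are
illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("hdfs-to-kafka").getOrCreate()

// Each line of the input files becomes one row with a single "value" column.
// maxFilesPerTrigger caps how many files are picked up per micro-batch.
val lines = spark.readStream
  .option("maxFilesPerTrigger", "1")
  .text("hdfs:///path/to/input")

// Producer settings can be passed through with the "kafka." prefix;
// kafka.max.request.size is the knob the error message asks to raise.
val query = lines.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("kafka.max.request.size", "2097152")
  .option("topic", "my-topic")
  .option("checkpointLocation", "hdfs:///path/to/checkpoint")
  .start()

query.awaitTermination()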

Any suggestions will be great.

Thanks!


Re: Cross Join

2019-03-22 Thread kathy Harayama
Hello,
I am using 2.4, and it works:

scala> val df_A=Seq(("1",
10.0),("2",20.0),("3",30.0),("4",40.0),("5",50.0),("6",60.0),("7",70.0),("8",80.0),("9",90.0),("10",10.0)).toDF("id","val");
df_A: org.apache.spark.sql.DataFrame = [id: string, val: double]

scala> val df_B=Seq(("11", 10.0),("12",20.0),("13",30.0)).toDF("id","val");
df_B: org.apache.spark.sql.DataFrame = [id: string, val: double]

scala> val df_C=df_A.crossJoin(df_B)
df_C: org.apache.spark.sql.DataFrame = [id: string, val: double ... 2 more fields]

scala> df_C.show(30);
+---+----+---+----+
| id| val| id| val|
+---+----+---+----+
|  1|10.0| 11|10.0|
|  1|10.0| 12|20.0|
|  1|10.0| 13|30.0|
|  2|20.0| 11|10.0|
|  2|20.0| 12|20.0|
|  2|20.0| 13|30.0|
|  3|30.0| 11|10.0|
|  3|30.0| 12|20.0|
|  3|30.0| 13|30.0|
|  4|40.0| 11|10.0|
|  4|40.0| 12|20.0|
|  4|40.0| 13|30.0|
|  5|50.0| 11|10.0|
|  5|50.0| 12|20.0|
|  5|50.0| 13|30.0|
|  6|60.0| 11|10.0|
|  6|60.0| 12|20.0|
|  6|60.0| 13|30.0|
|  7|70.0| 11|10.0|
|  7|70.0| 12|20.0|
|  7|70.0| 13|30.0|
|  8|80.0| 11|10.0|
|  8|80.0| 12|20.0|
|  8|80.0| 13|30.0|
|  9|90.0| 11|10.0|
|  9|90.0| 12|20.0|
|  9|90.0| 13|30.0|
| 10|10.0| 11|10.0|
| 10|10.0| 12|20.0|
| 10|10.0| 13|30.0|
+---+----+---+----+

Kathleen

On Thu, Mar 21, 2019 at 10:47 AM asma zgolli  wrote:

>
>
> -- Forwarded message -
> From: asma zgolli 
> Date: jeu. 21 mars 2019 à 18:15
> Subject: Cross Join
> To: 
>
>
> Hello ,
>
> I need to cross my data and I'm executing a cross join on two dataframes.
>
> C = A.crossJoin(B)
> A has 50 records
> B has 5 records
>
> The result I'm getting with spark 2.0 is a dataframe C having 50 records.
>
> only the first row from B was added to C.
>
> Is that a bug in Spark?
>
> Asma ZGOLLI
>
> PhD student in data engineering - computer science
>
>
>
> --
> Asma ZGOLLI
>
> PhD student in data engineering - computer science
> Email : zgollia...@gmail.com
> email alt:  asma.zgo...@univ-grenoble-alpes.fr 
> Tel : (+33) 07 52 95 04 45
> (+216) 50 126 797
> Skype : asma_zgolli
>


Re: writing a small csv to HDFS is super slow

2019-03-22 Thread kathy Harayama
Hi Lian,
Since you are using repartition(1), do you want to decrease the number of
partitions? If so, have you tried using coalesce instead?
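
For example, a sketch of the suggested change (shown in Scala; the same
coalesce call exists in PySpark, and coalesce(1) avoids the full shuffle
that repartition(1) triggers):

// coalesce(1) merges the existing partitions into one without a full shuffle
df.coalesce(1)
  .write
  .format("com.databricks.spark.csv")
  .mode("overwrite")
  .option("header", "true")
  .save(csvPath)   // csvPath: the same output path as in the original snippet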

Kathleen

On Fri, Mar 22, 2019 at 2:43 PM Lian Jiang  wrote:

> Hi,
>
> Writing a csv to HDFS takes about 1 hour:
>
>
> df.repartition(1).write.format('com.databricks.spark.csv').mode('overwrite').options(header='true').save(csv)
>
> The generated csv file is only about 150kb. The job uses 3 containers (13
> cores, 23g mem).
>
> Other people have similar issues but I don't see a good explanation and
> solution.
>
> Any clue is highly appreciated! Thanks.
>
>
>


Re: writing a small csv to HDFS is super slow

2019-03-22 Thread Apostolos N. Papadopoulos
Is it also slow when you do not repartition? (i.e., to get multiple 
output files)


Also did you try simply saveAsTextFile?

Also, before repartition, how many partitions are there?

a.


On 22/3/19 23:34, Lian Jiang wrote:

Hi,

Writing a csv to HDFS takes about 1 hour:

df.repartition(1).write.format('com.databricks.spark.csv').mode('overwrite').options(header='true').save(csv)

The generated csv file is only about 150kb. The job uses 3 containers 
(13 cores, 23g mem).


Other people have similar issues but I don't see a good explanation 
and solution.


Any clue is highly appreciated! Thanks.



--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email: papad...@csd.auth.gr
twitter: @papadopoulos_ap
web: http://datalab.csd.auth.gr/~apostol





writing a small csv to HDFS is super slow

2019-03-22 Thread Lian Jiang
Hi,

Writing a csv to HDFS takes about 1 hour:

df.repartition(1).write.format('com.databricks.spark.csv').mode('overwrite').options(header='true').save(csv)

The generated csv file is only about 150kb. The job uses 3 containers (13
cores, 23g mem).

Other people have similar issues but I don't see a good explanation and
solution.

Any clue is highly appreciated! Thanks.


Re: Java Heap Space error - Spark ML

2019-03-22 Thread Apostolos N. Papadopoulos
What is the size of your data and of the cluster, are you using spark-submit
or an IDE, and which spark version are you using?


Try spark-submit and increase the memory of the driver or the executors.
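
For example (the class name, jar, and memory sizes below are purely
illustrative; adjust them to your data and cluster):

spark-submit \
  --master yarn \
  --class com.example.KMeansTraining \
  --driver-memory 8g \
  --executor-memory 8g \
  --num-executors 4 \
  your-job.jar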

a.


On 22/3/19 17:19, KhajaAsmath Mohammed wrote:

Hi,

I am getting the below exception when using Spark KMeans. Any solutions 
from the experts would be really helpful.


val kMeans = new KMeans().setK(reductionCount).setMaxIter(30)

    val kMeansModel = kMeans.fit(df)

The error occurs when calling kMeans.fit.


Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
        at 
org.apache.spark.mllib.linalg.SparseVector.toArray(Vectors.scala:760)
        at 
org.apache.spark.mllib.clustering.VectorWithNorm.toDense(KMeans.scala:614)
        at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$initKMeansParallel$3.apply(KMeans.scala:382)
        at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$initKMeansParallel$3.apply(KMeans.scala:382)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at 
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)

        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
        at 
org.apache.spark.mllib.clustering.KMeans.initKMeansParallel(KMeans.scala:382)
        at 
org.apache.spark.mllib.clustering.KMeans.runAlgorithm(KMeans.scala:256)

        at org.apache.spark.mllib.clustering.KMeans.run(KMeans.scala:227)
        at org.apache.spark.ml.clustering.KMeans.fit(KMeans.scala:319)
        at 
com.datamantra.spark.DataBalancing$.createBalancedDataframe(DataBalancing.scala:25)
        at 
com.datamantra.spark.jobs.IftaMLTraining$.trainML$1(IftaMLTraining.scala:182)
        at 
com.datamantra.spark.jobs.IftaMLTraining$.main(IftaMLTraining.scala:94)
        at 
com.datamantra.spark.jobs.IftaMLTraining.main(IftaMLTraining.scala)

        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
        at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
        at 
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
        at 
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)

        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Thanks,
Asmath


--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email: papad...@csd.auth.gr
twitter: @papadopoulos_ap
web: http://datalab.csd.auth.gr/~apostol



Re: Manually reading parquet files.

2019-03-22 Thread Wenchen Fan
Try `val enconder = RowEncoder(df.schema).resolveAndBind()` ?
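
For reference, a sketch of how that could slot into the snippet below
(assuming Spark 2.3/2.4 import paths, and readFile, pFile, and df as defined
in the quoted code):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.vectorized.ColumnarBatch
import scala.collection.JavaConverters._

// resolveAndBind() resolves and binds the encoder's deserializer expressions
// against the schema, which is what makes fromRow usable outside a query plan.
val enconder = RowEncoder(df.schema).resolveAndBind()

val rows = readFile(pFile).flatMap {
  case r: InternalRow => Seq(r)
  case b: ColumnarBatch => b.rowIterator().asScala
}.map(enconder.fromRow).toList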

On Thu, Mar 21, 2019 at 5:39 PM Long, Andrew 
wrote:

> Thanks a ton for the help!
>
>
>
> Is there a standardized way of converting the internal row to rows?
>
>
>
> I've tried this but I'm getting an exception:
>
>
>
> val enconder = RowEncoder(df.schema)
> val rows = readFile(pFile).flatMap(_ match {
>   case r: InternalRow => Seq(r)
>   case b: ColumnarBatch => b.rowIterator().asScala
> })
>   .map(enconder.fromRow(_))
>   .toList
>
>
>
> java.lang.RuntimeException: Error while decoding:
> java.lang.UnsupportedOperationException: Cannot evaluate expression:
> getcolumnbyordinal(0, IntegerType)
>
> createexternalrow(getcolumnbyordinal(0, IntegerType),
> getcolumnbyordinal(1, IntegerType), getcolumnbyordinal(2,
> StringType).toString, StructField(pk,IntegerType,false),
> StructField(ordering,IntegerType,false), StructField(col_a,StringType,true))
>
>
>
> at
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:305)
>
> at
> com.amazon.horizon.azulene.ParquetReadTests$$anonfun$2.apply(ParquetReadTests.scala:100)
>
> at
> com.amazon.horizon.azulene.ParquetReadTests$$anonfun$2.apply(ParquetReadTests.scala:100)
>
>
>
> From: Ryan Blue 
> Reply-To: "rb...@netflix.com" 
> Date: Thursday, March 21, 2019 at 3:32 PM
> To: "Long, Andrew" 
> Cc: "d...@spark.apache.org" , "user@spark.apache.org" , "horizon-...@amazon.com" <horizon-...@amazon.com>
> Subject: Re: Manually reading parquet files.
>
>
>
> You're getting InternalRow instances. They probably have the data you
> want, but the toString representation doesn't match the data for
> InternalRow.
>
>
>
> On Thu, Mar 21, 2019 at 3:28 PM Long, Andrew 
> wrote:
>
> Hello Friends,
>
>
>
> I’m working on a performance improvement that reads additional parquet
> files in the middle of a lambda and I’m running into some issues.  This is
> what I'd like to do:
>
>
>
> ds.mapPartitions(x=>{
>   //read parquet file in and perform an operation with x
> })
>
>
>
>
>
> Here’s my current POC code but I’m getting nonsense back from the row
> reader.
>
>
>
> import com.amazon.horizon.azulene.util.SparkFileUtils._
>
> spark.conf.set("spark.sql.parquet.enableVectorizedReader","false")
>
> val data = List(
>   TestRow(1,1,"asdf"),
>   TestRow(2,1,"asdf"),
>   TestRow(3,1,"asdf"),
>   TestRow(4,1,"asdf")
> )
>
> val df = spark.createDataFrame(data)
>
> val folder = Files.createTempDirectory("azulene-test")
>
> val folderPath = folder.toAbsolutePath.toString + "/"
> df.write.mode("overwrite").parquet(folderPath)
>
> val files = spark.fs.listStatus(folder.toUri)
>
> val file = files(1) // skip the _SUCCESS file
>
> val partitionSchema = StructType(Seq.empty)
> val dataSchema = df.schema
> val fileFormat = new ParquetFileFormat()
>
> val path = file.getPath
>
> val status = spark.fs.getFileStatus(path)
>
> val pFile = new PartitionedFile(
>   partitionValues = InternalRow.empty, // this should be empty for non-partitioned values
>   filePath = path.toString,
>   start = 0,
>   length = status.getLen
> )
>
> val readFile: (PartitionedFile) => Iterator[Any] = // Iterator[InternalRow]
>   fileFormat.buildReaderWithPartitionValues(
>     sparkSession = spark,
>     dataSchema = dataSchema,
>     partitionSchema = partitionSchema, // this should be empty for non-partitioned fields
>     requiredSchema = dataSchema,
>     filters = Seq.empty,
>     options = Map.empty,
>     hadoopConf = spark.sparkContext.hadoopConfiguration
>     // relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)
>   )
>
> import scala.collection.JavaConverters._
>
> val rows = readFile(pFile).flatMap(_ match {
>   case r: InternalRow => Seq(r)
>
>   // This doesn't work. vector mode is doing something screwy
>   case b: ColumnarBatch => b.rowIterator().asScala
> }).toList
>
> println(rows)
> // List([0,1,5b,24,66647361])
> // ?? this is wrong I think
>
>
>
> Has anyone attempted something similar?
>
>
>
> Cheers Andrew
>
>
>
>
>
>
> --
>
> Ryan Blue
>
> Software Engineer
>
> Netflix
>


Re: spark sql occer error

2019-03-22 Thread Wenchen Fan
Did you include the whole error message?

On Fri, Mar 22, 2019 at 12:45 AM 563280193 <563280...@qq.com> wrote:

> Hi ,
> I ran a spark sql like this:
>
> select imei,tag, product_id,
>        sum(case when succ1>=1 then 1 else 0 end) as succ,
>        sum(case when fail1>=1 and succ1=0 then 1 else 0 end) as fail,
>        count(*) as cnt
> from t_tbl
> where sum(case when succ1>=1 then 1 else 0 end)=0 and sum(case when fail1>=1 and succ1=0 then 1 else 0 end)>0
> group by tag, product_id, app_version
>
> It produces the error below:
>
>  execute, tree:
> Exchange hashpartitioning(imei#0, tag#1, product_id#2, 100)
> +- *(1) HashAggregate(keys=[imei#0, tag#1, product_id#2], functions=[partial_sum(cast(CASE WHEN (succ1#3L >= 1) THEN 1 ELSE 0 END as bigint)), partial_sum(cast(CASE WHEN ((fail1#4L >= 1) && (succ1#3L = 0)) THEN 1 ELSE 0 END as bigint)), partial_count(1)], output=[imei#0, tag#1, product_id#2, sum#49L, sum#50L, count#51L])
>    +- *(1) Filter ((sum(cast(CASE WHEN (succ1#3L >= 1) THEN 1 ELSE 0 END as bigint)) = 0) && (sum(cast(CASE WHEN ((fail1#4L >= 1) && (succ1#3L = 0)) THEN 1 ELSE 0 END as bigint)) > 0))
>       +- *(1) FileScan json [imei#0,tag#1,product_id#2,succ1#3L,fail1#4L] Batched: false, Format: JSON, Location: InMemoryFileIndex[hdfs://xx], PartitionFilters: [], PushedFilters: [], ReadSchema: struct
>
>
> Could anyone help me to solve this problem?
> my spark version is 2.3.1
> thank you.
>


Java Heap Space error - Spark ML

2019-03-22 Thread KhajaAsmath Mohammed
Hi,

I am getting the below exception when using Spark KMeans. Any solutions
from the experts would be really helpful.

val kMeans = new KMeans().setK(reductionCount).setMaxIter(30)

val kMeansModel = kMeans.fit(df)

The error occurs when calling kMeans.fit.


Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
at
org.apache.spark.mllib.linalg.SparseVector.toArray(Vectors.scala:760)
at
org.apache.spark.mllib.clustering.VectorWithNorm.toDense(KMeans.scala:614)
at
org.apache.spark.mllib.clustering.KMeans$$anonfun$initKMeansParallel$3.apply(KMeans.scala:382)
at
org.apache.spark.mllib.clustering.KMeans$$anonfun$initKMeansParallel$3.apply(KMeans.scala:382)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at
org.apache.spark.mllib.clustering.KMeans.initKMeansParallel(KMeans.scala:382)
at
org.apache.spark.mllib.clustering.KMeans.runAlgorithm(KMeans.scala:256)
at org.apache.spark.mllib.clustering.KMeans.run(KMeans.scala:227)
at org.apache.spark.ml.clustering.KMeans.fit(KMeans.scala:319)
at
com.datamantra.spark.DataBalancing$.createBalancedDataframe(DataBalancing.scala:25)
at
com.datamantra.spark.jobs.IftaMLTraining$.trainML$1(IftaMLTraining.scala:182)
at
com.datamantra.spark.jobs.IftaMLTraining$.main(IftaMLTraining.scala:94)
at
com.datamantra.spark.jobs.IftaMLTraining.main(IftaMLTraining.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Thanks,
Asmath

>


Re: spark sql occer error

2019-03-22 Thread Mingcong Han
Hi,
It seems that there is a syntax error in your SQL. You should use `select
... from ... group by ... having sum(...)=0 and sum(...)>0` instead of
`where sum(...)=0...`. I think aggregate functions are not allowed in WHERE. 
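
For example, a sketch of the rewritten query (note that the SELECT list
should also be limited to the GROUP BY columns plus aggregates; the original
selects imei, which is not in the GROUP BY):

select tag, product_id, app_version,
       sum(case when succ1>=1 then 1 else 0 end) as succ,
       sum(case when fail1>=1 and succ1=0 then 1 else 0 end) as fail,
       count(*) as cnt
from t_tbl
group by tag, product_id, app_version
having sum(case when succ1>=1 then 1 else 0 end)=0
   and sum(case when fail1>=1 and succ1=0 then 1 else 0 end)>0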






spark sql occer error

2019-03-22 Thread 563280193
Hi,
I ran a spark sql like this:


select imei,tag, product_id,
  sum(case when succ1>=1 then 1 else 0 end) as succ,
  sum(case when fail1>=1 and succ1=0 then 1 else 0 end) as fail,
  count(*) as cnt
from t_tbl
where sum(case when succ1>=1 then 1 else 0 end)=0 and sum(case when 
fail1>=1 and succ1=0 then 1 else 0 end)>0
group by tag, product_id, app_version



It produces the error below:


 execute, tree:
Exchange hashpartitioning(imei#0, tag#1, product_id#2, 100)
+- *(1) HashAggregate(keys=[imei#0, tag#1, product_id#2], functions=[partial_sum(cast(CASE WHEN (succ1#3L >= 1) THEN 1 ELSE 0 END as bigint)), partial_sum(cast(CASE WHEN ((fail1#4L >= 1) && (succ1#3L = 0)) THEN 1 ELSE 0 END as bigint)), partial_count(1)], output=[imei#0, tag#1, product_id#2, sum#49L, sum#50L, count#51L])
   +- *(1) Filter ((sum(cast(CASE WHEN (succ1#3L >= 1) THEN 1 ELSE 0 END as bigint)) = 0) && (sum(cast(CASE WHEN ((fail1#4L >= 1) && (succ1#3L = 0)) THEN 1 ELSE 0 END as bigint)) > 0))
      +- *(1) FileScan json [imei#0,tag#1,product_id#2,succ1#3L,fail1#4L] Batched: false, Format: JSON, Location: InMemoryFileIndex[hdfs://xx], PartitionFilters: [], PushedFilters: [], ReadSchema: struct





Could anyone help me solve this problem?
My spark version is 2.3.1.
Thank you.
