Re: How to tune the performance of Tpch query5 within Spark

2017-07-17 Thread Pralabh Kumar
Hi

To read the files in parallel, you can use code like the following:


import java.util.ArrayList
import java.util.concurrent.{Callable, Executors, Future, TimeUnit}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.{Dataset, Row, SparkSession}

// Task that reads one Parquet table; submitting these to a thread pool lets the
// six reads run concurrently instead of one after another.
case class ReadData(fileName: String, spark: SparkSession) extends Callable[Dataset[Row]] {
  override def call(): Dataset[Row] = {
    spark.read.parquet(fileName)
    // spark.read.csv(fileName)  // same idea for CSV input
  }
}

val spark = SparkSession.builder()
  .appName("practice")
  .config("spark.scheduler.mode", "FAIR")
  .enableHiveSupport()
  .getOrCreate()

val pool = Executors.newFixedThreadPool(6)
val futures = new ArrayList[Future[Dataset[Row]]]()

// Submit one read task per table.
for (fileName <- "orders,lineitem,customer,supplier,region,nation".split(",")) {
  futures.add(pool.submit(ReadData(fileName, spark)))
}

// get() blocks until each submitted read has completed.
val datasets = new ArrayBuffer[Dataset[Row]]()
for (result <- futures.asScala) {
  datasets += result.get()
}

pool.shutdown()
pool.awaitTermination(Long.MaxValue, TimeUnit.NANOSECONDS)

for (data <- datasets) {
  data.show()
}


This reads the six tables in parallel, which I think is your main bottleneck.
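
An equivalent sketch using Scala Futures instead of a raw thread pool, if you prefer; same idea, just more idiomatic Scala. The table names here are placeholders and would be the full HDFS paths in practice.

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.spark.sql.{Dataset, Row, SparkSession}

val spark = SparkSession.builder().appName("practice").getOrCreate()

// Placeholder names; use the real HDFS paths here.
val tables = Seq("orders", "lineitem", "customer", "supplier", "region", "nation")

// Kick off all six reads concurrently; each Future calls spark.read.parquet on its own thread.
val futures: Seq[Future[Dataset[Row]]] =
  tables.map(name => Future { spark.read.parquet(name) })

// Block until all reads have finished and collect the Datasets.
val datasets: Seq[Dataset[Row]] = Await.result(Future.sequence(futures), Duration.Inf)

datasets.foreach(_.show())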

Regards
Pralabh Kumar



On Mon, Jul 17, 2017 at 6:25 PM, vaquar khan  wrote:

> Could you please let us know your Spark version?
>
>
> Regards,
> vaquar khan
>
>
> On Jul 17, 2017 12:18 AM, "163"  wrote:
>
>> I changed the UDF, but performance still seems slow. What else can I do?
>>
>>
>> On 14 Jul 2017, at 8:34 PM, Wenchen Fan wrote:
>>
>> Try to replace your UDF with Spark built-in expressions; it should be as
>> simple as `$"x" * (lit(1) - $"y")`.
>>
>> On 14 Jul 2017, at 5:46 PM, 163  wrote:
>>
>> I rewrote TPC-H query 5 with the DataFrame API:
>>
>> val forders = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/orders")
>>   .filter("o_orderdate < 1995-01-01 and o_orderdate >= 1994-01-01")
>>   .select("o_custkey", "o_orderkey")
>> val flineitem = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/lineitem")
>> val fcustomer = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/customer")
>> val fsupplier = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/supplier")
>> val fregion = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/region")
>>   .where("r_name = 'ASIA'")
>>   .select($"r_regionkey")
>> val fnation = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/nation")
>>
>> val decrease = udf { (x: Double, y: Double) => x * (1 - y) }
>>
>> val res = flineitem.join(forders, $"l_orderkey" === forders("o_orderkey"))
>>   .join(fcustomer, $"o_custkey" === fcustomer("c_custkey"))
>>   .join(fsupplier, $"l_suppkey" === fsupplier("s_suppkey") && $"c_nationkey" === fsupplier("s_nationkey"))
>>   .join(fnation, $"s_nationkey" === fnation("n_nationkey"))
>>   .join(fregion, $"n_regionkey" === fregion("r_regionkey"))
>>   .select($"n_name", decrease($"l_extendedprice", $"l_discount").as("value"))
>>   .groupBy($"n_name")
>>   .agg(sum($"value").as("revenue"))
>>   .sort($"revenue".desc)
>>   .show()
>>
>>
>> My environment is one master (HDFS NameNode) and four workers (HDFS DataNodes),
>> each with 40 cores and 128 GB of memory. The TPC-H 100 GB dataset is stored on
>> HDFS in Parquet format.
>>
>> The query runs in about 1.5 minutes. I found that reading these six tables with
>> spark.read.parquet happens sequentially; how can I make the reads run in parallel?
>>
>> I've already tuned data locality, spark.default.parallelism, and spark.serializer,
>> and switched to the G1 garbage collector, but the runtime has not gone down.
>>
>> Is there any other advice for tuning this query?
>>
>> Thank you.
>>
>> Wenting He
>>
>>
>>
>>
>>


Re: How to tune the performance of Tpch query5 within Spark

2017-07-17 Thread vaquar khan
Verify your configuration; the following link covers the main Spark tuning points.

https://spark.apache.org/docs/latest/tuning.html
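
For example, a few of the settings from that guide that are commonly adjusted, shown here only as an illustration; the values are placeholders, not recommendations for this cluster.

import org.apache.spark.sql.SparkSession

// Illustrative values only; tune them for your own cluster and workload.
val spark = SparkSession.builder()
  .appName("tpch-q5")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.sql.shuffle.partitions", "320")  // e.g. roughly 2x the 160 total cores
  .config("spark.default.parallelism", "320")
  .config("spark.executor.memory", "24g")
  .getOrCreate()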

Regards,
Vaquar khan

On Jul 17, 2017 6:56 AM, "何文婷"  wrote:

2.1.1

Sent from NetEase Mail Master
On 07/17/2017 20:55, vaquar khan  wrote:

Could you please let us know your Spark version?


Regards,
vaquar khan

On Jul 17, 2017 12:18 AM, "163"  wrote:

> I changed the UDF, but performance still seems slow. What else can I do?
>
>
> On 14 Jul 2017, at 8:34 PM, Wenchen Fan wrote:
>
> Try to replace your UDF with Spark built-in expressions; it should be as
> simple as `$"x" * (lit(1) - $"y")`.
>
> On 14 Jul 2017, at 5:46 PM, 163  wrote:
>
> I rewrote TPC-H query 5 with the DataFrame API:
>
> val forders = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/orders")
>   .filter("o_orderdate < 1995-01-01 and o_orderdate >= 1994-01-01")
>   .select("o_custkey", "o_orderkey")
> val flineitem = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/lineitem")
> val fcustomer = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/customer")
> val fsupplier = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/supplier")
> val fregion = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/region")
>   .where("r_name = 'ASIA'")
>   .select($"r_regionkey")
> val fnation = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/nation")
>
> val decrease = udf { (x: Double, y: Double) => x * (1 - y) }
>
> val res = flineitem.join(forders, $"l_orderkey" === forders("o_orderkey"))
>   .join(fcustomer, $"o_custkey" === fcustomer("c_custkey"))
>   .join(fsupplier, $"l_suppkey" === fsupplier("s_suppkey") && $"c_nationkey" === fsupplier("s_nationkey"))
>   .join(fnation, $"s_nationkey" === fnation("n_nationkey"))
>   .join(fregion, $"n_regionkey" === fregion("r_regionkey"))
>   .select($"n_name", decrease($"l_extendedprice", $"l_discount").as("value"))
>   .groupBy($"n_name")
>   .agg(sum($"value").as("revenue"))
>   .sort($"revenue".desc)
>   .show()
>
>
> My environment is one master (HDFS NameNode) and four workers (HDFS DataNodes),
> each with 40 cores and 128 GB of memory. The TPC-H 100 GB dataset is stored on
> HDFS in Parquet format.
>
> The query runs in about 1.5 minutes. I found that reading these six tables with
> spark.read.parquet happens sequentially; how can I make the reads run in parallel?
>
> I've already tuned data locality, spark.default.parallelism, and spark.serializer,
> and switched to the G1 garbage collector, but the runtime has not gone down.
>
> Is there any other advice for tuning this query?
>
> Thank you.
>
> Wenting He
>
>
>
>
>


Re: How to tune the performance of Tpch query5 within Spark

2017-07-17 Thread 何文婷
2.1.1


Sent from NetEase Mail Master
On 07/17/2017 20:55, vaquar khan wrote:
Could you please let us know your Spark version?




Regards, 
vaquar khan 



On Jul 17, 2017 12:18 AM, "163"  wrote:

I changed the UDF, but performance still seems slow. What else can I do?




On 14 Jul 2017, at 8:34 PM, Wenchen Fan wrote:


Try to replace your UDF with Spark built-in expressions; it should be as simple
as `$"x" * (lit(1) - $"y")`.


On 14 Jul 2017, at 5:46 PM, 163  wrote:


I rewrote TPC-H query 5 with the DataFrame API:
val forders = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/orders")
  .filter("o_orderdate < 1995-01-01 and o_orderdate >= 1994-01-01")
  .select("o_custkey", "o_orderkey")
val flineitem = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/lineitem")
val fcustomer = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/customer")
val fsupplier = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/supplier")
val fregion = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/region")
  .where("r_name = 'ASIA'")
  .select($"r_regionkey")
val fnation = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/nation")

val decrease = udf { (x: Double, y: Double) => x * (1 - y) }

val res = flineitem.join(forders, $"l_orderkey" === forders("o_orderkey"))
  .join(fcustomer, $"o_custkey" === fcustomer("c_custkey"))
  .join(fsupplier, $"l_suppkey" === fsupplier("s_suppkey") && $"c_nationkey" === fsupplier("s_nationkey"))
  .join(fnation, $"s_nationkey" === fnation("n_nationkey"))
  .join(fregion, $"n_regionkey" === fregion("r_regionkey"))
  .select($"n_name", decrease($"l_extendedprice", $"l_discount").as("value"))
  .groupBy($"n_name")
  .agg(sum($"value").as("revenue"))
  .sort($"revenue".desc)
  .show()


My environment is one master (HDFS NameNode) and four workers (HDFS DataNodes),
each with 40 cores and 128 GB of memory. The TPC-H 100 GB dataset is stored on
HDFS in Parquet format.
The query runs in about 1.5 minutes. I found that reading these six tables with
spark.read.parquet happens sequentially; how can I make the reads run in parallel?
I've already tuned data locality, spark.default.parallelism, and spark.serializer,
and switched to the G1 garbage collector, but the runtime has not gone down.
Is there any other advice for tuning this query?
Thank you.


Wenting He







Re: How to tune the performance of Tpch query5 within Spark

2017-07-17 Thread vaquar khan
Could you please let us know your Spark version?


Regards,
vaquar khan

On Jul 17, 2017 12:18 AM, "163"  wrote:

> I changed the UDF, but performance still seems slow. What else can I do?
>
>
> On 14 Jul 2017, at 8:34 PM, Wenchen Fan wrote:
>
> Try to replace your UDF with Spark built-in expressions; it should be as
> simple as `$"x" * (lit(1) - $"y")`.
>
> On 14 Jul 2017, at 5:46 PM, 163  wrote:
>
> I rewrote TPC-H query 5 with the DataFrame API:
>
> val forders = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/orders")
>   .filter("o_orderdate < 1995-01-01 and o_orderdate >= 1994-01-01")
>   .select("o_custkey", "o_orderkey")
> val flineitem = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/lineitem")
> val fcustomer = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/customer")
> val fsupplier = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/supplier")
> val fregion = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/region")
>   .where("r_name = 'ASIA'")
>   .select($"r_regionkey")
> val fnation = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/nation")
>
> val decrease = udf { (x: Double, y: Double) => x * (1 - y) }
>
> val res = flineitem.join(forders, $"l_orderkey" === forders("o_orderkey"))
>   .join(fcustomer, $"o_custkey" === fcustomer("c_custkey"))
>   .join(fsupplier, $"l_suppkey" === fsupplier("s_suppkey") && $"c_nationkey" === fsupplier("s_nationkey"))
>   .join(fnation, $"s_nationkey" === fnation("n_nationkey"))
>   .join(fregion, $"n_regionkey" === fregion("r_regionkey"))
>   .select($"n_name", decrease($"l_extendedprice", $"l_discount").as("value"))
>   .groupBy($"n_name")
>   .agg(sum($"value").as("revenue"))
>   .sort($"revenue".desc)
>   .show()
>
>
> My environment is one master (HDFS NameNode) and four workers (HDFS DataNodes),
> each with 40 cores and 128 GB of memory. The TPC-H 100 GB dataset is stored on
> HDFS in Parquet format.
>
> The query runs in about 1.5 minutes. I found that reading these six tables with
> spark.read.parquet happens sequentially; how can I make the reads run in parallel?
>
> I've already tuned data locality, spark.default.parallelism, and spark.serializer,
> and switched to the G1 garbage collector, but the runtime has not gone down.
>
> Is there any other advice for tuning this query?
>
> Thank you.
>
> Wenting He
>
>
>
>
>


Re: How to tune the performance of Tpch query5 within Spark

2017-07-17 Thread 163
I changed the UDF, but performance still seems slow. What else can I do?


> On 14 Jul 2017, at 8:34 PM, Wenchen Fan wrote:
> 
> Try to replace your UDF with Spark built-in expressions; it should be as
> simple as `$"x" * (lit(1) - $"y")`.
> 
>> On 14 Jul 2017, at 5:46 PM, 163 wrote:
>> 
>> I rewrote TPC-H query 5 with the DataFrame API:
>> val forders = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/orders")
>>   .filter("o_orderdate < 1995-01-01 and o_orderdate >= 1994-01-01")
>>   .select("o_custkey", "o_orderkey")
>> val flineitem = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/lineitem")
>> val fcustomer = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/customer")
>> val fsupplier = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/supplier")
>> val fregion = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/region")
>>   .where("r_name = 'ASIA'")
>>   .select($"r_regionkey")
>> val fnation = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/nation")
>>
>> val decrease = udf { (x: Double, y: Double) => x * (1 - y) }
>>
>> val res = flineitem.join(forders, $"l_orderkey" === forders("o_orderkey"))
>>   .join(fcustomer, $"o_custkey" === fcustomer("c_custkey"))
>>   .join(fsupplier, $"l_suppkey" === fsupplier("s_suppkey") && $"c_nationkey" === fsupplier("s_nationkey"))
>>   .join(fnation, $"s_nationkey" === fnation("n_nationkey"))
>>   .join(fregion, $"n_regionkey" === fregion("r_regionkey"))
>>   .select($"n_name", decrease($"l_extendedprice", $"l_discount").as("value"))
>>   .groupBy($"n_name")
>>   .agg(sum($"value").as("revenue"))
>>   .sort($"revenue".desc)
>>   .show()
>> 
>> My environment is one master (HDFS NameNode) and four workers (HDFS DataNodes),
>> each with 40 cores and 128 GB of memory. The TPC-H 100 GB dataset is stored on
>> HDFS in Parquet format.
>> The query runs in about 1.5 minutes. I found that reading these six tables with
>> spark.read.parquet happens sequentially; how can I make the reads run in parallel?
>> I've already tuned data locality, spark.default.parallelism, and spark.serializer,
>> and switched to the G1 garbage collector, but the runtime has not gone down.
>> Is there any other advice for tuning this query?
>> Thank you.
>> 
>> Wenting He
>> 
> 



Re: How to tune the performance of Tpch query5 within Spark

2017-07-14 Thread Wenchen Fan
Try to replace your UDF with Spark built-in expressions; it should be as simple
as `$"x" * (lit(1) - $"y")`.
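
For reference, a minimal sketch of that rewrite, assuming the lineitem columns from the quoted query below; the Parquet path is a placeholder.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{lit, sum}

val spark = SparkSession.builder().appName("tpch-q5").getOrCreate()
import spark.implicits._

// Placeholder path; substitute the real lineitem location.
val flineitem = spark.read.parquet("/path/to/lineitem")

// Instead of a UDF computing x * (1 - y), use built-in column expressions so
// Catalyst can optimize and code-generate the projection.
val revenue = flineitem
  .select(($"l_extendedprice" * (lit(1) - $"l_discount")).as("value"))
  .agg(sum($"value").as("revenue"))

revenue.show()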

> On 14 Jul 2017, at 5:46 PM, 163  wrote:
> 
> I rewrote TPC-H query 5 with the DataFrame API:
> val forders = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/orders")
>   .filter("o_orderdate < 1995-01-01 and o_orderdate >= 1994-01-01")
>   .select("o_custkey", "o_orderkey")
> val flineitem = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/lineitem")
> val fcustomer = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/customer")
> val fsupplier = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/supplier")
> val fregion = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/region")
>   .where("r_name = 'ASIA'")
>   .select($"r_regionkey")
> val fnation = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/nation")
>
> val decrease = udf { (x: Double, y: Double) => x * (1 - y) }
>
> val res = flineitem.join(forders, $"l_orderkey" === forders("o_orderkey"))
>   .join(fcustomer, $"o_custkey" === fcustomer("c_custkey"))
>   .join(fsupplier, $"l_suppkey" === fsupplier("s_suppkey") && $"c_nationkey" === fsupplier("s_nationkey"))
>   .join(fnation, $"s_nationkey" === fnation("n_nationkey"))
>   .join(fregion, $"n_regionkey" === fregion("r_regionkey"))
>   .select($"n_name", decrease($"l_extendedprice", $"l_discount").as("value"))
>   .groupBy($"n_name")
>   .agg(sum($"value").as("revenue"))
>   .sort($"revenue".desc)
>   .show()
> 
> My environment is one master (HDFS NameNode) and four workers (HDFS DataNodes),
> each with 40 cores and 128 GB of memory. The TPC-H 100 GB dataset is stored on
> HDFS in Parquet format.
> The query runs in about 1.5 minutes. I found that reading these six tables with
> spark.read.parquet happens sequentially; how can I make the reads run in parallel?
> I've already tuned data locality, spark.default.parallelism, and spark.serializer,
> and switched to the G1 garbage collector, but the runtime has not gone down.
> Is there any other advice for tuning this query?
> Thank you.
> 
> Wenting He
> 



How to tune the performance of Tpch query5 within Spark

2017-07-14 Thread 163
I rewrote TPC-H query 5 with the DataFrame API:
val forders = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/orders")
  .filter("o_orderdate < 1995-01-01 and o_orderdate >= 1994-01-01")
  .select("o_custkey", "o_orderkey")
val flineitem = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/lineitem")
val fcustomer = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/customer")
val fsupplier = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/supplier")
val fregion = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/region")
  .where("r_name = 'ASIA'")
  .select($"r_regionkey")
val fnation = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/nation")

val decrease = udf { (x: Double, y: Double) => x * (1 - y) }

val res = flineitem.join(forders, $"l_orderkey" === forders("o_orderkey"))
  .join(fcustomer, $"o_custkey" === fcustomer("c_custkey"))
  .join(fsupplier, $"l_suppkey" === fsupplier("s_suppkey") && $"c_nationkey" === fsupplier("s_nationkey"))
  .join(fnation, $"s_nationkey" === fnation("n_nationkey"))
  .join(fregion, $"n_regionkey" === fregion("r_regionkey"))
  .select($"n_name", decrease($"l_extendedprice", $"l_discount").as("value"))
  .groupBy($"n_name")
  .agg(sum($"value").as("revenue"))
  .sort($"revenue".desc)
  .show()

My environment is one master (HDFS NameNode) and four workers (HDFS DataNodes),
each with 40 cores and 128 GB of memory. The TPC-H 100 GB dataset is stored on
HDFS in Parquet format.
The query runs in about 1.5 minutes. I found that reading these six tables with
spark.read.parquet happens sequentially; how can I make the reads run in parallel?
I've already tuned data locality, spark.default.parallelism, and spark.serializer,
and switched to the G1 garbage collector, but the runtime has not gone down.
Is there any other advice for tuning this query?
Thank you.

Wenting He



How to tune the performance of Tpch query5 within Spark

2017-07-14 Thread 163

> 
> I rewrote TPC-H query 5 with the DataFrame API:
> val forders = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/orders")
>   .filter("o_orderdate < 1995-01-01 and o_orderdate >= 1994-01-01")
>   .select("o_custkey", "o_orderkey")
> val flineitem = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/lineitem")
> val fcustomer = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/customer")
> val fsupplier = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/supplier")
> val fregion = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/region")
>   .where("r_name = 'ASIA'")
>   .select($"r_regionkey")
> val fnation = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/nation")
>
> val decrease = udf { (x: Double, y: Double) => x * (1 - y) }
>
> val res = flineitem.join(forders, $"l_orderkey" === forders("o_orderkey"))
>   .join(fcustomer, $"o_custkey" === fcustomer("c_custkey"))
>   .join(fsupplier, $"l_suppkey" === fsupplier("s_suppkey") && $"c_nationkey" === fsupplier("s_nationkey"))
>   .join(fnation, $"s_nationkey" === fnation("n_nationkey"))
>   .join(fregion, $"n_regionkey" === fregion("r_regionkey"))
>   .select($"n_name", decrease($"l_extendedprice", $"l_discount").as("value"))
>   .groupBy($"n_name")
>   .agg(sum($"value").as("revenue"))
>   .sort($"revenue".desc)
>   .show()
> 
> My environment is one master (HDFS NameNode) and four workers (HDFS DataNodes),
> each with 40 cores and 128 GB of memory. The TPC-H 100 GB dataset is stored on
> HDFS in Parquet format.
> The query runs in about 1.5 minutes. I found that reading these six tables with
> spark.read.parquet happens sequentially; how can I make the reads run in parallel?
> I've already tuned data locality, spark.default.parallelism, and spark.serializer,
> and switched to the G1 garbage collector, but the runtime has not gone down.
> Is there any other advice for tuning this query?
> Thank you.
Wenting He.