[jira] [Created] (HIVE-17755) NPE exception when running TestAcidOnTez#testGetSplitsLocks with "
liyunzhang_intel created HIVE-17755: --- Summary: NPE exception when running TestAcidOnTez#testGetSplitsLocks with " Key: HIVE-17755 URL: https://issues.apache.org/jira/browse/HIVE-17755 Project: Hive Issue Type: Bug Reporter: liyunzhang_intel -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (HIVE-17634) Use properties from HiveConf in RelOptHiveTable#updateColStats
liyunzhang_intel created HIVE-17634: --- Summary: Use properties from HiveConf in RelOptHiveTable#updateColStats Key: HIVE-17634 URL: https://issues.apache.org/jira/browse/HIVE-17634 Project: Hive Issue Type: Bug Reporter: liyunzhang_intel

In [RelOptHiveTable#updateColStats|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java#L309], {{fetchColStats}} and {{fetchPartStats}} are hard-coded to true when calling {{StatsUtils.collectStatistics}}:
{code}
if (!hiveTblMetadata.isPartitioned()) {
  // 2.1 Handle the case for unpartitioned table.
  try {
    Statistics stats = StatsUtils.collectStatistics(hiveConf, null, hiveTblMetadata, hiveNonPartitionCols, nonPartColNamesThatRqrStats, colStatsCached, nonPartColNamesThatRqrStats, true, true);
    ...
{code}
As a result, column statistics are queried from the metastore even when {{hive.stats.fetch.column.stats}} and {{hive.stats.fetch.partition.stats}} are set to false in HiveConf. When these two properties are false, no column statistics should be fetched from the metastore. I suggest taking both values from HiveConf instead.
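The suggestion in HIVE-17634 can be illustrated with a minimal, self-contained sketch. It does not use the real Hive classes: the {{Map}} stands in for HiveConf, and {{getBool}} and {{collectStatistics}} are hypothetical helpers that only show how the two flags would flow from configuration into the call. The default value of false used here is an assumption for illustration, not a statement about Hive's defaults.

```java
import java.util.HashMap;
import java.util.Map;

public class StatsFetchFlags {
    // Property names quoted in the report.
    static final String FETCH_COL_STATS = "hive.stats.fetch.column.stats";
    static final String FETCH_PART_STATS = "hive.stats.fetch.partition.stats";

    // Hypothetical stand-in for a HiveConf lookup: read a boolean with a default.
    static boolean getBool(Map<String, String> conf, String key, boolean defaultValue) {
        String value = conf.get(key);
        return value == null ? defaultValue : Boolean.parseBoolean(value);
    }

    // Hypothetical stand-in for StatsUtils.collectStatistics: it only reports
    // which flags it received, so the effect of the change is visible.
    static String collectStatistics(boolean fetchColStats, boolean fetchPartStats) {
        return "fetchColStats=" + fetchColStats + ", fetchPartStats=" + fetchPartStats;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(FETCH_COL_STATS, "false");
        conf.put(FETCH_PART_STATS, "false");

        // Behaviour described in the report: both flags hard-coded to true.
        System.out.println(collectStatistics(true, true));

        // Suggested behaviour: both flags taken from the configuration.
        System.out.println(collectStatistics(
                getBool(conf, FETCH_COL_STATS, false),
                getBool(conf, FETCH_PART_STATS, false)));
    }
}
```

With both properties set to false, the second call receives false for both flags, which is the behaviour the report asks for.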
[jira] [Created] (HIVE-17486) Enable SharedWorkOptimizer in tez on HOS
liyunzhang_intel created HIVE-17486: --- Summary: Enable SharedWorkOptimizer in tez on HOS Key: HIVE-17486 URL: https://issues.apache.org/jira/browse/HIVE-17486 Project: Hive Issue Type: Bug Reporter: liyunzhang_intel Assignee: liyunzhang_intel

HIVE-16602 implemented shared scans with Tez. Given a query plan, the goal is to identify scans on input tables that can be merged so that the data is read only once. The optimization is carried out at the physical level.
[jira] [Created] (HIVE-17474) Different physical plan of same query(TPC-DS/70) on HOS
liyunzhang_intel created HIVE-17474: --- Summary: Different physical plan of same query(TPC-DS/70) on HOS Key: HIVE-17474 URL: https://issues.apache.org/jira/browse/HIVE-17474 Project: Hive Issue Type: Bug Reporter: liyunzhang_intel

For [DS/query70|https://github.com/kellyzly/hive-testbench/blob/hive14/sample-queries-tpcds/query70.sql] on Hive version d3b88f6, I found that the physical plan differs between runs with the same settings. Sometimes the physical plan is
{code}
TS[0]-FIL[63]-SEL[2]-RS[43]-JOIN[45]-RS[46]-JOIN[48]-SEL[49]-GBY[50]-RS[51]-GBY[52]-SEL[53]-RS[54]-SEL[55]-PTF[56]-SEL[57]-RS[59]-SEL[60]-LIM[61]-FS[62]
TS[3]-FIL[64]-SEL[5]-RS[44]-JOIN[45]
TS[6]-FIL[65]-SEL[8]-RS[39]-JOIN[41]-RS[47]-JOIN[48]
TS[9]-FIL[67]-SEL[11]-RS[18]-JOIN[20]-RS[21]-JOIN[23]-SEL[24]-GBY[25]-RS[26]-GBY[27]-RS[29]-SEL[30]-PTF[31]-FIL[66]-SEL[32]-GBY[38]-RS[40]-JOIN[41]
TS[12]-FIL[68]-SEL[14]-RS[19]-JOIN[20]
TS[15]-FIL[69]-SEL[17]-RS[22]-JOIN[23]
{code}
Here TS\[6\] connects with TS\[9\] on JOIN\[41\] and with TS\[0\] on JOIN\[48\]. Sometimes it is
{code}
TS[0]-FIL[63]-RS[3]-JOIN[6]-RS[8]-JOIN[11]-RS[41]-JOIN[44]-SEL[46]-GBY[47]-RS[48]-GBY[49]-RS[50]-GBY[51]-RS[52]-SEL[53]-PTF[54]-SEL[55]-RS[57]-SEL[58]-LIM[59]-FS[60]
TS[1]-FIL[64]-RS[5]-JOIN[6]
TS[2]-FIL[65]-RS[10]-JOIN[11]
TS[12]-FIL[68]-RS[16]-JOIN[19]-RS[20]-JOIN[23]-FIL[67]-SEL[25]-GBY[26]-RS[27]-GBY[28]-RS[29]-GBY[30]-RS[31]-SEL[32]-PTF[33]-FIL[66]-SEL[34]-GBY[39]-RS[43]-JOIN[44]
TS[13]-FIL[69]-RS[18]-JOIN[19]
TS[14]-FIL[70]-RS[22]-JOIN[23]
{code}
Here TS\[2\] connects with TS\[0\] on JOIN\[11\].

Although TS\[2\] and TS\[6\] have different operator ids, both are scans of the table store in the query. The difference leads to a different Spark execution plan and a different execution time. I do not understand why the physical plan differs with the same settings. Does anyone know where to start investigating the root cause?
[jira] [Created] (HIVE-17412) Add "-- SORT_QUERY_RESULTS" for spark_vectorized_dynamic_partition_pruning.q
liyunzhang_intel created HIVE-17412: --- Summary: Add "-- SORT_QUERY_RESULTS" for spark_vectorized_dynamic_partition_pruning.q Key: HIVE-17412 URL: https://issues.apache.org/jira/browse/HIVE-17412 Project: Hive Issue Type: Bug Reporter: liyunzhang_intel Assignee: liyunzhang_intel

For the query
{code}
set hive.optimize.ppd=true;
set hive.ppd.remove.duplicatefilters=true;
set hive.spark.dynamic.partition.pruning=true;
set hive.optimize.metadataonly=false;
set hive.optimize.index.filter=true;
set hive.vectorized.execution.enabled=true;
set hive.strict.checks.cartesian.product=false;
select distinct ds from srcpart;
{code}
the result is
{code}
2008-04-09
2008-04-08
{code}
The result of a group-by in Spark has no guaranteed order. Sometimes it returns
{code}
2008-04-08
2008-04-09
{code}
and sometimes it returns
{code}
2008-04-09
2008-04-08
{code}
[jira] [Created] (HIVE-17407) TPC-DS/query65 hangs on HoS in certain settings
liyunzhang_intel created HIVE-17407: --- Summary: TPC-DS/query65 hangs on HoS in certain settings Key: HIVE-17407 URL: https://issues.apache.org/jira/browse/HIVE-17407 Project: Hive Issue Type: Bug Reporter: liyunzhang_intel

[TPC-DS/query65.sql|https://github.com/kellyzly/hive-testbench/blob/hive14/sample-queries-tpcds/query65.sql] hangs at 3TB scale with the following setting:
{code}
set hive.auto.convert.join.noconditionaltask.size=300;
{code}
The explain output is attached as explain65. The screenshot shows that it hung in Stage 5. The reason for the hang is as follows.
{code}
Reducer 10 <- Map 9 (GROUP, 1009)
Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 1), Map 5 (PARTITION-LEVEL SORT, 1), Reducer 7 (PARTITION-LEVEL SORT, 1)
Reducer 3 <- Reducer 10 (PARTITION-LEVEL SORT, 1009), Reducer 2 (PARTITION-LEVEL SORT, 1009)
Reducer 4 <- Reducer 3 (SORT, 1)
Reducer 7 <- Map 6 (GROUP PARTITION-LEVEL SORT, 1009)
{code}
The numPartitions of the SparkEdgeProperty which connects Reducer 2 and Reducer 3 is 1. This comes from org.apache.hadoop.hive.ql.parse.spark.GenSparkUtils#createReduceWork:
{code}
public ReduceWork createReduceWork(GenSparkProcContext context, Operator root, SparkWork sparkWork) throws SemanticException {
  for (Operator parentOfRoot : root.getParentOperators()) {
    Preconditions.checkArgument(parentOfRoot instanceof ReduceSinkOperator, "AssertionError: expected parentOfRoot to be an " + "instance of ReduceSinkOperator, but was " + parentOfRoot.getClass().getName());
    ReduceSinkOperator reduceSink = (ReduceSinkOperator) parentOfRoot;
    maxExecutors = Math.max(maxExecutors, reduceSink.getConf().getNumReducers());
  }
  reduceWork.setNumReduceTasks(maxExecutors);
{code}
Here the numReducers of every parentOfRoot is 1 (in the explain output, the parallelism of Map 1, Map 5 and Reducer 7 is 1), so the numPartitions of the SparkEdgeProperty which connects Reducer 2 and Reducer 3 is 1.

As for why the parallelism of Map 1, Map 5 and Reducer 7 is 1, the physical plan of the query is
{code}
TS[0]-FIL[50]-RS[2]-JOIN[5]-FIL[49]-SEL[7]-GBY[8]-RS[9]-GBY[10]-SEL[11]-GBY[15]-SEL[16]-RS[33]-JOIN[34]-RS[36]-JOIN[39]-FIL[48]-SEL[41]-RS[42]-SEL[43]-LIM[44]-FS[45]
TS[1]-FIL[51]-RS[4]-JOIN[5]
TS[17]-FIL[53]-RS[19]-JOIN[22]-FIL[52]-SEL[24]-GBY[25]-RS[26]-GBY[27]-RS[38]-JOIN[39]
TS[18]-FIL[54]-RS[21]-JOIN[22]
TS[29]-FIL[55]-RS[31]-JOIN[34]
TS[30]-FIL[56]-RS[32]-JOIN[34]
{code}
The RS operators related to Map 1, Map 5 and Reducer 7 are RS\[31\], RS\[32\] and RS\[33\]. Their parallelism is set by [SemanticAnalyzer#genJoinReduceSinkChild|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java#L8267].

There seems to be no logical error in the code, but it is not reasonable to use a single task to process this much data (more than 30GB). Is there any way to make the query pass in this situation? (I set hive.auto.convert.join.noconditionaltask.size to 300 because, if the join is converted to a map join, it throws a disk error.)
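The effect of the loop quoted in HIVE-17407 can be reproduced with a small stand-alone sketch. It is not the Hive code: {{numReduceTasks}} is a hypothetical helper that takes the parents' reducer counts as plain integers, and the starting value of maxExecutors is passed in as a parameter because the excerpt does not show how it is initialised.

```java
import java.util.Arrays;
import java.util.List;

public class ReduceParallelismSketch {
    // Mirrors the quoted loop: the reduce work gets the maximum of the
    // starting value and the numReducers of every parent ReduceSink.
    static int numReduceTasks(int initialMaxExecutors, List<Integer> parentNumReducers) {
        int maxExecutors = initialMaxExecutors;
        for (int numReducers : parentNumReducers) {
            maxExecutors = Math.max(maxExecutors, numReducers);
        }
        return maxExecutors;
    }

    public static void main(String[] args) {
        // All three parents (Map 1, Map 5, Reducer 7 in the report) have 1 reducer.
        System.out.println(numReduceTasks(-1, Arrays.asList(1, 1, 1)));    // prints 1
        // A single parent with higher parallelism is enough to raise the result.
        System.out.println(numReduceTasks(-1, Arrays.asList(1, 1009, 1))); // prints 1009
    }
}
```

Taking the maximum means the reduce side can only be as parallel as its most parallel parent, so when every parent reports 1 the join runs in a single task regardless of data size.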
[jira] [Created] (HIVE-17287) HoS can not deal with skewed data group by
liyunzhang_intel created HIVE-17287: --- Summary: HoS can not deal with skewed data group by Key: HIVE-17287 URL: https://issues.apache.org/jira/browse/HIVE-17287 Project: Hive Issue Type: Bug Reporter: liyunzhang_intel

In [tpcds/query67.sql|https://github.com/kellyzly/hive-testbench/blob/hive14/sample-queries-tpcds/query67.sql], the fact table {{store_sales}} joins with the small tables {{date_dim}}, {{item}} and {{store}}. After the join, the intermediate data is grouped. The data of {{store_sales}} on 3TB tpcds is skewed: there are 1824 partitions, the biggest partition is 25.7G and the others are about 715M each.
{code}
hadoop fs -du -h /user/hive/warehouse/tpcds_bin_partitioned_parquet_3000.db/store_sales
715.0 M  /user/hive/warehouse/tpcds_bin_partitioned_parquet_3000.db/store_sales/ss_sold_date_sk=2452639
713.9 M  /user/hive/warehouse/tpcds_bin_partitioned_parquet_3000.db/store_sales/ss_sold_date_sk=2452640
714.1 M  /user/hive/warehouse/tpcds_bin_partitioned_parquet_3000.db/store_sales/ss_sold_date_sk=2452641
712.9 M  /user/hive/warehouse/tpcds_bin_partitioned_parquet_3000.db/store_sales/ss_sold_date_sk=2452642
25.7 G  /user/hive/warehouse/tpcds_bin_partitioned_parquet_3000.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__
{code}
The skewed table {{store_sales}} caused the job to fail. Is there any way to solve the group-by problem for a skewed table? I tried enabling {{hive.groupby.skewindata}} to first distribute the data more evenly and then do the group-by, but the job still hangs.
[jira] [Created] (HIVE-17182) Invalid statistics like "RAW DATA SIZE" info for parquet file
liyunzhang_intel created HIVE-17182: --- Summary: Invalid statistics like "RAW DATA SIZE" info for parquet file Key: HIVE-17182 URL: https://issues.apache.org/jira/browse/HIVE-17182 Project: Hive Issue Type: Bug Reporter: liyunzhang_intel on TPC-DS 200g scale store_sales use "describe formatted store_sales" to view the statistics {code} hive> describe formatted store_sales; OK # col_name data_type comment ss_sold_time_sk bigint ss_item_sk bigint ss_customer_sk bigint ss_cdemo_sk bigint ss_hdemo_sk bigint ss_addr_sk bigint ss_store_sk bigint ss_promo_sk bigint ss_ticket_numberbigint ss_quantity int ss_wholesale_cost double ss_list_price double ss_sales_price double ss_ext_discount_amt double ss_ext_sales_price double ss_ext_wholesale_cost double ss_ext_list_price double ss_ext_tax double ss_coupon_amt double ss_net_paid double ss_net_paid_inc_tax double ss_net_profit double # Partition Information # col_name data_type comment ss_sold_date_sk bigint # Detailed Table Information Database: tpcds_bin_partitioned_parquet_200 Owner: root CreateTime: Tue Jun 06 11:51:48 CST 2017 LastAccessTime: UNKNOWN Retention: 0 Location: hdfs://bdpe38:9000/user/hive/warehouse/tpcds_bin_partitioned_parquet_200.db/store_sales Table Type: MANAGED_TABLE Table Parameters: COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\"} numFiles2023 numPartitions 1824 numRows 575995635 rawDataSize 12671903970 totalSize 46465926745 transient_lastDdlTime 1496721108 {code} the rawDataSize is nearly 12G while the totalSize is nearly 46G. view the original data on hdfs {format} #hadoop fs -du -h /tmp/tpcds-generate/200/ 75.8 G /tmp/tpcds-generate/200/store_sales {format} view the parquet file on hdfs {format} # hadoop fs -du -h /user/hive/warehouse/tpcds_bin_partitioned_parquet_200.db 43.3 G /user/hive/warehouse/tpcds_bin_partitioned_parquet_200.db/store_sales {format} It seems that the rawDataSize is nearly 75G but in "describe formatted store_sales" command, it shows only 12G. 
[jira] [Created] (HIVE-17108) Parquet file does not gather statistic such as "RAW DATA SIZE" automatically
liyunzhang_intel created HIVE-17108: --- Summary: Parquet file does not gather statistic such as "RAW DATA SIZE" automatically Key: HIVE-17108 URL: https://issues.apache.org/jira/browse/HIVE-17108 Project: Hive Issue Type: Bug Reporter: liyunzhang_intel

In [parquet_analyze.q|https://github.com/apache/hive/blob/master/ql/src/test/queries/clientpositive/parquet_analyze.q#L27], we need to run "ANALYZE TABLE parquet_create_people COMPUTE STATISTICS noscan" to update the statistics. In [orc_analyze.q|https://github.com/apache/hive/blob/master/ql/src/test/queries/clientpositive/orc_analyze.q#L45], this is not needed when hive.stats.autogather is set to true.
[jira] [Created] (HIVE-17018) Small table can not be converted to map join in TPC-DS/query.17 on 3TB data scale
liyunzhang_intel created HIVE-17018: --- Summary: Small table can not be converted to map join in TPC-DS/query.17 on 3TB data scale Key: HIVE-17018 URL: https://issues.apache.org/jira/browse/HIVE-17018 Project: Hive Issue Type: Bug Reporter: liyunzhang_intel Assignee: liyunzhang_intel

We use "hive.auto.convert.join.noconditionaltask.size" as the threshold: if the sum of the sizes of n-1 of the tables/partitions in an n-way join is smaller than this value, the join is converted to a map join. For example, take A join B join C join D join E, where the big table is A (100M) and the small tables are B (10M), C (10M), D (10M) and E (10M), with hive.auto.convert.join.noconditionaltask.size=20M. In the current code, E, D and B are converted to map joins but C is not. In my understanding, because hive.auto.convert.join.noconditionaltask.size can only hold E and D, neither C nor B should be converted to a map join.

Here is why B can still be converted to a map join. In the current code, [SparkMapJoinOptimizer#getConnectedMapJoinSize|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java#L364] sums up all the map joins on the parent path and the child path. The search stops when it encounters a [UnionOperator or ReduceOperator|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java#L381]. C is not converted to a map join because {{(connectedMapJoinSize + totalSize) > maxSize}} ([see code|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java#L330]), so the RS before the join of C remains. When deciding whether B will be converted to a map join, {{getConnectedMapJoinSize}} returns 0 because it encounters that [RS |https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java#409], so the condition {{(connectedMapJoinSize + totalSize) < maxSize}} holds.

[~xuefuz] or [~jxiang]: could you take a look at this problem, as you are more familiar with SparkJoinOptimizer?
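The A/B/C/D/E example from HIVE-17018 can be traced with a simplified model. This is only a sketch of the behaviour as described in the report, not the SparkMapJoinOptimizer code: it assumes the small tables are examined in the order E, D, C, B along a single chain, and it models "the search stops at an RS" by resetting the connected size to 0 whenever a join is left unconverted. The class and method names are hypothetical; sizes are in MB.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MapJoinThresholdSketch {
    // Returns the small tables that end up converted to map joins,
    // examining them in the iteration order of the map.
    static List<String> convert(Map<String, Long> smallTables, long maxSize) {
        List<String> converted = new ArrayList<>();
        long connectedMapJoinSize = 0;
        for (Map.Entry<String, Long> table : smallTables.entrySet()) {
            long totalSize = table.getValue();
            if (connectedMapJoinSize + totalSize > maxSize) {
                // Not converted: the RS stays in the plan, and later searches
                // stop at it, so the accumulated size is no longer visible.
                connectedMapJoinSize = 0;
            } else {
                converted.add(table.getKey());
                connectedMapJoinSize += totalSize;
            }
        }
        return converted;
    }

    public static void main(String[] args) {
        Map<String, Long> smallTables = new LinkedHashMap<>();
        smallTables.put("E", 10L);
        smallTables.put("D", 10L);
        smallTables.put("C", 10L);
        smallTables.put("B", 10L);
        // Threshold of 20 (MB), as in the example from the report.
        System.out.println(convert(smallTables, 20L)); // prints [E, D, B]
    }
}
```

Under these assumptions the model reproduces the reported outcome: E and D fill the 20M budget, C is rejected, and B is accepted again because the remaining RS hides the 20M already used.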
[jira] [Created] (HIVE-17010) Fix the overflow problem of Long type in SetSparkReducerParallelism
liyunzhang_intel created HIVE-17010: --- Summary: Fix the overflow problem of Long type in SetSparkReducerParallelism Key: HIVE-17010 URL: https://issues.apache.org/jira/browse/HIVE-17010 Project: Hive Issue Type: Bug Reporter: liyunzhang_intel Assignee: liyunzhang_intel -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (HIVE-16980) The partition of join is not divided evently in HOS
liyunzhang_intel created HIVE-16980: --- Summary: The partition of join is not divided evently in HOS Key: HIVE-16980 URL: https://issues.apache.org/jira/browse/HIVE-16980 Project: Hive Issue Type: Bug Reporter: liyunzhang_intel

In HoS, the join implementation is union + repartition sort. We use HashPartitioner to partition the result of the union. From SortByShuffler.java:
{code}
public JavaPairRDD shuffle( JavaPairRDD input, int numPartitions) {
  JavaPairRDD rdd;
  if (totalOrder) {
    if (numPartitions > 0) {
      if (numPartitions > 1 && input.getStorageLevel() == StorageLevel.NONE()) {
        input.persist(StorageLevel.DISK_ONLY());
        sparkPlan.addCachedRDDId(input.id());
      }
      rdd = input.sortByKey(true, numPartitions);
    } else {
      rdd = input.sortByKey(true);
    }
  } else {
    Partitioner partitioner = new HashPartitioner(numPartitions);
    rdd = input.repartitionAndSortWithinPartitions(partitioner);
  }
  return rdd;
}
{code}
In the Spark history server, I saw 28 tasks in the repartition sort stage: 21 of them finished in less than 1s, while the remaining 7 took a long time to execute. Is there any way to assign the data evenly to every partition?
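One way to see why a hash partitioner can produce the uneven task times described in HIVE-16980 is to count keys per partition for a skewed key set. The sketch below is stand-alone Java and does not call Spark; it assumes the usual hash-partitioning rule (partition = key.hashCode() modulo numPartitions, made non-negative), and the key distribution (900 copies of one hot key plus 100 distinct keys) is invented purely for illustration.

```java
import java.util.Arrays;

public class HashPartitionSkewSketch {
    // Hash-partitioning rule assumed here: non-negative hashCode modulo.
    static int partitionOf(Object key, int numPartitions) {
        int mod = key.hashCode() % numPartitions;
        return mod < 0 ? mod + numPartitions : mod;
    }

    // Counts how many records land in each partition.
    static long[] countPerPartition(Object[] keys, int numPartitions) {
        long[] counts = new long[numPartitions];
        for (Object key : keys) {
            counts[partitionOf(key, numPartitions)]++;
        }
        return counts;
    }

    // Builds an illustrative skewed key set: hotCount copies of "hot"
    // followed by distinctCount different keys.
    static Object[] skewedKeys(int hotCount, int distinctCount) {
        Object[] keys = new Object[hotCount + distinctCount];
        Arrays.fill(keys, 0, hotCount, "hot");
        for (int i = 0; i < distinctCount; i++) {
            keys[hotCount + i] = "k" + i;
        }
        return keys;
    }

    public static void main(String[] args) {
        long[] counts = countPerPartition(skewedKeys(900, 100), 28);
        // Every record with the same key goes to the same partition, so the
        // partition holding "hot" receives at least 900 of the 1000 records.
        System.out.println(Arrays.toString(counts));
    }
}
```

Because equal keys always hash to the same partition, raising numPartitions alone does not spread a hot key; that would require changing the key (for example by salting) or the partitioner, neither of which is proposed in the report.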
[jira] [Created] (HIVE-16948) Invalid explain when running dynamic partition pruning query
liyunzhang_intel created HIVE-16948: --- Summary: Invalid explain when running dynamic partition pruning query Key: HIVE-16948 URL: https://issues.apache.org/jira/browse/HIVE-16948 Project: Hive Issue Type: Bug Reporter: liyunzhang_intel union_subquery.q {code} set hive.optimize.ppd=true; set hive.ppd.remove.duplicatefilters=true; set hive.spark.dynamic.partition.pruning=true; set hive.optimize.metadataonly=false; set hive.optimize.index.filter=true; set hive.strict.checks.cartesian.product=false; explain select ds from (select distinct(ds) as ds from srcpart union all select distinct(ds) as ds from srcpart) s where s.ds in (select max(srcpart.ds) from srcpart union all select min(srcpart.ds) from srcpart); {code} explain {code} STAGE DEPENDENCIES: Stage-2 is a root stage Stage-1 depends on stages: Stage-2 Stage-0 depends on stages: Stage-1 STAGE PLANS: Stage: Stage-2 Spark Edges: Reducer 11 <- Map 10 (GROUP, 1) Reducer 13 <- Map 12 (GROUP, 1) DagName: root_20170622231525_20a777e5-e659-4138-b605-65f8395e18e2:2 Vertices: Map 10 Map Operator Tree: TableScan alias: srcpart Statistics: Num rows: 1 Data size: 23248 Basic stats: PARTIAL Column stats: NONE Select Operator expressions: ds (type: string) outputColumnNames: ds Statistics: Num rows: 1 Data size: 23248 Basic stats: PARTIAL Column stats: NONE Group By Operator aggregations: max(ds) mode: hash outputColumnNames: _col0 Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: NONE Reduce Output Operator sort order: Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: string) Map 12 Map Operator Tree: TableScan alias: srcpart Statistics: Num rows: 1 Data size: 23248 Basic stats: PARTIAL Column stats: NONE Select Operator expressions: ds (type: string) outputColumnNames: ds Statistics: Num rows: 1 Data size: 23248 Basic stats: PARTIAL Column stats: NONE Group By Operator aggregations: min(ds) mode: hash outputColumnNames: _col0 Statistics: 
Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: NONE Reduce Output Operator sort order: Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: string) Reducer 11 Reduce Operator Tree: Group By Operator aggregations: max(VALUE._col0) mode: mergepartial outputColumnNames: _col0 Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: NONE Filter Operator predicate: _col0 is not null (type: boolean) Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: NONE Group By Operator keys: _col0 (type: string) mode: hash outputColumnNames: _col0 Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: _col0 (type: string) outputColumnNames: _col0 Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE Group By Operator keys: _col0 (type: string) mode: hash outputColumnNames: _col0 Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE Spark Partition Pruning Sink Operator partition key expr: ds Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE target column name: ds target work: Map 1 Select Operator expressions: _col0 (type: string) outputColumnNames: _col0 Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE Group By Operator
[jira] [Created] (HIVE-16862) Implement a similar feature like "hive.tez.dynamic.semijoin.reduction" in hive on spark
liyunzhang_intel created HIVE-16862: --- Summary: Implement a similar feature like "hive.tez.dynamic.semijoin.reduction" in hive on spark Key: HIVE-16862 URL: https://issues.apache.org/jira/browse/HIVE-16862 Project: Hive Issue Type: Bug Reporter: liyunzhang_intel

Currently, if "hive.tez.dynamic.semijoin.reduction" is enabled (the default value is true) in Hive on Spark, the following script fails:
{code}
set hive.optimize.ppd=true;
set hive.ppd.remove.duplicatefilters=true;
set hive.spark.dynamic.partition.pruning=true;
set hive.optimize.metadataonly=false;
set hive.optimize.index.filter=true;
set hive.strict.checks.cartesian.product=false;
set hive.spark.dynamic.partition.pruning=true;
-- multiple sources, single key
select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr)
{code}
For the reason it fails, see HIVE-16780. Currently we only disable "hive.tez.dynamic.semijoin.reduction" when running Hive on Spark so that the test passes. Later we can implement a feature similar to what Hive on Tez does.

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (HIVE-16840) Investigate the performance of order by limit in HoS
liyunzhang_intel created HIVE-16840: --- Summary: Investigate the performance of order by limit in HoS Key: HIVE-16840 URL: https://issues.apache.org/jira/browse/HIVE-16840 Project: Hive Issue Type: Bug Reporter: liyunzhang_intel Assignee: liyunzhang_intel We found that on 1TB data of TPC-DS, q17 of TPC-DS hanged. {code} select i_item_id ,i_item_desc ,s_state ,count(ss_quantity) as store_sales_quantitycount ,avg(ss_quantity) as store_sales_quantityave ,stddev_samp(ss_quantity) as store_sales_quantitystdev ,stddev_samp(ss_quantity)/avg(ss_quantity) as store_sales_quantitycov ,count(sr_return_quantity) as_store_returns_quantitycount ,avg(sr_return_quantity) as_store_returns_quantityave ,stddev_samp(sr_return_quantity) as_store_returns_quantitystdev ,stddev_samp(sr_return_quantity)/avg(sr_return_quantity) as store_returns_quantitycov ,count(cs_quantity) as catalog_sales_quantitycount ,avg(cs_quantity) as catalog_sales_quantityave ,stddev_samp(cs_quantity)/avg(cs_quantity) as catalog_sales_quantitystdev ,stddev_samp(cs_quantity)/avg(cs_quantity) as catalog_sales_quantitycov from store_sales ,store_returns ,catalog_sales ,date_dim d1 ,date_dim d2 ,date_dim d3 ,store ,item where d1.d_quarter_name = '2000Q1' and d1.d_date_sk = store_sales.ss_sold_date_sk and item.i_item_sk = store_sales.ss_item_sk and store.s_store_sk = store_sales.ss_store_sk and store_sales.ss_customer_sk = store_returns.sr_customer_sk and store_sales.ss_item_sk = store_returns.sr_item_sk and store_sales.ss_ticket_number = store_returns.sr_ticket_number and store_returns.sr_returned_date_sk = d2.d_date_sk and d2.d_quarter_name in ('2000Q1','2000Q2','2000Q3') and store_returns.sr_customer_sk = catalog_sales.cs_bill_customer_sk and store_returns.sr_item_sk = catalog_sales.cs_item_sk and catalog_sales.cs_sold_date_sk = d3.d_date_sk and d3.d_quarter_name in ('2000Q1','2000Q2','2000Q3') group by i_item_id ,i_item_desc ,s_state order by i_item_id ,i_item_desc ,s_state limit 100; {code} the reason why the 
script hung is that only 1 task is used to implement the sort.
{code}
STAGE PLANS:
  Stage: Stage-1
    Spark
      Edges:
        Reducer 10 <- Reducer 9 (SORT, 1)
        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 889), Map 11 (PARTITION-LEVEL SORT, 889)
        Reducer 3 <- Map 12 (PARTITION-LEVEL SORT, 1009), Reducer 2 (PARTITION-LEVEL SORT, 1009)
        Reducer 4 <- Map 13 (PARTITION-LEVEL SORT, 683), Reducer 3 (PARTITION-LEVEL SORT, 683)
        Reducer 5 <- Map 14 (PARTITION-LEVEL SORT, 751), Reducer 4 (PARTITION-LEVEL SORT, 751)
        Reducer 6 <- Map 15 (PARTITION-LEVEL SORT, 826), Reducer 5 (PARTITION-LEVEL SORT, 826)
        Reducer 7 <- Map 16 (PARTITION-LEVEL SORT, 909), Reducer 6 (PARTITION-LEVEL SORT, 909)
        Reducer 8 <- Map 17 (PARTITION-LEVEL SORT, 1001), Reducer 7 (PARTITION-LEVEL SORT, 1001)
        Reducer 9 <- Reducer 8 (GROUP, 2)
{code}
The parallelism of Reducer 9 is 1. This is an order by limit case, so a single task is used to guarantee correctness, but the performance is poor.
[jira] [Created] (HIVE-16780) Case "multiple sources, single key" in spark_dynamic_pruning.q fails
liyunzhang_intel created HIVE-16780: --- Summary: Case "multiple sources, single key" in spark_dynamic_pruning.q fails Key: HIVE-16780 URL: https://issues.apache.org/jira/browse/HIVE-16780 Project: Hive Issue Type: Bug Reporter: liyunzhang_intel Assignee: liyunzhang_intel script.q {code} set hive.optimize.ppd=true; set hive.ppd.remove.duplicatefilters=true; set hive.spark.dynamic.partition.pruning=true; set hive.optimize.metadataonly=false; set hive.optimize.index.filter=true; set hive.strict.checks.cartesian.product=false; set hive.spark.dynamic.partition.pruning=true; -- multiple sources, single key select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr) {code} exception {code} job failed with java.io.FileNotFoundException: File hdfs://bdpe41:8020/tmp/hive/root/de80d82a-b910-4b87-940c-6be3ea37ba25/hive_2017-05-27_14-55-30_114_8497388836256415979-1/-mr-10004/1/5 does not exist. FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: java.io.FileNotFoundException: File hdfs://bdpe41:8020/tmp/hive/root/de80d82a-b910-4b87-940c-6be3ea37ba25/hive_2017-05-27_14-55-30_114_8497388836256415979-1/-mr-10004/1/5 does not exist. 
at org.apache.hadoop.hive.ql.io.HiveInputFormat.init(HiveInputFormat.java:404) at org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.getSplits(CombineHiveInputFormat.java:498) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:200) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:246) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:246) at org.apache.spark.ShuffleDependency.(Dependency.scala:91) at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:91) at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:235) at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:233) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.dependencies(RDD.scala:233) at org.apache.hadoop.hive.ql.exec.spark.SparkUtilities.rddToString(SparkUtilities.java:144) at org.apache.hadoop.hive.ql.exec.spark.SparkUtilities.rddToString(SparkUtilities.java:149) at org.apache.hadoop.hive.ql.exec.spark.SparkUtilities.rddToString(SparkUtilities.java:149) at org.apache.hadoop.hive.ql.exec.spark.SparkUtilities.rddToString(SparkUtilities.java:149) at org.apache.hadoop.hive.ql.exec.spark.SparkUtilities.rddGraphToString(SparkUtilities.java:134) at org.apache.hadoop.hive.ql.exec.spark.SparkPlan.generateGraph(SparkPlan.java:93) at org.apache.hadoop.hive.ql.exec.spark.RemoteHiveSparkClient$JobStatusJob.call(RemoteHiveSparkClient.java:349) at org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:358) at 
org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:323) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: java.io.FileNotFoundException: File hdfs://bdpe41:8020/tmp/hive/root/de80d82a-b910-4b87-940c-6be3ea37ba25/hive_2017-05-27_14-55-30_114_8497388836256415979-1/-mr-10004/1/5 does not exist. at org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner.processFiles(SparkDynamicPartitionPruner.java:147) at org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner.prune(SparkDynamicPartitionPruner.java:76) at org.apache.hadoop.hive.ql.io.HiveInputFormat.init(HiveInputFormat.java:402) ... 30 more Caused by: java.io.FileNotFoundException: File hdfs://bdpe41:8020/tmp/hive/root/de80d82a-b910-4b87-940c-6be3ea37ba25/hive_2017-05-27_14-55-30_114_8497388836256415979-1/-mr-10004/1/5 does not exist. at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:795) at org.apache.hadoop.hdfs.DistributedFileSystem.access$700(DistributedFileSystem.java:106) at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:853) at org.apache.h
[jira] [Created] (HIVE-16675) Fix ConcurrentModificationException in SparkClientImpl#startDriver
liyunzhang_intel created HIVE-16675: --- Summary: Fix ConcurrentModificationException in SparkClientImpl#startDriver Key: HIVE-16675 URL: https://issues.apache.org/jira/browse/HIVE-16675 Project: Hive Issue Type: Bug Reporter: liyunzhang_intel Assignee: liyunzhang_intel

The exception is
{noformat}
2017-05-16T00:29:37,480 WARN [Driver] client.SparkClientImpl: Exception while waiting for child process.
java.util.ConcurrentModificationException
  at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901) ~[?:1.8.0_91]
  at java.util.ArrayList$Itr.next(ArrayList.java:851) ~[?:1.8.0_91]
  at org.apache.hive.spark.client.SparkClientImpl$3.run(SparkClientImpl.java:495) [hive-exec-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
  at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
{noformat}
It seems that {{SparkClientImpl.java#childErrorLog}} is being read while it is being written. Changing {{SparkClientImpl.java#childErrorLog}} from ArrayList to CopyOnWriteArrayList would avoid the exception.
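The difference between the two list types named in HIVE-16675 can be shown without threads. The sketch below is not the SparkClientImpl code: {{iterateWhileAppending}} is a hypothetical helper that appends to the list in the middle of an iteration, which imitates a writer interleaving with a reader in a deterministic way.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class ChildErrorLogSketch {
    // Iterates over the list and appends one element mid-iteration.
    // Returns true if the iteration completed, false if it failed with
    // ConcurrentModificationException.
    static boolean iterateWhileAppending(List<String> log) {
        try {
            for (String line : log) {
                if (line.equals("first")) {
                    log.add("appended during iteration");
                }
            }
            return true;
        } catch (ConcurrentModificationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> arrayList = new ArrayList<>(Arrays.asList("first", "second"));
        List<String> cowList = new CopyOnWriteArrayList<>(Arrays.asList("first", "second"));

        // ArrayList's iterator is fail-fast: the append invalidates it.
        System.out.println(iterateWhileAppending(arrayList)); // prints false
        // CopyOnWriteArrayList iterates over a snapshot taken when the
        // iterator was created, so the append does not disturb it.
        System.out.println(iterateWhileAppending(cowList));   // prints true
        System.out.println(cowList.size());                   // prints 3
    }
}
```

CopyOnWriteArrayList copies its backing array on every write, so it suits a list that is appended to occasionally and read in bulk, which appears to match how an error log is used.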
[jira] [Created] (HIVE-16600) Refactor SetSparkReducerParallelism#needSetParallelism to enable parallel order by in multi_insert cases
liyunzhang_intel created HIVE-16600: --- Summary: Refactor SetSparkReducerParallelism#needSetParallelism to enable parallel order by in multi_insert cases Key: HIVE-16600 URL: https://issues.apache.org/jira/browse/HIVE-16600 Project: Hive Issue Type: Sub-task Reporter: liyunzhang_intel

In multi_insert cases such as multi_insert_gby2.q, the parallelism of the SORT operator is 1 even when "hive.optimize.sampling.orderby" is set to true. This is because the logic of SetSparkReducerParallelism#needSetParallelism does not support this case.
[jira] [Created] (HIVE-16046) Broadcasting small table for Hive on Spark
liyunzhang_intel created HIVE-16046: --- Summary: Broadcasting small table for Hive on Spark Key: HIVE-16046 URL: https://issues.apache.org/jira/browse/HIVE-16046 Project: Hive Issue Type: Bug Reporter: liyunzhang_intel

Currently the Spark plan is
{code}
1. TS(Small table)->Sel/Fil->HashTableSink
2. TS(Small table)->Sel/Fil->HashTableSink
3. HashTableDummy --
                    |
   HashTableDummy --
                    |
   RootTS(Big table) ->Sel/Fil ->MapJoin -->Sel/Fil ->FileSink
{code}
1. Run the small-table SparkWorks on the Spark cluster, which dump the hash tables to hashmap files.
2. Run the SparkWork for the big table on the Spark cluster. Mappers look up the small-table hashmap from the file using HashTableDummy’s loader.

The disadvantage of the current implementation is that it takes a long time to distribute the hash table through the distributed cache when the hash table is large. The proposal is to use sparkContext.broadcast() to store the small table, although this keeps the broadcast variable in the driver and costs some driver performance. [~Fred], [~xuefuz], [~lirui] and [~csun], please share your suggestions.
[jira] [Created] (HIVE-15432) java.lang.ClassCastException is thrown when setting "hive.input.format" as "org.apache.hadoop.hive.ql.io.CombineHiveInputFormat" in hive on spark
liyunzhang_intel created HIVE-15432: --- Summary: java.lang.ClassCastException is thrown when setting "hive.input.format" as "org.apache.hadoop.hive.ql.io.CombineHiveInputFormat" in hive on spark Key: HIVE-15432 URL: https://issues.apache.org/jira/browse/HIVE-15432 Project: Hive Issue Type: Bug Reporter: liyunzhang_intel Set "hive.input.format" to "org.apache.hadoop.hive.ql.io.CombineHiveInputFormat" in itests/qtest/target/testconf/spark/standalone/hive-site.xml and run the qtest with the following command:
{code}
mvn test -Dtest=TestSparkCliDriver -Dtest.output.overwrite=true -Dqfile=union.q >log.TestSparkCliDriver 2>&1
{code}
The following exception then appears in itests/qtest-spark/target/tmp/log/hive.log:
{code}
2016-12-14T23:43:17,819 INFO [stderr-redir-1] client.SparkClientImpl: java.lang.ClassCastException: Cannot cast org.apache.hadoop.hive.ql.io.CombineHiveInputFormat$CombineHiveInputSplit to org.apache.hadoop.mapred.InputSplitWithLocationInfo
2016-12-14T23:43:17,819 INFO [stderr-redir-1] client.SparkClientImpl: at java.lang.Class.cast(Class.java:3094)
2016-12-14T23:43:17,819 INFO [stderr-redir-1] client.SparkClientImpl: at org.apache.spark.rdd.HadoopRDD.getPreferredLocations(HadoopRDD.scala:318)
2016-12-14T23:43:17,819 INFO [stderr-redir-1] client.SparkClientImpl: at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:270)
2016-12-14T23:43:17,819 INFO [stderr-redir-1] client.SparkClientImpl: at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:270)
2016-12-14T23:43:17,819 INFO [stderr-redir-1] client.SparkClientImpl: at scala.Option.getOrElse(Option.scala:121)
2016-12-14T23:43:17,819 INFO [stderr-redir-1] client.SparkClientImpl: at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:269)
2016-12-14T23:43:17,819 INFO [stderr-redir-1] client.SparkClientImpl: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1564)
2016-12-14T23:43:17,819 INFO [stderr-redir-1] client.SparkClientImpl: at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1575)
2016-12-14T23:43:17,819 INFO [stderr-redir-1] client.SparkClientImpl: at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1574)
2016-12-14T23:43:17,819 INFO [stderr-redir-1] client.SparkClientImpl: at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1574)
2016-12-14T23:43:17,819 INFO [stderr-redir-1] client.SparkClientImpl: at scala.collection.immutable.List.foreach(List.scala:381)
2016-12-14T23:43:17,819 INFO [stderr-redir-1] client.SparkClientImpl: at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1574)
2016-12-14T23:43:17,820 INFO [stderr-redir-1] client.SparkClientImpl: at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1572)
2016-12-14T23:43:17,820 INFO [stderr-redir-1] client.SparkClientImpl: at scala.collection.immutable.List.foreach(List.scala:381)
{code}
[jira] [Created] (HIVE-15313) Add export spark.yarn.archive or spark.yarn.jars variable in Hive on Spark document
liyunzhang_intel created HIVE-15313: --- Summary: Add export spark.yarn.archive or spark.yarn.jars variable in Hive on Spark document Key: HIVE-15313 URL: https://issues.apache.org/jira/browse/HIVE-15313 Project: Hive Issue Type: Bug Reporter: liyunzhang_intel Priority: Minor Following the [wiki|https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started], queries were run with HOS16 and HOS20 in yarn mode. The following table shows the difference in query time, in seconds, between HOS16 and HOS20.
||Version||Total time (s)||Time for jobs (s)||Time for preparing jobs (s)||
|Spark16|51|39|12|
|Spark20|54|40|14|
HOS20 spends 2 seconds more on preparing jobs than HOS16. Reviewing the Spark source code shows the cause in [Client#distribute|https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L546]: in Spark 2.0, if neither spark.yarn.archive nor spark.yarn.jars is set in the Spark configuration file, Spark first copies all jars in $SPARK_HOME/jars to a tmp directory and then uploads that directory to the distributed cache. By comparison, [Spark 1.6|https://github.com/apache/spark/blob/branch-1.6/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L1145] finds spark-assembly*.jar and uploads it to the distributed cache. The 2 extra seconds in Spark 2.0 are spent copying all jars in $SPARK_HOME/jars to the tmp directory.
We can speed up the startup of Hive on Spark 2.0 by setting "spark.yarn.archive" or "spark.yarn.jars".
Set "spark.yarn.archive":
{code}
$ zip spark-archive.zip $SPARK_HOME/jars/*
$ hadoop fs -copyFromLocal spark-archive.zip
$ echo "spark.yarn.archive=hdfs://xxx:8020/spark-archive.zip" >> conf/spark-defaults.conf
{code}
Set "spark.yarn.jars":
{code}
$ hadoop fs -mkdir spark-2.0.0-bin-hadoop
$ hadoop fs -copyFromLocal $SPARK_HOME/jars/* spark-2.0.0-bin-hadoop
$ echo "spark.yarn.jars=hdfs://xxx:8020/spark-2.0.0-bin-hadoop/*" >> conf/spark-defaults.conf
{code}
I suggest adding this part to the wiki.
[jira] [Created] (HIVE-15259) The deserialization time of HOS20 is longer than what in HOS16
liyunzhang_intel created HIVE-15259: --- Summary: The deserialization time of HOS20 is longer than what in HOS16 Key: HIVE-15259 URL: https://issues.apache.org/jira/browse/HIVE-15259 Project: Hive Issue Type: Improvement Reporter: liyunzhang_intel Deploy Hive on Spark with Spark 1.6 and with Spark 2.0, then run a query. With the latest code (Spark 2.0), the deserialization time of a task is 4 seconds, while with Spark 1.6 it is 1 second. The details are in the attached picture.