I found a workaround: when I create the Hive table using Spark’s “saveAsTable”, I see 
filters being pushed down.

-> The other approaches I tried, where filters are not pushed down, are:

1) creating the Hive table upfront and loading the ORC data into it using Spark SQL
2) creating the ORC files with Spark SQL and then creating a Hive external table on top of them

If my understanding is correct, when I use saveAsTable Spark registers the table in 
the Hive metastore with its own custom SerDe and is therefore able to push down 
filters. 
Please correct me if I am wrong.
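
For reference, this is roughly the write that pushes the filters down afterwards (a minimal 
sketch; the DataFrame name, partition columns, and table name are just the ones from my 
example, not a definitive recipe):

    // df is an existing DataFrame in the spark-shell (Spark 1.6 with a HiveContext)
    df.write
      .format("orc")
      .partitionBy("cdt", "catpartkey", "usrpartkey")
      .saveAsTable("hhhhhlogsv5")

Reading that table back, explain() in my case shows “Scan OrcRelation ... PushedFilters: [...]” 
instead of a plain HiveTableScan.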

Another question:

When I write ORC to Hive using “saveAsTable”, is there any way I can provide details 
about the ORC files, 
for instance the stripe size, or create bloom filters, etc.?
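
To make the question concrete, this is the kind of thing I mean. The property names are the 
standard ORC table properties from Hive (orc.stripe.size, orc.bloom.filter.columns, 
orc.row.index.stride); whether DataFrameWriter options like these are actually passed through 
to the ORC writer by saveAsTable is exactly what I am unsure about, so please treat this as a 
guess rather than something I know to work:

    df.write
      .format("orc")
      .option("orc.stripe.size", "67108864")        // 64 MB stripes (guessing at the option name)
      .option("orc.bloom.filter.columns", "cat")    // bloom filter on the cat column
      .option("orc.row.index.stride", "50000")
      .saveAsTable("hhhhhlogsv5")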


Regards
Shiv



> On Oct 25, 2017, at 1:37 AM, Jörn Franke <jornfra...@gmail.com> wrote:
> 
> Well the meta information is in the file so I am not surprised that it reads 
> the file, but it should not read all the content, which is probably also not 
> happening. 
> 
> On 24. Oct 2017, at 18:16, Siva Gudavalli <gudavalli.s...@yahoo.com.INVALID> wrote:
> 
>> 
>> Hello,
>>  
>> I have an update here. 
>>  
>> Spark SQL is pushing predicates down when I load the ORC files directly into the Spark 
>> context, but not when I read the Hive table.
>> Please let me know if I am missing something here.
>>  
>> Is this supported in Spark?
>>  
>> When I load the files in the Spark context:
>> scala> val hhhhhlogsv5 = 
>> sqlContext.read.format("orc").load("/user/hive/warehouse/hhhhhlogsv5")
>> 17/10/24 16:11:15 INFO OrcRelation: Listing 
>> maprfs:///user/hive/warehouse/hhhhhlogsv5 on driver
>> 17/10/24 16:11:15 INFO OrcRelation: Listing 
>> maprfs:///user/hive/warehouse/hhhhhlogsv5/cdt=20171003 on driver
>> 17/10/24 16:11:15 INFO OrcRelation: Listing 
>> maprfs:///user/hive/warehouse/hhhhhlogsv5/cdt=20171003/catpartkey=others on 
>> driver
>> 17/10/24 16:11:15 INFO OrcRelation: Listing 
>> maprfs:///user/hive/warehouse/hhhhhlogsv5/cdt=20171003/catpartkey=others/usrpartkey=hhhUsers
>>  on driver
>> hhhhhlogsv5: org.apache.spark.sql.DataFrame = [id: string, chn: string, ht: 
>> string, br: string, rg: string, cat: int, scat: int, usr: string, org: 
>> string, act: int, ctm: int, c1: string, c2: string, c3: string, d1: int, d2: 
>> int, doc: binary, cdt: int, catpartkey: string, usrpartkey: string]
>> scala> hhhhhlogsv5.registerTempTable("tempo")
>> scala> sqlContext.sql ( "selecT id from tempo where cdt=20171003 and 
>> usrpartkey = 'hhhUsers' and usr='AA0YP' order by id desc limit 10" ).explain
>> 17/10/24 16:11:22 INFO ParseDriver: Parsing command: selecT id from tempo 
>> where cdt=20171003 and usrpartkey = 'hhhUsers' and usr='AA0YP' order by id 
>> desc limit 10
>> 17/10/24 16:11:22 INFO ParseDriver: Parse Completed
>> 17/10/24 16:11:22 INFO DataSourceStrategy: Selected 1 partitions out of 1, 
>> pruned 0.0% partitions.
>> 17/10/24 16:11:22 INFO MemoryStore: Block broadcast_6 stored as values in 
>> memory (estimated size 164.5 KB, free 468.0 KB)
>> 17/10/24 16:11:22 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes 
>> in memory (estimated size 18.3 KB, free 486.4 KB)
>> 17/10/24 16:11:22 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory 
>> on 172.21.158.61:43493 (size: 18.3 KB, free: 511.4 MB)
>> 17/10/24 16:11:22 INFO SparkContext: Created broadcast 6 from explain at 
>> <console>:33
>> 17/10/24 16:11:22 INFO MemoryStore: Block broadcast_7 stored as values in 
>> memory (estimated size 170.2 KB, free 656.6 KB)
>> 17/10/24 16:11:22 INFO MemoryStore: Block broadcast_7_piece0 stored as bytes 
>> in memory (estimated size 18.8 KB, free 675.4 KB)
>> 17/10/24 16:11:22 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory 
>> on 172.21.158.61:43493 (size: 18.8 KB, free: 511.4 MB)
>> 17/10/24 16:11:22 INFO SparkContext: Created broadcast 7 from explain at 
>> <console>:33
>> == Physical Plan ==
>> TakeOrderedAndProject(limit=10, orderBy=[id#145 DESC], output=[id#145])
>> +- ConvertToSafe
>>    +- Project [id#145]
>>       +- Filter (usr#152 = AA0YP)
>>          +- Scan OrcRelation[id#145,usr#152] InputPaths: maprfs:///user/hive/warehouse/hhhhhlogsv5, PushedFilters: [EqualTo(usr,AA0YP)]
>>  
>> When I read this as a Hive table:
>>  
>> scala> sqlContext.sql ( "selecT id from hhhhhlogsv5 where cdt=20171003 and 
>> usrpartkey = 'hhhUsers' and usr='AA0YP' order by id desc limit 10" ).explain
>> 17/10/24 16:11:32 INFO ParseDriver: Parsing command: selecT id from 
>> hhhhhlogsv5 where cdt=20171003 and usrpartkey = 'hhhUsers' and usr='AA0YP' 
>> order by id desc limit 10
>> 17/10/24 16:11:32 INFO ParseDriver: Parse Completed
>> 17/10/24 16:11:32 INFO MemoryStore: Block broadcast_8 stored as values in 
>> memory (estimated size 399.1 KB, free 1074.6 KB)
>> 17/10/24 16:11:32 INFO MemoryStore: Block broadcast_8_piece0 stored as bytes 
>> in memory (estimated size 42.7 KB, free 1117.2 KB)
>> 17/10/24 16:11:32 INFO BlockManagerInfo: Added broadcast_8_piece0 in memory 
>> on 172.21.158.61:43493 (size: 42.7 KB, free: 511.4 MB)
>> 17/10/24 16:11:32 INFO SparkContext: Created broadcast 8 from explain at 
>> <console>:33
>> == Physical Plan ==
>> TakeOrderedAndProject(limit=10, orderBy=[id#192 DESC], output=[id#192])
>> +- ConvertToSafe
>>    +- Project [id#192]
>>       +- Filter (usr#199 = AA0YP)
>>          +- HiveTableScan [id#192,usr#199], MetastoreRelation default, hhhhhlogsv5, None, [(cdt#189 = 20171003),(usrpartkey#191 = hhhUsers)]
>>  
>>  
>> Please let me know if I am missing anything here. Thank you.
>> 
>> 
>> On Monday, October 23, 2017 1:56 PM, Siva Gudavalli <gss.su...@gmail.com> wrote:
>> 
>> 
>> Hello,
>>  
>> I am working with Spark SQL to query a Hive managed table (in ORC format).
>>  
>> I have my data organized by partitions, and have asked ORC to create row indexes for 
>> every 50,000 rows by setting ('orc.row.index.stride'='50000').
>>  
>> Let's say that, after evaluating the partition, there are around 50 files in which the 
>> data is organized.
>>  
>> Each file contains data specific to one given "cat", and I have set up a 
>> bloom filter on cat.
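>>  
>> For illustration, the table was created roughly along these lines (the column list is 
>> abbreviated here, so this is a sketch of the setup rather than the exact DDL):
>>  
>>   sqlContext.sql("""
>>     CREATE TABLE logs (id STRING, cat INT, usr STRING)
>>     PARTITIONED BY (cdt INT, catpartkey STRING, usrpartkey STRING)
>>     STORED AS ORC
>>     TBLPROPERTIES ('orc.row.index.stride'='50000',
>>                    'orc.bloom.filter.columns'='cat')
>>   """)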
>>  
>> My Spark SQL query looks like this ->
>>  
>> select * from logs where cdt= 20171002 and catpartkey= others and 
>> usrpartkey= logUsers and cat = 24;
>>  
>> I have set the following property in my Spark SQL context, assuming this will push the 
>> filters down:
>> sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
>>  
>> But my filters are never pushed down, and it seems like all files are scanned regardless 
>> of partition pruning. I don't understand why, no matter what my query is, it triggers 
>> 50 tasks and reads all the files.
>>  
>> Here are my debug logs ->
>>  
>> 17/10/23 17:26:43 DEBUG Inode: >Inode Open file: 
>> /apps/spark/auditlogsv5/cdt=20171002/catpartkey=others/usrpartkey=logUsers/000026_0,
>>  size: 401517212, chunkSize: 268435456, fid: 2052.225472.4786362
>> 17/10/23 17:26:43 DEBUG OrcInputFormat: No ORC pushdown predicate
>> 17/10/23 17:26:43 INFO OrcRawRecordMerger: min key = null, max key = null
>> 17/10/23 17:26:43 INFO ReaderImpl: Reading ORC rows from 
>> maprfs:///apps/spark/logs/cdt=20171002/catpartkey=others/usrpartkey=logUsers/000026_0
>>  with {include: [true, true, false, false, false, false, true, false, false, 
>> false, false, false, false, false, false, false, false, false], offset: 0, 
>> length: 9223372036854775807}
>> 17/10/23 17:26:43 DEBUG MapRClient: Open: path = 
>> /apps/spark/auditlogsv5/cdt=20171002/catpartkey=others/usrpartkey=logUsers/000026_0
>> 17/10/23 17:26:43 DEBUG Inode: >Inode Open file: 
>> /apps/spark/auditlogsv5/cdt=20171002/catpartkey=others/usrpartkey=logUsers/000026_0,
>>  size: 401517212, chunkSize: 268435456, fid: 2052.225472.4786362
>> 17/10/23 17:26:43 DEBUG RecordReaderImpl: chunks = [range start: 67684 end: 
>> 15790993, range start: 21131541 end: 21146035]
>> 17/10/23 17:26:43 DEBUG RecordReaderImpl: merge = [data range [67684, 
>> 15790993), size: 15723309 type: array-backed, data range [21131541, 
>> 21146035), size: 14494 type: array-backed]
>> 17/10/23 17:26:43 DEBUG Utilities: Hive Conf not found or Session not 
>> initiated, use thread based class loader instead
>> 17/10/23 17:26:43 DEBUG HadoopTableReader: 
>> org.apache.hadoop.hive.ql.io.orc.OrcStruct$OrcStructInspector<org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive..serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableBinaryObjectInspector@e8220d5>
>> 17/10/23 17:26:43 DEBUG GeneratePredicate: Generated predicate '(input[1, 
>> IntegerType] = 27)':
>>  
>> And here is my execution plan:
>> == Parsed Logical Plan ==
>> 'Limit 1000
>> +- 'Sort ['id DESC], true
>>    +- 'Project [unresolvedalias('id)]
>>       +- 'Filter (((('cdt = 20171002) && ('catpartkey = others)) && ('usrpartkey = logUsers)) && ('cat = 27))
>>          +- 'UnresolvedRelation `auditlogsv5`, None
>> == Analyzed Logical Plan ==
>> id: string
>> Limit 1000
>> +- Sort [id#165 DESC], true
>>    +- Project [id#165]
>>       +- Filter ((((cdt#162 = 20171002) && (catpartkey#163 = others)) && (usrpartkey#164 = logUsers)) && (cat#170 = 27))
>>          +- MetastoreRelation default, auditlogsv5, None
>> == Optimized Logical Plan ==
>> Limit 1000
>> +- Sort [id#165 DESC], true
>>    +- Project [id#165]
>>       +- Filter ((((cdt#162 = 20171002) && (catpartkey#163 = others)) && (usrpartkey#164 = logUsers)) && (cat#170 = 27))
>>          +- MetastoreRelation default, auditlogsv5, None
>> == Physical Plan ==
>> TakeOrderedAndProject(limit=1000, orderBy=[id#165 DESC], output=[id#165])
>> +- ConvertToSafe
>>    +- Project [id#165]
>>       +- Filter (cat#170 = 27)
>>          +- HiveTableScan [id#165,cat#170], MetastoreRelation default, logs, None, [(cdt#162 = 20171002),(catpartkey#163 = others),(usrpartkey#164 = logUsers)]
>>  
>>  
>> Am I missing something here? I am on Spark 1.6.1 and Hive 1.2.0.
>>  
>> Please correct me. Thank you.
>> 
>> 
