attilapiros opened a new pull request, #38516:
URL: https://github.com/apache/spark/pull/38516

   ### What changes were proposed in this pull request?
   
   This is an update of https://github.com/apache/spark/pull/29178 which was 
closed because the root cause of the error was just vaguely defined there but 
here I will give an explanation why `HiveHBaseTableInputFormat` does not work 
well with the `NewHadoopRDD` (see in the next section). 
   
   The PR modify `TableReader.scala` to create `OldHadoopRDD` when input format 
is 'org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat'. 
   
   - environments (Cloudera distribution 7.1.7.SP1):
   hadoop 3.1.1 
   hive 3.1.300
   spark 3.2.1 
   hbase 2.2.3
   
   ### Why are the changes needed?
   
   With the `NewHadoopRDD` the following exception is raised:
   
   ```
   java.io.IOException: Cannot create a record reader because of a previous 
error. Please look at the previous logs lines from the task's full log for more 
details.
     at 
org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:253)
     at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:131)
     at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
     at scala.Option.getOrElse(Option.scala:189)
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
     at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
     at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
     at scala.Option.getOrElse(Option.scala:189)
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
     at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
     at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
     at scala.Option.getOrElse(Option.scala:189)
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
     at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
     at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
     at scala.Option.getOrElse(Option.scala:189)
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
     at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
     at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
     at scala.Option.getOrElse(Option.scala:189)
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
     at 
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:446)
     at 
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:429)
     at 
org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
     at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3715)
     at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2728)
     at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
     at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
     at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
     at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
     at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
     at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
     at org.apache.spark.sql.Dataset.head(Dataset.scala:2728)
     at org.apache.spark.sql.Dataset.take(Dataset.scala:2935)
     at org.apache.spark.sql.Dataset.getRows(Dataset.scala:287)
     at org.apache.spark.sql.Dataset.showString(Dataset.scala:326)
     at org.apache.spark.sql.Dataset.show(Dataset.scala:806)
     at org.apache.spark.sql.Dataset.show(Dataset.scala:765)
     at org.apache.spark.sql.Dataset.show(Dataset.scala:774)
     ... 47 elided
   Caused by: java.lang.IllegalStateException: The input format instance has 
not been properly initialized. Ensure you call initializeTable either in your 
constructor or initialize method
     at 
org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getTable(TableInputFormatBase.java:557)
     at 
org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:248)
     ... 86 more
   ```
   
   
   ### Short summary of the reason
   
   There are two interfaces:
   
   - the new `org.apache.hadoop.mapreduce.InputFormat`: providing a one arg 
method `getSplits(JobContext context)` (returning `List<InputSplit>`)
   - the old `org.apache.hadoop.mapred.InputFormat`: providing a two arg method 
`getSplits(JobConf job, int numSplits)` (returning `InputSplit[]`)
   
   And in Hive both are implemented by `HiveHBaseTableInputFormat` but only the 
old method leads to required initialisation and this why `NewHadoopRDD` fails 
here.
   
   ### Details
   
   Here all the link refers to master branches latest commits of components to 
get the right line numbers in the future too: 
   
   Spark in `NewHadoopRDD` uses the new interface providing the one arg method:
   
https://github.com/apache/spark/blob/5556cfc59aa97a3ad4ea0baacebe19859ec0bcb7/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L136
   
   Hive on the other hand binds the initialisation to the two args method 
coming from the old interface. 
   See 
   
https://github.com/apache/hive/blob/fd029c5b246340058aee513980b8bf660aee0227/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java#L268
   which calls `getSplitsInternal` with the initialisation:
   
https://github.com/apache/hive/blob/fd029c5b246340058aee513980b8bf660aee0227/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java#L299
   
   Interesting that Hive also uses the one arg method internally within the 
`getSplitsInternal`:
   
https://github.com/apache/hive/blob/fd029c5b246340058aee513980b8bf660aee0227/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java#L356
   but that does not help us as the initialisation done earlier in the  
`getSplitsInternal` method.
   
   By calling the new interface method only (what `NewHadoopRDD` does) the call 
goes `org.apache.hadoop.hbase.mapreduce.TableInputFormatBase`:
   
https://github.com/apache/hbase/blob/63cdd026f08cdde6ac0fde1342ffd050e8e02441/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java#L230
   
   Where there would be JobContext based initialisation:
   
https://github.com/apache/hbase/blob/63cdd026f08cdde6ac0fde1342ffd050e8e02441/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java#L234-L237
   
   But that's not implemented by Hive (it won't be as easy as it is based on a 
`JobContext`):
   
https://github.com/apache/hbase/blob/63cdd026f08cdde6ac0fde1342ffd050e8e02441/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java#L628-L640
    
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   1) create hbase table
   
   ```
    hbase(main):001:0>create 'hbase_test1', 'cf1'
    hbase(main):001:0> put 'hbase_test', 'r1', 'cf1:c1', '123'
   ```
   
   
   2) create hive table related to hbase table
   
   hive> 
   ```
   CREATE EXTERNAL TABLE `hivetest.hbase_test`(
     `key` string COMMENT '', 
     `value` string COMMENT '')
   ROW FORMAT SERDE 
     'org.apache.hadoop.hive.hbase.HBaseSerDe' 
   STORED BY 
     'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 
   WITH SERDEPROPERTIES ( 
     'hbase.columns.mapping'=':key,cf1:v1', 
     'serialization.format'='1')
   TBLPROPERTIES (
     'hbase.table.name'='hbase_test')
   ```
    
   
   3): spark-shell query hive table while data in HBase
   
   ```
   scala> spark.sql("select * from hivetest.hbase_test").show()
   22/11/05 01:14:16 WARN conf.HiveConf: HiveConf of name hive.masking.algo 
does not exist
   22/11/05 01:14:16 WARN client.HiveClientImpl: Detected HiveConf 
hive.execution.engine is 'tez' and will be reset to 'mr' to disable useless 
hive logic
   Hive Session ID = f05b6866-86df-4d88-9eea-f1c45043bb5f
   +---+-----+
   |key|value|
   +---+-----+
   | r1|  123|
   +---+-----+
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to