Hi,

I believe this is an issue with how Spark handles transactional tables in Hive.

When you add rows from Spark to an ORC transactional table, the Hive
metastore tables HIVE_LOCKS and TXNS are not updated. This does not happen
when the rows are inserted through Hive itself. As a result, these new rows
are left in an inconsistent state.
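
A quick way to see this for yourself is to watch what the metastore records
while an insert runs. A minimal check, assuming you can query the metastore
database directly (MySQL, PostgreSQL or whatever backs your metastore; the
table names below come from the standard ACID metastore schema):

-- Run against the Hive metastore database, not inside Hive itself.
-- While a Hive INSERT/DELETE on a transactional table is running, you should
-- see entries appear here; the same insert done through Spark leaves both
-- tables untouched.
SELECT * FROM TXNS;
SELECT * FROM HIVE_LOCKS;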

Also, if you delete rows from such a table using Hive, many delta files
will be produced. Hive will eventually compact them (roll them into the
base file) and can still read the data in the meantime. Spark, however,
will not be able to read the delta files until compaction has completed.
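
If you hit that situation, you do not have to wait for the background
compactor; you can request a compaction from Hive yourself and watch its
progress. A sketch, using the table from your example (testdb.test):

-- Run in Hive. A major compaction rolls the deltas into a new base file.
ALTER TABLE testdb.test COMPACT 'major';
-- Check the compaction queue until the request shows as finished.
SHOW COMPACTIONS;

Once the major compaction has finished, Spark should be able to read the
table again.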

Anyway, this is my experience.

HTH


Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 14 September 2016 at 17:47, Jack Wenger <jack.wenge...@gmail.com> wrote:

> Hi there,
>
> I'm trying to use ACID transactions in Hive but I have a problem when the
> data are added with Spark.
>
> First, I created a table with the following statement:
> ______________________________________________________________________________________
> CREATE TABLE testdb.test(id string, col1 string)
> CLUSTERED BY (id) INTO 4 BUCKETS
> STORED AS ORC TBLPROPERTIES('transactional'='true');
> ______________________________________________________________________________________
>
> Then I added data with these queries:
> ______________________________________________________________________________________
> INSERT INTO testdb.test VALUES("1", "A");
> INSERT INTO testdb.test VALUES("2", "B");
> INSERT INTO testdb.test VALUES("3", "C");
> ______________________________________________________________________________________
>
> And I've been able to delete rows with this query:
> ______________________________________________________________________________________
> DELETE FROM testdb.test WHERE id="1";
> ______________________________________________________________________________________
>
> All that worked perfectly, but a problem occurs when I try to delete rows
> that were added with Spark.
>
> What I do in Spark (IPython):
> ______________________________________________________________________________________
> from pyspark.sql import HiveContext
>
> hc = HiveContext(sc)
> data = sc.parallelize([["1", "A"], ["2", "B"], ["3", "C"]])
> data_df = hc.createDataFrame(data)
> data_df.registerTempTable("data_df")
> hc.sql("INSERT INTO testdb.test SELECT * FROM data_df")
> ______________________________________________________________________________________
>
> Then, when I come back to Hive, I'm able to run a SELECT query on the
> "test" table.
> However, when I try to run the exact same DELETE query as before, I get
> the following error (it happens after the reduce phase):
>
> ______________________________________________________________________________________
>
> Error: java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row (tag=0) {"key":{"reducesinkkey0":{"transactionid":0,"bucketid":-1,"rowid":0}},"value":null}
> at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:265)
> at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:444)
> at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
> at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:415)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
> at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
> Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row (tag=0) {"key":{"reducesinkkey0":{"transactionid":0,"bucketid":-1,"rowid":0}},"value":null}
> at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:253)
> ... 7 more
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
> at org.apache.hadoop.hive.ql.exec.FileSinkOperator.processOp(FileSinkOperator.java:723)
> at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:815)
> at org.apache.hadoop.hive.ql.exec.SelectOperator.processOp(SelectOperator.java:84)
> at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:244)
> ... 7 more
>
> ______________________________________________________________________________________
>
> I have no idea where this is coming from, which is why I'm looking for
> advice on this mailing list.
>
> I'm using the Cloudera Quickstart VM (5.4.2).
> Hive version: 1.1.0
> Spark version: 1.3.0
>
> And here is the complete output of the Hive DELETE command:
> ______________________________________________________________________________________
>
> hive> delete from testdb.test where id="1";
>
> Query ID = cloudera_20160914090303_795e40b7-ab6a-45b0-8391-6d41d1cfe7bd
> Total jobs = 1
> Launching Job 1 out of 1
> Number of reduce tasks determined at compile time: 4
> In order to change the average load for a reducer (in bytes):
>   set hive.exec.reducers.bytes.per.reducer=<number>
> In order to limit the maximum number of reducers:
>   set hive.exec.reducers.max=<number>
> In order to set a constant number of reducers:
>   set mapreduce.job.reduces=<number>
> Starting Job = job_1473858545651_0036, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1473858545651_0036/
> Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1473858545651_0036
> Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 4
> 2016-09-14 09:03:55,571 Stage-1 map = 0%,  reduce = 0%
> 2016-09-14 09:04:14,898 Stage-1 map = 50%,  reduce = 0%, Cumulative CPU 1.66 sec
> 2016-09-14 09:04:15,944 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 3.33 sec
> 2016-09-14 09:04:44,101 Stage-1 map = 100%,  reduce = 17%, Cumulative CPU 4.21 sec
> 2016-09-14 09:04:46,523 Stage-1 map = 100%,  reduce = 25%, Cumulative CPU 4.79 sec
> 2016-09-14 09:04:47,673 Stage-1 map = 100%,  reduce = 42%, Cumulative CPU 5.8 sec
> 2016-09-14 09:04:50,041 Stage-1 map = 100%,  reduce = 75%, Cumulative CPU 7.45 sec
> 2016-09-14 09:05:18,486 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 7.69 sec
> MapReduce Total cumulative CPU time: 7 seconds 690 msec
> Ended Job = job_1473858545651_0036 with errors
> Error during job, obtaining debugging information...
> Job Tracking URL: http://quickstart.cloudera:8088/proxy/application_1473858545651_0036/
> Examining task ID: task_1473858545651_0036_m_000000 (and more) from job job_1473858545651_0036
>
> Task with the most failures(4):
> -----
> Task ID:
>   task_1473858545651_0036_r_000001
>
> URL:
>   http://0.0.0.0:8088/taskdetails.jsp?jobid=job_1473858545651_0036&tipid=task_1473858545651_0036_r_000001
> -----
> Diagnostic Messages for this Task:
> Error: java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row (tag=0) {"key":{"reducesinkkey0":{"transactionid":0,"bucketid":-1,"rowid":0}},"value":null}
> at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:265)
> at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:444)
> at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
> at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:415)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
> at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
> Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row (tag=0) {"key":{"reducesinkkey0":{"transactionid":0,"bucketid":-1,"rowid":0}},"value":null}
> at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:253)
> ... 7 more
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
> at org.apache.hadoop.hive.ql.exec.FileSinkOperator.processOp(FileSinkOperator.java:723)
> at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:815)
> at org.apache.hadoop.hive.ql.exec.SelectOperator.processOp(SelectOperator.java:84)
> at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:244)
> ... 7 more
>
>
> FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
> MapReduce Jobs Launched:
> Stage-Stage-1: Map: 2  Reduce: 4   Cumulative CPU: 7.69 sec   HDFS Read: 21558 HDFS Write: 114 FAIL
> Total MapReduce CPU Time Spent: 7 seconds 690 msec
>
> ______________________________________________________________________________________
>
> Thanks !
>
