withColumn gives "Can only zip RDDs with same number of elements in each partition" but not with a LIMIT on the dataframe

2016-12-20 Thread Jack Wenger
Hello,

I'm facing a strange behaviour with Spark 1.5.0 (Cloudera 5.5.1).
I'm loading data from Hive with HiveContext (~42M records) and then trying to
add a new column with "withColumn" and a UDF.
Finally, I'm supposed to create a new Hive table from this dataframe.


Here is the code:

__


from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

DATETIME_TO_COMPARE = "-12-31 23:59:59.99"

myFunction = udf(lambda col: 0 if col != DATETIME_TO_COMPARE else 1,
                 IntegerType())

df1 = hc.sql("SELECT col1, col2, col3, col4, col5, col6, col7 "
             "FROM myTable WHERE col4 == someValue")

df2 = df1.withColumn("myNewCol", myFunction(df1.col3))
df2.registerTempTable("df2")

hc.sql("create table my_db.new_table as select * from df2")

__


But I get this error:

py4j.protocol.Py4JJavaError: An error occurred while calling o36.sql.
: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 18 in stage 2.0 failed 4 times, most recent failure: Lost task
18.3 in stage 2.0 (TID 186, lxpbda25.ra1.intra.groupama.fr):
org.apache.spark.SparkException: Can only zip RDDs with same number of
elements in each partition
at org.apache.spark.rdd.RDD$$anonfun$zip$1$$anonfun$apply$27$$anon$1.hasNext(RDD.scala:832)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:104)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:85)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:85)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
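
If I understand that message correctly, RDD.zip requires the two RDDs to have
the same number of partitions and the same number of elements in each
partition. A tiny standalone example of that constraint (my own illustration,
not taken from my job):

__
# Same data, same partitioning: zip lines up element for element.
a = sc.parallelize(range(10), 2)
b = sc.parallelize(range(10), 2)
a.zip(b).count()   # works

# Same number of partitions, but different element counts per partition:
# zipping these fails at runtime because the partitions no longer line up.
c = a.filter(lambda x: x != 0)
a.zip(c).count()   # fails

__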





What is surprising is that if I modify the SELECT statement by adding a
LIMIT 1 (which is more than twice the number of records in my table),
then it works:

__

df1 = hc.sql("SELECT col1, col2, col3, col4, col5, col6, col7 "
             "FROM myTable WHERE col4 == someValue LIMIT 1")

__

In both cases, if I run a count() on df1, I'm getting the same number:
42 593 052.

Is it a bug or am I missing something?
If it is not a bug, what am I doing wrong?
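
In the meantime, two things I'm thinking of trying, though I have not checked
that either one actually avoids the error: computing the flag with the built-in
when/otherwise column expressions instead of the Python UDF, and caching df1
before adding the column.

__
from pyspark.sql.functions import when, col

# Variant 1 (sketch, untested): same 0/1 flag, but computed with built-in
# column expressions, so no Python UDF is involved.
df2 = df1.withColumn(
    "myNewCol",
    when(col("col3") == DATETIME_TO_COMPARE, 1).otherwise(0)
)

# Variant 2 (sketch, untested): keep the UDF but materialise df1 first,
# so the source data is only computed once.
df1.cache()
df1.count()
df2 = df1.withColumn("myNewCol", myFunction(df1.col3))

__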


Thank you!


Jack


ACID transactions on data added from Spark not working

2016-09-14 Thread Jack Wenger
Hi there,

I'm trying to use ACID transactions in Hive but I have a problem when the
data are added with Spark.

First, I created a table with the following statement:

__
CREATE TABLE testdb.test(id string, col1 string)
CLUSTERED BY (id) INTO 4 BUCKETS
STORED AS ORC TBLPROPERTIES('transactional'='true');

__

Then I added data with these queries:

__
INSERT INTO testdb.test VALUES("1", "A");
INSERT INTO testdb.test VALUES("2", "B");
INSERT INTO testdb.test VALUES("3", "C");

__

And I've been able to delete rows with this query:

__
DELETE FROM testdb.test WHERE id="1";

__

All that worked perfectly, but a problem occurs when I try to delete rows
that were added with Spark.

What I do in Spark (IPython):

__
from pyspark.sql import HiveContext

hc = HiveContext(sc)
data = sc.parallelize([["1", "A"], ["2", "B"], ["3", "C"]])
data_df = hc.createDataFrame(data)
data_df.registerTempTable("data_df")  # the temp table name must be a string
hc.sql("INSERT INTO testdb.test SELECT * FROM data_df")

__

Then, when I come back to Hive, I'm able to run a SELECT query on the
"test" table.
However, when I try to run the exact same DELETE query as before, I get
the following error (it happens after the reduce phase):


__

Error: java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException:
Hive Runtime Error while processing row (tag=0)
{"key":{"reducesinkkey0":{"transactionid":0,"bucketid":-1,"rowid":0}},"value":null}
at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:265)
at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:444)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row (tag=0) {"key":{"reducesinkkey0":{"transactionid":0,"bucketid":-1,"rowid":0}},"value":null}
at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:253)
... 7 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
at org.apache.hadoop.hive.ql.exec.FileSinkOperator.processOp(FileSinkOperator.java:723)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:815)
at org.apache.hadoop.hive.ql.exec.SelectOperator.processOp(SelectOperator.java:84)
at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:244)
... 7 more


__

I have no idea where this is coming from, which is why I'm looking for
advice on this mailing list.
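
One direction I'm considering (just a sketch, I have not verified it): have
Spark write into a plain, non-transactional staging table and let Hive itself
copy the rows into the transactional table, in case the problem is that Spark
does not write the ACID metadata (transaction ids / bucket ids) that the DELETE
expects. The table name "testdb.test_staging" is only an example name.

__
# In Spark (sketch, untested): write to a plain ORC staging table.
data_df.registerTempTable("data_df")
hc.sql("DROP TABLE IF EXISTS testdb.test_staging")
hc.sql("CREATE TABLE testdb.test_staging (id string, col1 string) STORED AS ORC")
hc.sql("INSERT INTO TABLE testdb.test_staging SELECT * FROM data_df")

# Then, back in Hive (not Spark), copy into the transactional table:
#   INSERT INTO TABLE testdb.test SELECT * FROM testdb.test_staging;

__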

I'm using the Cloudera Quickstart VM (5.4.2).
Hive version : 1.1.0
Spark Version : 1.3.0

And here is the complete output of the Hive DELETE command:

__

hive> delete from testdb.test where id="1";

Query ID = cloudera_20160914090303_795e40b7-ab6a-45b0-8391-6d41d1cfe7bd
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 4
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1473858545651_0036, Tracking URL =
http://quickstart.cloudera:8088/proxy/application_1473858545651_0036/
Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1473858545651_0036
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 4
2016-09-14 09:03:55,571 Stage-1 map = 0%,  reduce = 0%
2016-09-14 09:04:14,898 Stage-1 map = 50%,  reduce = 0%, Cumulative CPU 1.66 sec
2016-09-14 09:04:15,944 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 3.33 sec
2016-09-14 09:04:44,101 Stage-1 map = 100%,  reduce = 17%, Cumulative CPU 4.21 sec
2016-09-14 09:04:46,523 Stage-1 map = 100%,  reduce = 25%, Cumulative CPU 4.79 sec
2016-09-14 09:04:47,673 Stage-1 map = 100%,  reduce = 42%, Cumulative CPU