I think limit repartitions your data into a single partition if called as a non-terminal operator. Hence zip works after limit, because you only have one partition.
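A quick toy example of the constraint zip enforces (a sketch, assuming only a running SparkContext `sc`, as in any PySpark shell; these RDDs are illustrative and not the pipeline from the mail below):

_________________________________________________________________
a = sc.parallelize(range(10), 2)       # 2 partitions, 5 elements each
b = a.filter(lambda x: x % 2 == 0)     # still 2 partitions, but 3 and 2 elements

# Fails at action time with "Can only zip RDDs with same number of
# elements in each partition":
try:
    a.zip(b).collect()
except Exception as e:
    print(e)

# With identical per-partition counts, zip succeeds (map preserves both
# the partitioning and the element counts):
c = a.map(lambda x: x * 10)
print(a.zip(c).collect())

# Collapsing both sides to one partition (roughly what a non-terminal
# LIMIT does) relaxes the per-partition constraint to "same total count":
print(a.coalesce(1).zip(c.coalesce(1)).count())
_________________________________________________________________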
In practice, I have found joins to be much more applicable than zip because of the strict requirement of identical partitioning.

https://richardstartin.com

On 20 Dec 2016, at 16:04, Jack Wenger <jack.wenge...@gmail.com> wrote:

Hello,

I'm facing a strange behaviour with Spark 1.5.0 (Cloudera 5.5.1).
I'm loading data from Hive with HiveContext (~42M records), then trying to add a new column with "withColumn" and a UDF. Finally, I'm supposed to create a new Hive table from this dataframe.

Here is the code:

_________________________________________________________________
DATETIME_TO_COMPARE = "9999-12-31 23:59:59.999999"

myFunction = udf(lambda col: 0 if col != DATETIME_TO_COMPARE else 1, IntegerType())

df1 = hc.sql("SELECT col1, col2, col3, col4, col5, col6, col7 FROM myTable WHERE col4 == someValue")
df2 = df1.withColumn("myNewCol", myFunction(df1.col3))
df2.registerTempTable("df2")

hc.sql("create table my_db.new_table as select * from df2")
_________________________________________________________________

But I get this error:

py4j.protocol.Py4JJavaError: An error occurred while calling o36.sql.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 18 in stage 2.0 failed 4 times, most recent failure: Lost task 18.3 in stage 2.0 (TID 186, lxpbda25.ra1.intra.groupama.fr): org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition
        at org.apache.spark.rdd.RDD$$anonfun$zip$1$$anonfun$apply$27$$anon$1.hasNext(RDD.scala:832)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:104)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:85)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:85)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

What is surprising is that if I modify the SELECT statement by adding a LIMIT 100000000 (which is more than twice the number of records in my table), then it works:

_________________________________________________________________
df1 = hc.sql("SELECT col1, col2, col3, col4, col5, col6, col7 FROM myTable WHERE col4 == someValue LIMIT 100000000")
_________________________________________________________________

In both cases, if I run a count() on df1, I get the same number: 42,593,052.

Is it a bug, or am I
missing something? If it is not a bug, what am I doing wrong?

Thank you!

Jack
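For what it's worth, on Spark 1.5.x this error during Python UDF evaluation is often reported when the scan feeding the UDF gets re-evaluated with a different row distribution on each side of Spark's internal zip. A commonly suggested workaround is to persist df1 before applying the UDF; below is a hedged sketch (the persist()/count()/unpersist() lines are additions under that assumption, the rest is the code from the mail above):

_________________________________________________________________
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

DATETIME_TO_COMPARE = "9999-12-31 23:59:59.999999"
myFunction = udf(lambda col: 0 if col != DATETIME_TO_COMPARE else 1, IntegerType())

df1 = hc.sql("SELECT col1, col2, col3, col4, col5, col6, col7 "
             "FROM myTable WHERE col4 == someValue")

# Pin down a single materialisation of the scan, so the rows zipped with
# the UDF output come from the same evaluation (assumption: the failure
# stems from divergent re-computation of the scan).
df1.persist()
df1.count()

df2 = df1.withColumn("myNewCol", myFunction(df1.col3))
df2.registerTempTable("df2")
hc.sql("create table my_db.new_table as select * from df2")

df1.unpersist()
_________________________________________________________________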