This should be fixed by https://github.com/apache/spark/commit/a367840834b97cd6a9ecda568bb21ee6dc35fcde. The fix will be released as 1.5.2 soon.
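Until then, one possible workaround (an untested sketch on my end; it assumes Spark 1.5.x, where the spark.sql.tungsten.enabled flag still exists and defaults to true) is to fall back to the non-Tungsten code path, which is the one that gives correct results in the report below:

    # Hypothetical workaround for 1.5.1; not verified against this bug.
    # Disable Tungsten so the query is planned without the Tungsten
    # operators that appear in the "with persist" plan below.
    sqlContext.setConf("spark.sql.tungsten.enabled", "false")

    # The conf is read at planning time, so rebuild df_joined after
    # changing it, and drop any cached copy so a bad cached result is
    # not reused:
    df_joined.unpersist()
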
On Mon, Oct 19, 2015 at 9:04 AM, peay2 <peay...@yahoo.fr> wrote:
> Hi,
>
> I am getting some very strange results: different output depending on
> whether or not I call persist() on a data frame before materialising it.
>
> There's probably something obvious I am missing, as only very simple
> operations are involved here. Any help with this would be greatly
> appreciated. I have a simple data frame with IDs and values:
>
>     import pandas as pd
>     from pyspark.sql import functions as F
>
>     data_dict = {'id': {k: str(k) for k in range(99)},
>                  'value': dict(enumerate(['A'] * 4 + ['B'] * 46 + ['C'] * 49))}
>     df_small = pd.DataFrame(data_dict)
>     records = sqlContext.createDataFrame(df_small)
>     records.printSchema()
>
>     # root
>     #  |-- id: string (nullable = true)
>     #  |-- value: string (nullable = true)
>
> Now, I do a left outer join on the IDs -- here, using a dummy constant
> column on the right instead of a separate data frame (enough to reproduce
> my issue):
>
>     unique_ids = records.select("id").dropDuplicates()
>     id_names = unique_ids.select(F.col("id").alias("id_join"),
>                                  F.lit("xxx").alias("id_name"))
>
>     df_joined = records.join(id_names, records['id'] == id_names['id_join'],
>                              "left_outer").drop("id_join")
>
> At this point, *doing a show on df_joined* indicates all is fine: all
> records are there as expected, for instance:
>
>     df_joined[(df_joined['id'] > 60) & (df_joined['id'] < 70)].show()
>
>     +---+-----+-------+
>     | id|value|id_name|
>     +---+-----+-------+
>     | 61|    C|    xxx|
>     | 62|    C|    xxx|
>     | 63|    C|    xxx|
>     | 64|    C|    xxx|
>     ...
>
> However, if I filter for a given value and then group by ID, I do not get
> back all of the groups:
>
>     def print_unique_ids(df):
>         filtered = df[df["value"] == "C"]
>         plan = filtered.groupBy("id").count().select("id")
>         unique_ids = list(plan.toPandas()["id"])
>
>         print "{0} IDs: {1}\n".format(len(unique_ids), sorted(unique_ids))
>         print plan.rdd.toDebugString() + "\n"
>
>     print_unique_ids(df_joined.unpersist())
>     print_unique_ids(df_joined.persist())
>
>     49 IDs: [u'50', u'51', u'52', u'53', u'54', u'55', u'56', u'57', u'58',
>     u'59', u'60', u'61', u'62', u'63', u'64', u'65', u'66', u'67', u'68',
>     u'69', u'70', u'71', u'72', u'73', u'74', u'75', u'76', u'77', u'78',
>     u'79', u'80', u'81', u'82', u'83', u'84', u'85', u'86', u'87', u'88',
>     u'89', u'90', u'91', u'92', u'93', u'94', u'95', u'96', u'97', u'98']
>
>     46 IDs: [u'50', u'51', u'52', u'53', u'54', u'55', u'56', u'57', u'58',
>     u'59', u'60', u'61', u'62', u'66', u'67', u'68', u'69', u'70', u'71',
>     u'72', u'73', u'74', u'75', u'76', u'77', u'78', u'79', u'80', u'81',
>     u'82', u'83', u'84', u'85', u'86', u'87', u'88', u'89', u'90', u'91',
>     u'92', u'93', u'94', u'95', u'96', u'97', u'98']
>
> Note how IDs 63, 64, 65 are missing when persist() has been called. The
> output is correct if the data frame has not been marked for persistence,
> but incorrect after the call to persist().
>
> When persist() has been called, Tungsten seems to be involved, but not if
> the data frame has not been persisted. I am including the full outputs of
> toDebugString below.
>
> Has anyone any idea what is going on here?
>
> In case this helps: I see no issue if I don't do the dummy join, or if I
> don't filter for value == "C". I have a default Spark config apart from
> spark.shuffle.consolidateFiles=true, and I am on Spark 1.5.1.
>
> Thanks a lot!
>
> - Without persist:
>
>     (200) MapPartitionsRDD[26] at javaToPython at NativeMethodAccessorImpl.java:-2 []
>      |  MapPartitionsRDD[25] at javaToPython at NativeMethodAccessorImpl.java:-2 []
>      |  MapPartitionsWithPreparationRDD[22] at toPandas at <ipython-input-2-xxx>:25 []
>      |  MapPartitionsWithPreparationRDD[21] at toPandas at <ipython-input-2-xxx>:25 []
>      |  MapPartitionsRDD[20] at toPandas at <ipython-input-2-xxx>:25 []
>      |  ZippedPartitionsRDD2[19] at toPandas at <ipython-input-2-xxx>:25 []
>      |  MapPartitionsWithPreparationRDD[9] at toPandas at <ipython-input-2-xxx>:25 []
>      |  ShuffledRowRDD[8] at toPandas at <ipython-input-2-xxx>:25 []
>      +-(2) MapPartitionsRDD[7] at toPandas at <ipython-input-2-xxx>:25 []
>         |  MapPartitionsRDD[6] at toPandas at <ipython-input-2-xxx>:25 []
>         |  MapPartitionsRDD[5] at toPandas at <ipython-input-2-xxx>:25 []
>         |  MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:-2 []
>         |  MapPartitionsRDD[3] at map at SerDeUtil.scala:100 []
>         |  MapPartitionsRDD[2] at mapPartitions at SerDeUtil.scala:147 []
>         |  PythonRDD[1] at RDD at PythonRDD.scala:43 []
>         |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423 []
>      |  MapPartitionsWithPreparationRDD[18] at toPandas at <ipython-input-2-xxx>:25 []
>      |  ShuffledRowRDD[17] at toPandas at <ipython-input-2-xxx>:25 []
>      +-(200) MapPartitionsRDD[16] at toPandas at <ipython-input-2-xxx>:25 []
>         |    MapPartitionsRDD[15] at toPandas at <ipython-input-2-xxx>:25 []
>         |    MapPartitionsWithPreparationRDD[14] at toPandas at <ipython-input-2-xxx>:25 []
>         |    ShuffledRowRDD[13] at toPandas at <ipython-input-2-xxx>:25 []
>         +-(2) MapPartitionsRDD[12] at toPandas at <ipython-input-2-xxx>:25 []
>            |  MapPartitionsWithPreparationRDD[11] at toPandas at <ipython-input-2-xxx>:25 []
>            |  MapPartitionsRDD[10] at toPandas at <ipython-input-2-xxx>:25 []
>            |  MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:-2 []
>            |  MapPartitionsRDD[3] at map at SerDeUtil.scala:100 []
>            |  MapPartitionsRDD[2] at mapPartitions at SerDeUtil.scala:147 []
>            |  PythonRDD[1] at RDD at PythonRDD.scala:43 []
>            |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423 []
>
> - With persist:
>
>     (200) MapPartitionsRDD[52] at javaToPython at NativeMethodAccessorImpl.java:-2 []
>      |  MapPartitionsRDD[51] at javaToPython at NativeMethodAccessorImpl.java:-2 []
>      |  MapPartitionsWithPreparationRDD[48] at toPandas at <ipython-input-2-xxx>:25 []
>      |  ShuffledRowRDD[47] at toPandas at <ipython-input-2-xxx>:25 []
>      +-(200) MapPartitionsRDD[46] at toPandas at <ipython-input-2-xxx>:25 []
>         |    MapPartitionsWithPreparationRDD[45] at toPandas at <ipython-input-2-xxx>:25 []
>         |    MapPartitionsRDD[44] at toPandas at <ipython-input-2-xxx>:25 []
>         |    MapPartitionsRDD[43] at toPandas at <ipython-input-2-xxx>:25 []
>         |    TungstenProject [id#0,value#1,id_name#3]
>               SortMergeOuterJoin [id#0], [id_join#2], LeftOuter, None
>                TungstenSort [id#0 ASC], false, 0
>                 TungstenExchange hashpartitioning(id#0)
>                  ConvertToUnsafe
>                   Scan PhysicalRDD[id#0,value#1]
>                TungstenSort [id_join#2 ASC], false, 0
>                 TungstenExchange hashpartitioning(id_join#2)
>                  TungstenProject [id#0 AS id_join#2,xxx AS id_name#3]
>                   TungstenAggregate(key=[id#0], functions=[], output=[id#0])
>                    TungstenExchange hashpartitioning(id#0)
>                     TungstenAggregate(key=[id#0], functions=[], output=[id#0])
>                      TungstenProject [id#0]
>                       Scan PhysicalRDD[id#0,value#1]
>              MapPartitionsRDD[42] at persist at NativeMethodAccessorImpl.java:-2 []
>         |    CachedPartitions: 200; MemorySize: 54.0 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
>         |    MapPartitionsRDD[41] at persist at NativeMethodAccessorImpl.java:-2 []
>         |    ZippedPartitionsRDD2[40] at persist at NativeMethodAccessorImpl.java:-2 []
>         |    MapPartitionsWithPreparationRDD[30] at persist at NativeMethodAccessorImpl.java:-2 []
>         |    ShuffledRowRDD[29] at persist at NativeMethodAccessorImpl.java:-2 []
>         +-(2) MapPartitionsRDD[28] at persist at NativeMethodAccessorImpl.java:-2 []
>            |  MapPartitionsRDD[27] at persist at NativeMethodAccessorImpl.java:-2 []
>            |  MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:-2 []
>            |  MapPartitionsRDD[3] at map at SerDeUtil.scala:100 []
>            |  MapPartitionsRDD[2] at mapPartitions at SerDeUtil.scala:147 []
>            |  PythonRDD[1] at RDD at PythonRDD.scala:43 []
>            |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423 []
>         |    MapPartitionsWithPreparationRDD[39] at persist at NativeMethodAccessorImpl.java:-2 []
>         |    ShuffledRowRDD[38] at persist at NativeMethodAccessorImpl.java:-2 []
>         +-(200) MapPartitionsRDD[37] at persist at NativeMethodAccessorImpl.java:-2 []
>            |    MapPartitionsRDD[36] at persist at NativeMethodAccessorImpl.java:-2 []
>            |    MapPartitionsWithPreparationRDD[35] at persist at NativeMethodAccessorImpl.java:-2 []
>            |    ShuffledRowRDD[34] at persist at NativeMethodAccessorImpl.java:-2 []
>            +-(2) MapPartitionsRDD[33] at persist at NativeMethodAccessorImpl.java:-2 []
>               |  MapPartitionsWithPreparationRDD[32] at persist at NativeMethodAccessorImpl.java:-2 []
>               |  MapPartitionsRDD[31] at persist at NativeMethodAccessorImpl.java:-2 []
>               |  MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:-2 []
>               |  MapPartitionsRDD[3] at map at SerDeUtil.scala:100 []
>               |  MapPartitionsRDD[2] at mapPartitions at SerDeUtil.scala:147 []
>               |  PythonRDD[1] at RDD at PythonRDD.scala:43 []
>               |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423 []
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-results-differ-based-on-whether-persist-has-been-called-tp25121.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
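
For anyone who wants to verify the fix once 1.5.2 is out (or to try the commit above on a snapshot build), here is the report condensed into one self-contained script. This is just a sketch assembled from peay2's own snippets; it assumes a stock pyspark 1.5.x shell where sqlContext is already defined:

    import pandas as pd
    from pyspark.sql import functions as F

    # Tiny DataFrame of string IDs 0-98 with values A (4x), B (46x), C (49x).
    data_dict = {'id': {k: str(k) for k in range(99)},
                 'value': dict(enumerate(['A'] * 4 + ['B'] * 46 + ['C'] * 49))}
    records = sqlContext.createDataFrame(pd.DataFrame(data_dict))

    # Dummy left outer join against the distinct IDs.
    id_names = records.select("id").dropDuplicates() \
        .select(F.col("id").alias("id_join"), F.lit("xxx").alias("id_name"))
    df_joined = records.join(id_names, records['id'] == id_names['id_join'],
                             "left_outer").drop("id_join")

    def count_ids(df):
        # Filter on value == "C", group by id, and count the groups.
        return df[df["value"] == "C"].groupBy("id").count().count()

    # Both calls should print 49 after the fix; on an affected 1.5.1
    # build, the persisted call prints 46.
    print count_ids(df_joined.unpersist())
    print count_ids(df_joined.persist())

Counting the grouped rows rather than collecting them keeps the check cheap and makes it a one-line pass/fail comparison.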