oops, forgot to reply-all on this thread.

---------- Forwarded message ----------
From: Rick Moritz <rah...@gmail.com>
Date: Wed, Aug 19, 2015 at 2:46 PM
Subject: Re: Strange shuffle behaviour difference between Zeppelin and Spark-shell
To: Igor Berman <igor.ber...@gmail.com>
Those values are not explicitly set, and attempting to read them results in 'java.util.NoSuchElementException: spark.shuffle.spill.compress'.

What I mean by the volume per element being larger is illustrated in my original post: in each case the number of elements is identical, but the volume of data required to obtain/manage these elements is many times greater.

The only difference used to be that Zeppelin had FAIR scheduling versus FIFO scheduling for spark-shell. I just verified that spark-shell with FAIR scheduling makes no difference. The only other difference in the environment lies in some class-path variables, which should only affect method availability, not actual usage.

Another fact to note: the Spark assembly (1.4.0-rc4) was built with provided Hadoop dependencies (build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Phadoop-provided -Phive -Phive-thriftserver -Psparkr -DskipTests clean package) for 2.6.0 from Hortonworks, while Zeppelin was built with dependencies against 2.6.0 from Maven Central.

On Wed, Aug 19, 2015 at 2:08 PM, Igor Berman <igor.ber...@gmail.com> wrote:

> so what's your case for version differences?
> what do you mean by "in spark-shell the volume per element is much larger"?
> can you verify that the configuration in the Spark UI (under the
> Environment tab) is the same?
> if you suspect compression, then check the following properties:
> spark.shuffle.compress
> spark.shuffle.spill.compress
> spark.io.compression.codec
> spark.rdd.compress
>
> On 19 August 2015 at 15:03, Rick Moritz <rah...@gmail.com> wrote:
>
>> Number of partitions and even sizes look relatively similar, except in
>> spark-shell the volume per element is much larger, especially in later
>> stages. That's when shuffles start to spill. Zeppelin creates almost no
>> spills at all. The number of elements per partition is the same for both
>> setups, but with very different data volume in/out.
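As a side note, the four properties above can also be checked against the deployed defaults file, outside the REPL. A minimal sketch; the config path and the `SPARK_CONF_DIR` fallback are assumptions for a typical install (e.g. /etc/spark/conf on HDP), not something confirmed in this thread:

```shell
# Report whether any of the suspect compression settings are set explicitly
# in spark-defaults.conf; if not, the built-in Spark default applies.
CONF="${SPARK_CONF_DIR:-/etc/spark/conf}/spark-defaults.conf"
for key in spark.shuffle.compress spark.shuffle.spill.compress \
           spark.io.compression.codec spark.rdd.compress; do
  if [ -f "$CONF" ] && grep -q "^$key" "$CONF"; then
    grep "^$key" "$CONF"
  else
    echo "$key: not set (Spark default applies)"
  fi
done
```

If all four report "not set" on both launch paths, an explicit config difference can be ruled out and only built-in defaults remain in play.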
>> Almost as though compression was used in one case and not in the other,
>> or as though shuffling is somehow less specific, and more nodes get data
>> that they ultimately don't process at all. The same shuffling algorithm
>> appears to be at work in each case, if the partitioning of the number of
>> elements is anything to go by.
>>
>> On Wed, Aug 19, 2015 at 1:58 PM, Igor Berman <igor.ber...@gmail.com> wrote:
>>
>>> i would compare spark ui metrics for both cases and see any
>>> differences (number of partitions, number of spills, etc.)
>>> why can't you make the repl consistent with the zeppelin spark version?
>>> the rc might have issues...
>>>
>>> On 19 August 2015 at 14:42, Rick Moritz <rah...@gmail.com> wrote:
>>>
>>>> No, the setup is one driver with 32g of memory and three executors,
>>>> each with 8g of memory, in both cases. No core count has been
>>>> specified, so it should default to single-core (though I've seen the
>>>> YARN-owned JVMs wrapping the executors take up to 3 cores in top).
>>>> That is, unless, as I suggested, there are different defaults for the
>>>> two means of job submission that come into play in a non-transparent
>>>> fashion (i.e. not visible in SparkConf).
>>>>
>>>> On Wed, Aug 19, 2015 at 1:36 PM, Igor Berman <igor.ber...@gmail.com> wrote:
>>>>
>>>>> any differences in number of cores or memory settings for executors?
>>>>>
>>>>> On 19 August 2015 at 09:49, Rick Moritz <rah...@gmail.com> wrote:
>>>>>
>>>>>> Dear list,
>>>>>>
>>>>>> I am observing a very strange difference in behaviour between a
>>>>>> Spark 1.4.0-rc4 REPL (locally compiled with Java 7) and a Spark
>>>>>> 1.4.0 Zeppelin interpreter (compiled with Java 6 and sourced from
>>>>>> Maven Central).
>>>>>>
>>>>>> The workflow loads data from Hive, applies a number of
>>>>>> transformations (including quite a lot of shuffle operations) and
>>>>>> then presents an enriched dataset. The code (and resulting DAGs)
>>>>>> are identical in each case.
>>>>>> The following particularities are noted:
>>>>>> Importing the HiveRDD and caching it yields identical results on
>>>>>> both platforms.
>>>>>> Applying case classes leads to a 2-2.5 MB increase in dataset size
>>>>>> per partition (excepting empty partitions).
>>>>>>
>>>>>> Writing shuffles shows a much more significant difference:
>>>>>>
>>>>>> Zeppelin:
>>>>>> *Total Time Across All Tasks:* 2.6 min
>>>>>> *Input Size / Records:* 2.4 GB / 7314771
>>>>>> *Shuffle Write:* 673.5 MB / 7314771
>>>>>>
>>>>>> vs.
>>>>>>
>>>>>> Spark-shell:
>>>>>> *Total Time Across All Tasks:* 28 min
>>>>>> *Input Size / Records:* 3.6 GB / 7314771
>>>>>> *Shuffle Write:* 9.0 GB / 7314771
>>>>>>
>>>>>> This is one of the early stages, which reads from a cached partition
>>>>>> and then feeds into a join stage. The later stages show similar
>>>>>> behaviour in producing excessive shuffle spills.
>>>>>>
>>>>>> Quite often the excessive shuffle volume will lead to massive
>>>>>> shuffle spills which ultimately kill not only performance, but the
>>>>>> actual executors as well.
>>>>>>
>>>>>> I have examined the Environment tab in the Spark UI and identified
>>>>>> no notable difference besides FAIR (Zeppelin) vs FIFO (spark-shell)
>>>>>> scheduling mode. I fail to see how this would impact shuffle writes
>>>>>> in such a drastic way, since scheduling should act at the inter-job
>>>>>> level, while this happens at the inter-stage level.
>>>>>>
>>>>>> I was somewhat suspicious of compression or serialization playing a
>>>>>> role, but the SparkConf points to those being set to the defaults.
>>>>>> Also, Zeppelin's interpreter adds no relevant additional default
>>>>>> parameters.
>>>>>> I performed a diff between rc4 (which was later released) and
>>>>>> 1.4.0, and as expected there were no differences, besides a single
>>>>>> class (remarkably, a shuffle-relevant one:
>>>>>> /org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.class)
>>>>>> differing in its binary representation due to being compiled with
>>>>>> Java 7 instead of Java 6. The decompiled sources of the two are
>>>>>> again identical.
>>>>>>
>>>>>> As a next step, I may simply replace that file in the packaged jar,
>>>>>> to ascertain that there is indeed no difference between the two
>>>>>> versions, but I would consider it a major bug if a simple compiler
>>>>>> change led to this kind of issue.
>>>>>>
>>>>>> I am also open to any other ideas, in particular ways to verify
>>>>>> that the same compression/serialization is indeed happening, and
>>>>>> ways to determine what exactly is written into these shuffles;
>>>>>> currently I only know that the tuples are bigger (or smaller) than
>>>>>> they ought to be. The Zeppelin-obtained results do at least appear
>>>>>> to be consistent, so the suspicion is that there is an issue with
>>>>>> the process launched from spark-shell. I will also attempt to build
>>>>>> a Spark job and spark-submit it using different Spark binaries to
>>>>>> further explore the issue.
>>>>>>
>>>>>> Best Regards,
>>>>>>
>>>>>> Rick Moritz
>>>>>>
>>>>>> PS: I already tried to send this mail yesterday, but it never made
>>>>>> it onto the list as far as I can tell; I apologize should anyone
>>>>>> receive this as a second copy.