oops, forgot to reply-all on this thread.
---------- Forwarded message ----------
From: Rick Moritz <rah...@gmail.com>
Date: Wed, Aug 19, 2015 at 2:46 PM
Subject: Re: Strange shuffle behaviour difference between Zeppelin and
Spark-shell
To: Igor Berman <igor.ber...@gmail.com>


Those values are not explicitly set, and attempting to read them results
in 'java.util.NoSuchElementException: spark.shuffle.spill.compress'.
What I mean by the volume per element being larger is illustrated in my
original post: the number of elements is identical in both cases, but the
volume of data required to obtain/manage those elements is many times
greater in spark-shell.
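
A safe way to probe such keys in the REPL, instead of triggering the
exception, is to go through SparkConf's Option-based accessor (a small
sketch, assuming the 1.4 SparkConf API):

    // Returns None when the key was never set explicitly
    sc.getConf.getOption("spark.shuffle.spill.compress")
    // Or fall back to the documented default (true in 1.4)
    sc.getConf.get("spark.shuffle.spill.compress", "true")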

The only known difference used to be FAIR scheduling (Zeppelin) versus
FIFO scheduling (spark-shell). I just verified that running spark-shell
with FAIR scheduling makes no difference. The only other difference in
the environment lies in some classpath variables, which should only
affect which classes are available, not how they behave.
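
For the record, FAIR mode can be forced onto spark-shell along these
lines (spark.scheduler.mode is the relevant property):

    spark-shell --conf spark.scheduler.mode=FAIR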

Another fact to note: the Spark assembly (1.4.0-rc4) was built with
provided Hadoop dependencies (build/mvn -Pyarn -Phadoop-2.6
-Dhadoop.version=2.6.0 -Phadoop-provided -Phive -Phive-thriftserver
-Psparkr -DskipTests clean package) against 2.6.0 from Hortonworks, while
Zeppelin was built with dependencies against 2.6.0 from Maven Central.
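
A quick way to confirm which Hadoop build each REPL actually resolves at
runtime should be to query VersionInfo in both shells (a sketch;
org.apache.hadoop.util.VersionInfo is the standard Hadoop API for this):

    // HDP builds typically report something like "2.6.0.2.2.x.x-xxxx",
    // vanilla Apache builds plain "2.6.0"
    org.apache.hadoop.util.VersionInfo.getVersion()
    // Build details can differ even when the version strings match
    org.apache.hadoop.util.VersionInfo.getBuildVersion()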

On Wed, Aug 19, 2015 at 2:08 PM, Igor Berman <igor.ber...@gmail.com> wrote:

> so what's your case for version differences?
> what do you mean by "in spark-shell the volume per element is much
> larger"?
> can you verify that the configuration in the spark ui (under the
> Environment tab) is the same?
> if you suspect compression, then check the following properties:
> spark.shuffle.compress
> spark.shuffle.spill.compress
> spark.io.compression.codec
> spark.rdd.compress
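>
> e.g. pin them explicitly on both sides so the environments are forced
> identical (the values below are what I believe to be the 1.4 defaults,
> assuming snappy as the default codec):
>
> spark-shell \
>   --conf spark.shuffle.compress=true \
>   --conf spark.shuffle.spill.compress=true \
>   --conf spark.io.compression.codec=snappy \
>   --conf spark.rdd.compress=false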
>
>
>
> On 19 August 2015 at 15:03, Rick Moritz <rah...@gmail.com> wrote:
>
>> Number of partitions and even sizes look relatively similar - except
>> that in spark-shell the volume per element is much larger, especially
>> in later stages. That's when shuffles start to spill. Zeppelin creates
>> almost no spills at all. The number of elements per partition is the
>> same for both setups, but with very different data volumes in/out.
>> Almost as though compression were used in one case and not in the
>> other, or as though shuffling were somehow less specific and more nodes
>> received data that they ultimately don't process at all. The same
>> shuffling algorithm appears to be at work in each case, if the
>> partitioning of the number of elements is anything to go by.
>>
>> On Wed, Aug 19, 2015 at 1:58 PM, Igor Berman <igor.ber...@gmail.com>
>> wrote:
>>
>>> i would compare the spark ui metrics for both cases and look for any
>>> differences (number of partitions, number of spills, etc.)
>>> why can't you make the repl consistent with the zeppelin spark version?
>>> might be the rc has issues...
>>>
>>>
>>>
>>>
>>> On 19 August 2015 at 14:42, Rick Moritz <rah...@gmail.com> wrote:
>>>
>>>> No, the setup is one driver with 32g of memory and three executors,
>>>> each with 8g of memory, in both cases. No core count has been
>>>> specified, so it should default to a single core (though I've seen
>>>> the YARN-owned JVMs wrapping the executors take up to 3 cores in
>>>> top). That is, unless, as I suggested, there are different defaults
>>>> for the two means of job submission that come into play in a
>>>> non-transparent fashion (i.e. not visible in SparkConf).
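>>>>
>>>> To take such hidden defaults out of the equation, I could pin
>>>> everything explicitly at launch, along these lines (on YARN):
>>>>
>>>> spark-shell --master yarn-client \
>>>>   --driver-memory 32g \
>>>>   --num-executors 3 \
>>>>   --executor-memory 8g \
>>>>   --executor-cores 1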
>>>>
>>>> On Wed, Aug 19, 2015 at 1:36 PM, Igor Berman <igor.ber...@gmail.com>
>>>> wrote:
>>>>
>>>>> any differences in number of cores, memory settings for executors?
>>>>>
>>>>>
>>>>> On 19 August 2015 at 09:49, Rick Moritz <rah...@gmail.com> wrote:
>>>>>
>>>>>> Dear list,
>>>>>>
>>>>>> I am observing a very strange difference in behaviour between a
>>>>>> Spark 1.4.0-rc4 REPL (locally compiled with Java 7) and a Spark
>>>>>> 1.4.0 Zeppelin interpreter (compiled with Java 6 and sourced from
>>>>>> Maven Central).
>>>>>>
>>>>>> The workflow loads data from Hive, applies a number of
>>>>>> transformations (including quite a lot of shuffle operations) and
>>>>>> then presents an enriched dataset. The code (and resulting DAGs) is
>>>>>> identical in each case.
>>>>>>
>>>>>> The following particularities are noted:
>>>>>> Importing the HiveRDD and caching it yields identical results on
>>>>>> both platforms.
>>>>>> Applying case classes leads to a 2-2.5 MB increase in dataset size
>>>>>> per partition (except for empty partitions).
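>>>>>>
>>>>>> Schematically, that part of the workflow looks like the following
>>>>>> (table and field names are placeholders, not the real ones):
>>>>>>
>>>>>>   case class Record(id: Long, value: String)  // placeholder schema
>>>>>>   val hc = new org.apache.spark.sql.hive.HiveContext(sc)
>>>>>>   val raw = hc.sql("SELECT id, value FROM some_table").rdd.cache()
>>>>>>   val typed = raw.map(r => Record(r.getLong(0), r.getString(1)))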
>>>>>>
>>>>>> Writing shuffles shows a much more significant difference:
>>>>>>
>>>>>> Zeppelin:
>>>>>> Total Time Across All Tasks: 2.6 min
>>>>>> Input Size / Records: 2.4 GB / 7314771
>>>>>> Shuffle Write: 673.5 MB / 7314771
>>>>>>
>>>>>> vs
>>>>>>
>>>>>> Spark-shell:
>>>>>> Total Time Across All Tasks: 28 min
>>>>>> Input Size / Records: 3.6 GB / 7314771
>>>>>> Shuffle Write: 9.0 GB / 7314771
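>>>>>>
>>>>>> Per record, that is roughly 9.0 GB / 7314771 ~ 1.3 KB of shuffle
>>>>>> write in spark-shell versus 673.5 MB / 7314771 ~ 95 B in Zeppelin, a
>>>>>> factor of about 13-14 -- which would be plausible if compression
>>>>>> were active in one case only.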
>>>>>>
>>>>>> This is one of the early stages, which reads from a cached partition
>>>>>> and then feeds into a join stage. The later stages show similar
>>>>>> behaviour in producing excessive shuffle spills.
>>>>>>
>>>>>> Quite often the excessive shuffle volume will lead to massive shuffle
>>>>>> spills which ultimately kill not only performance, but the actual 
>>>>>> executors
>>>>>> as well.
>>>>>>
>>>>>> I have examined the Environment tab in the Spark UI and identified
>>>>>> no notable difference besides FAIR (Zeppelin) vs FIFO (spark-shell)
>>>>>> scheduling mode. I fail to see how this could impact shuffle writes
>>>>>> in such a drastic way, since it should operate at the inter-job
>>>>>> level, while this happens at the inter-stage level.
>>>>>>
>>>>>> I was somewhat suspicious of maybe compression or serialization
>>>>>> playing a role, but the SparkConf points to those being set to the
>>>>>> defaults. Also, Zeppelin's interpreter adds no relevant additional
>>>>>> default parameters.
>>>>>> I performed a diff between rc4 (which was later released) and 1.4.0
>>>>>> and, as expected, there were no differences besides a single class
>>>>>> (remarkably, a shuffle-relevant one:
>>>>>> /org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.class)
>>>>>> differing in its binary representation due to being compiled with
>>>>>> Java 7 instead of Java 6. The decompiled sources of the two are
>>>>>> again identical.
>>>>>>
>>>>>> I may attempt, as a next step, to simply replace that file in the
>>>>>> packaged jar, to ascertain that there is indeed no difference
>>>>>> between the two versions, but I would consider it a major bug if a
>>>>>> simple compiler change led to this kind of issue.
>>>>>>
>>>>>> I am also open to any other ideas, in particular for verifying that
>>>>>> the same compression/serialization is indeed happening, and for ways
>>>>>> to determine what exactly is written into these shuffles --
>>>>>> currently I only know that the tuples are bigger (or smaller) than
>>>>>> they ought to be. The Zeppelin-obtained results do appear to be
>>>>>> consistent at least, so the suspicion is that there is an issue with
>>>>>> the process launched from spark-shell. I will also attempt to build
>>>>>> a Spark job and spark-submit it using different Spark binaries to
>>>>>> further explore the issue.
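>>>>>>
>>>>>> One idea I may try for sizing the tuples: serialize a sampled
>>>>>> element with whatever serializer the conf resolves to and count the
>>>>>> bytes (a sketch, assuming the default JavaSerializer and the
>>>>>> placeholder RDD from the sketch above):
>>>>>>
>>>>>>   import org.apache.spark.serializer.JavaSerializer
>>>>>>   val ser = new JavaSerializer(sc.getConf).newInstance()
>>>>>>   val sample = typed.take(1).head           // one element of the RDD
>>>>>>   println(ser.serialize(sample).limit())    // serialized size, bytes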
>>>>>>
>>>>>> Best Regards,
>>>>>>
>>>>>> Rick Moritz
>>>>>>
>>>>>> PS: I already tried to send this mail yesterday, but it never made it
>>>>>> onto the list, as far as I can tell -- I apologize should anyone receive
>>>>>> this as a second copy.
>>>>>>