Re: Java : Testing RDD aggregateByKey

2021-08-21 Thread Jacek Laskowski
Hi Pedro,

> Anyway, maybe the behavior is weird; I would expect repartitioning to
> zero either to be disallowed or at least to trigger a warning, instead of
> just discarding all the data.

Interesting...

scala> spark.version
res3: String = 3.1.2

scala> spark.range(5).repartition(0)
java.lang.IllegalArgumentException: requirement failed: Number of
partitions (0) must be positive.
  at scala.Predef$.require(Predef.scala:281)
  at
org.apache.spark.sql.catalyst.plans.logical.Repartition.<init>(basicLogicalOperators.scala:1032)
  at org.apache.spark.sql.Dataset.repartition(Dataset.scala:3016)
  ... 47 elided

How are the above different from yours?
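
Meanwhile, the RDD-side variant seems to slip through silently. A quick sketch
of what I'd expect (my guess, not verified): HashPartitioner(0) simply yields
an RDD with zero partitions, so there is nothing to collect.

scala> val rdd = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)), 1)
scala> rdd.aggregateByKey(0, 0)(_ + _, _ + _).collect()
// Array() -- zero output partitions, so no tasks run and nothing is returned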

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books 
Follow me on https://twitter.com/jaceklaskowski




On Thu, Aug 19, 2021 at 5:43 PM Pedro Tuero  wrote:

> Hi, I'm sorry, the problem was really silly: in the test the number of
> partitions was zero (it was computed as a division of the original number of
> partitions of the source RDD, and in the test that number was just one), and
> that's why the test was failing.
> Anyway, maybe the behavior is weird; I would expect repartitioning to
> zero either to be disallowed or at least to trigger a warning, instead of
> just discarding all the data.
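>
> In spark-shell terms (not my actual Java test, just to show how the zero
> crept in; the divisor here is made up):
>
> val source = sc.parallelize(Seq(("a", 1), ("b", 2)), 1)  // test source has a single partition
> val numPartitions = source.getNumPartitions / 2          // integer division: 1 / 2 == 0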
>
> Thanks for your time!
> Regards,
> Pedro
>
> On Thu, Aug 19, 2021 at 07:42, Jacek Laskowski (ja...@japila.pl)
> wrote:
>
>> Hi Pedro,
>>
>> No idea what might be causing it. Do you perhaps have some code to
>> reproduce it locally?
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://about.me/JacekLaskowski
>> "The Internals Of" Online Books 
>> Follow me on https://twitter.com/jaceklaskowski
>>
>> 
>>
>>
>> On Tue, Aug 17, 2021 at 4:14 PM Pedro Tuero  wrote:
>>
>>>
>>> Context: spark-core_2.12-3.1.1
>>> Testing with maven and eclipse.
>>>
>>> I'm modifying a project and a test stops working as expected.
>>> The difference is in the parameters passed to the function
>>> aggregateByKey of JavaPairRDD.
>>>
>>> JavaSparkContext is created this way:
>>> new JavaSparkContext(new SparkConf()
>>> .setMaster("local[1]")
>>> .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"));
>>> Then I construct a JavaPairRDD using sparkContext.parallelizePairs and
>>> call a method that performs an aggregateByKey over the input JavaPairRDD and
>>> test that the result is as expected.
>>>
>>> When I use JavaPairRDD line 369 (calling .aggregateByKey(zeroValue,
>>> combiner, merger)):
>>> def aggregateByKey[U](zeroValue: U, seqFunc: JFunction2[U, V, U],
>>>     combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = {
>>>   implicit val ctag: ClassTag[U] = fakeClassTag
>>>   fromRDD(rdd.aggregateByKey(zeroValue)(seqFunc, combFunc))
>>> }
>>> The test works as expected.
>>> But when I use JavaPairRDD line 355 (calling .aggregateByKey(zeroValue,
>>> *partitions*, combiner, merger)):
>>> def aggregateByKey[U](zeroValue: U, *numPartitions: Int,* seqFunc: JFunction2[U, V, U],
>>>     combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = {
>>>   implicit val ctag: ClassTag[U] = fakeClassTag
>>>   fromRDD(rdd.aggregateByKey(zeroValue, *numPartitions*)(seqFunc, combFunc))
>>> }
>>> The result is always empty. It looks like there is a problem with the
>>> HashPartitioner created at PairRDDFunctions:
>>> def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U,
>>>     combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
>>>   aggregateByKey(zeroValue, *new HashPartitioner(numPartitions)*)(seqOp, combOp)
>>> }
>>> vs:
>>> def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
>>>     combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
>>>   aggregateByKey(zeroValue, *defaultPartitioner*(self))(seqOp, combOp)
>>> }
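>>> To make the comparison concrete, here is a spark-shell sketch with toy data
>>> (not my real Java test):
>>> val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
>>> pairs.aggregateByKey(0)(_ + _, _ + _).getNumPartitions     // uses defaultPartitioner(self)
>>> pairs.aggregateByKey(0, 4)(_ + _, _ + _).getNumPartitions  // uses new HashPartitioner(4)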
>>> I can't debug it properly with Eclipse; the error occurs while the threads
>>> are inside Spark code (the system editor can only open file-based resources).
>>>
>>> Does anyone know how to resolve this issue?
>>>
>>> Thanks in advance,
>>> Pedro.
>>>
>>>
>>>
>>>


Re: Is memory-only no-disk Spark possible?

2021-08-21 Thread Jacek Laskowski
Hi Bobby,

What a great summary of what happens behind the scenes! Enjoyed every
sentence!

"The default shuffle implementation will always write out to disk." <--
that's what I wasn't sure about the most. Thanks again!
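
One idea I want to try next (my own guess, not something from your write-up):
point spark.local.dir at a tmpfs mount so even the shuffle and spill files
never leave RAM. A minimal sketch, assuming /dev/shm is mounted and large
enough:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.local.dir", "/dev/shm/spark-scratch")  // shuffle + spill files land on a RAM-backed fs
  .getOrCreate()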

/me On digging deeper...

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books 
Follow me on https://twitter.com/jaceklaskowski




On Fri, Aug 20, 2021 at 4:27 PM Bobby Evans  wrote:

> On the data path, Spark will write to a local disk when it runs out of
> memory and needs to spill or when doing a shuffle with the default shuffle
> implementation.  The spilling is a good thing because it lets you process
> data that is too large to fit in memory.  It is not great because the
> processing slows down a lot when that happens, but slow is better than
> crashing in many cases. The default shuffle implementation will
> always write out to disk.  This again is good in that it allows you to
> process more data on a single box than can fit in memory. It is bad when
> the shuffle data could fit in memory, but ends up being written to disk
> anyways.  On Linux the data is being written into the page cache and will
> be flushed to disk in the background when memory is needed or after a set
> amount of time. If your query is fast and is shuffling little data, then it
> is likely that your query is running all in memory.  All of the shuffle
> reads and writes are probably going directly to the page cache and the disk
> is not involved at all. If you really want to, you can configure the page
> cache to not flush to disk until absolutely necessary. That should get
> you really close to pure in-memory processing, so long as you have enough
> free memory on the host to support it.
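>
> (Purely to illustrate the knobs involved rather than to recommend values:
> giving Spark's unified memory region more room makes spilling kick in later,
> e.g.
>
> import org.apache.spark.SparkConf
> val conf = new SparkConf()
>   .set("spark.executor.memory", "16g")    // more heap per executor
>   .set("spark.memory.fraction", "0.8")    // bigger execution+storage region; the default is 0.6
>
> but the default shuffle still writes its map output files under
> spark.local.dir regardless.)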
>
> Bobby
>
>
>
> On Fri, Aug 20, 2021 at 7:57 AM Mich Talebzadeh 
> wrote:
>
>> Well, I don't know what having an "in-memory only" Spark is going to
>> achieve. The Spark GUI shows the amount of disk usage pretty well. By
>> default, memory is used exclusively first.
>>
>> Spark is no different from any other predominantly in-memory application.
>> Effectively it does the classic disk-based Hadoop map-reduce operation "in
>> memory" to speed up the processing, but it is still an application on top of
>> the OS. So, like most applications, Spark, the running code and the OS(s)
>> will reach a state where disk usage is needed.
>>
>> This is akin to swap space on the OS itself, and I quote: "Swap space is used when
>> your operating system decides that it needs physical memory for active
>> processes and the amount of available (unused) physical memory is
>> insufficient. When this happens, inactive pages from the physical memory
>> are then moved into the swap space, freeing up that physical memory for
>> other uses"
>>
>>  free
>>                total        used        free      shared  buff/cache   available
>> Mem:       65659732    30116700     1429436     2341772    34113596    32665372
>> Swap:     104857596      550912   104306684
>>
>> HTH
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Fri, 20 Aug 2021 at 12:50, Jacek Laskowski  wrote:
>>
>>> Hi,
>>>
>>> I've been exploring BlockManager and the stores for a while now and am
>>> tempted to say that a memory-only Spark setup would be possible (except
>>> shuffle blocks). Is this correct?
>>>
>>> What about shuffle blocks? Do they have to be stored on disk (in
>>> DiskStore)?
>>>
>>> I think broadcast variables are in-memory first, so unless an on-disk
>>> storage level is explicitly used (by Spark devs), there's no reason not to
>>> have Spark in-memory only.
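>>>
>>> (By "in-memory only" I mean e.g. explicit persists kept at a memory-only
>>> level; a trivial sketch just to pin down the terminology:
>>>
>>> import org.apache.spark.storage.StorageLevel
>>> val cached = sc.parallelize(1 to 100).persist(StorageLevel.MEMORY_ONLY)
>>>
>>> The open question is only about the shuffle and spill paths that Spark
>>> manages on its own.)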
>>>
>>> (I was told that one of the differences between Trino/Presto and Spark
>>> SQL is that Trino keeps all processing in memory only and will blow up when
>>> it runs out, while Spark uses disk to avoid OOMEs.)
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> 
>>> https://about.me/JacekLaskowski
>>> "The Internals Of" Online Books 
>>> Follow me on https://twitter.com/jaceklaskowski
>>>
>>> 
>>>
>>