Re: Spark on Kubernetes scheduler variety

2021-06-23 Thread Klaus Ma
Hi team,

I'm the kube-batch/Volcano founder, and I'm excited to hear that the Spark
community also has such requirements :)

Volcano provides several features for batch workloads, e.g. fair-share,
queues, reservation, preemption/reclaim and so on.
It has been used in several production environments with Spark; if necessary,
I can give an overall introduction to Volcano's features and those use
cases :)

-- Klaus

On Wed, Jun 23, 2021 at 11:26 PM Mich Talebzadeh 
wrote:

>
>
> Please allow me to be diverse and express a different point of view on
> this roadmap.
>
>
> I believe that, from a technical point of view, spending time, effort and
> talent on batch scheduling on Kubernetes could be rewarding. However, if I
> may say so, I doubt whether such an approach and the so-called democratization
> of Spark on whatever platform really should be the main focus.
>
> Having worked on Google Dataproc (a fully
> managed and highly scalable service for running Apache Spark, Hadoop and,
> more recently, other artefacts) for the past two years, and on Spark on
> Kubernetes on-premise, I have come to the conclusion that Spark is not a
> beast that one can fully commoditize, much like one can do with
> Zookeeper, Kafka etc. There is always a struggle to make some niche areas
> of Spark, like Spark Structured Streaming (SSS), work seamlessly and
> effortlessly on these commercial whatever-as-a-Service platforms.
>
>
> Moreover, Spark (and I stand corrected) already has a lot of resiliency and
> redundancy built in from the ground up. It is truly an enterprise-class
> product (requiring enterprise-class support) that will be difficult to
> commoditize with Kubernetes while expecting the same performance. After all,
> Kubernetes is aimed at efficient resource sharing and potential cost savings
> for the mass market. In short, I can see commercial enterprises working on
> these platforms, but maybe the great talents on the dev team should focus on
> things like the perceived limitation of SSS in dealing with chained
> aggregations (if I am correct, not yet supported on streaming datasets).
>
>
> These are my opinions and they are not facts, just opinions so to speak :)
>
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 18 Jun 2021 at 23:18, Holden Karau  wrote:
>
>> I think these approaches are good, but there are limitations (eg dynamic
>> scaling) without us making changes inside of the Spark Kube scheduler.
>>
>> Certainly whichever scheduler extensions we add support for we should
>> collaborate with the people developing those extensions insofar as they are
>> interested. The first place I checked was #sig-scheduling, which is
>> fairly quiet on the Kubernetes Slack, but if there are more places to look
>> for folks interested in batch scheduling on Kubernetes we should definitely
>> give it a shot :)
>>
>> On Fri, Jun 18, 2021 at 1:41 AM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Regarding your point and I quote
>>>
>>> "..  I know that one of the Spark on Kube operators
>>> supports volcano/kube-batch so I was thinking that might be a place I would
>>> start exploring..."
>>>
>>> There seems to be ongoing work on, say, Volcano as part of the Cloud Native
>>> Computing Foundation (CNCF), for example through
>>> https://github.com/volcano-sh/volcano
>>>
>> 
>>>
>>> There may be value-add in collaborating with such groups through CNCF in
>>> order to have a collective approach to such work. There also seems to be
>>> some work on Integration of Spark with Volcano for Batch Scheduling.
>>> 
>>>
>>>
>>>
>>> What is not very clear is the degree of progress of these projects. You
>>> may be kind enough to elaborate on the KPIs for each of these projects and where
>>> you think your contribution is going to be.
>>>
>>>
>>> HTH,
>>>
>>>
>>> Mich
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Fri, 18 Jun 2021 at 00:44, Holden Karau  wrote:
>>>
 Hi Folks,

 I'm continuing 

Re: Performance Problems Migrating to S3A Committers

2021-06-23 Thread Artemis User
Thanks Johnny for sharing your experience.  Have you tried using the S3A 
committers?  It looks like they were introduced in the latest Hadoop releases to 
solve problems with the other committers.


https://hadoop.apache.org/docs/r3.1.1/hadoop-aws/tools/hadoop-aws/committers.html

- ND

On 6/22/21 6:41 PM, Johnny Burns wrote:

Hello.

I’m Johnny, I work at Stripe. We’re heavy Spark users and we’ve been 
exploring using S3 committers. Currently we first write the data to 
HDFS and then upload it to S3. However, now that S3 offers strong 
consistency guarantees, we are evaluating whether we can write data 
directly to S3.


We’re having some trouble with performance, so we’re hoping someone might 
have some guidance that can unblock this.



File Format

We are using Parquet as the file format. We do have Iceberg tables as 
well, and they are indeed able to commit directly to S3 (with minimal 
local disk usage). We can’t migrate all of our jobs to Iceberg right 
now. Hence, we are looking for a committer that is performant and can 
directly write Parquet files to S3 (with minimal local disk usage).



What have we tried?

We’ve tried using both the “magic” and “directory” committers. We're 
setting the following configs (in addition to setting 
fs.s3a.committer.name to “magic”/“directory”).


"spark.hadoop.fs.s3a.committer.magic.enabled":"true",


"spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a":"org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory",

"spark.sql.sources.commitProtocolClass":"org.apache.spark.internal.io.cloud.PathOutputCommitProtocol",

"spark.sql.parquet.output.committer.class":"org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",


Both committers have shown performance regressions on large jobs. 
We’re currently focused on trying to make the directory committer work 
because we’ve seen fewer slowdowns with that one, but I’ll describe 
the problems with each.


We’ve been testing the committers on a large job with 100k 
tasks (creating 7.3TB of output).



Observations for magic committer


Using the magic committer, we see slowdowns in two places:

  * S3 Writing (inside the task)

      * The slowdown seems to occur just after the S3 multipart write. The
        finishedWrite function tries to do some cleanup and kicks off the
        deleteUnnecessaryFakeDirectories function.

      * This causes 503s due to hitting AWS rate limits on
        com.amazonaws.services.s3.model.DeleteObjectsRequest (a possible
        mitigation is sketched just after this list).

      * I'm not sure what directories are actually getting cleaned up here
        (I assume the _magic directories are still needed up until the job
        commit).

  * Job Commit

      * Have not dug down into the details here, but assume it is
        something similar to what we’re seeing in the directory committer
        case below.
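
One mitigation we are considering (an assumption on our side, not something 
we have verified at this scale) is relaxing the S3A client-side throttling 
retry settings so the cleanup deletes back off and retry rather than 
surfacing 503s, e.g.:

"spark.hadoop.fs.s3a.retry.throttle.limit":"50",

"spark.hadoop.fs.s3a.retry.throttle.interval":"1000ms",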


Observations for directory committer


We’ve observed that the “directory” S3 committer’s performance is on par 
with our existing HDFS commit for task execution and task commit. The 
slowdowns we’re seeing are in the job commit phase.


The job commit happens almost instantaneously in the HDFS case, vs 
taking about an hour for the s3 directory committer.


We’ve enabled DEBUG logging for the S3 committer. It seems like that 
hour is mostly spent doing things which you would 
expect (completing 100k delayedComplete S3 uploads). I've attached an 
example of some of the logs we see repeated over and over during the 
one-hour job commit (I redacted some of the directories and SHAs, but the 
logs are otherwise unchanged).
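
If the job-commit bottleneck is simply the number of parallel completion 
calls, one knob that might help (again an assumption on our end, not 
something we have tested yet) is the committer's thread pool size, which 
defaults to a fairly small value:

"spark.hadoop.fs.s3a.committer.threads":"32",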


One thing I notice is that we see object_delete_requests += 1 in the 
logs. I’m not sure if that means it’s doing an S3 delete, or if it is 
deleting the HDFS manifest files (to clean up the task).



Alternatives - Should we check out directCommitter?

We’ve also considered using the directCommitter. We understand that 
the directCommitter is discouraged because it does not support 
speculative execution (and because of some failure cases). Given that we do not 
use speculative execution at Stripe, would the directCommitter be a 
viable option for us? What are the failure scenarios to consider?



Alternatives - Can S3FileIO work well with parquet files?


Netflix has a tool called S3FileIO. We’re wondering if it can 
be used with Spark, or only with Iceberg.


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org




Re: Distributing a FlatMap across a Spark Cluster

2021-06-23 Thread Tom Barber
Looks like repartitioning was my friend; the work seems to be distributed across the
cluster now.

All good. Thanks!


On Wed, Jun 23, 2021 at 2:18 PM Tom Barber  wrote:

> Okay so I tried another idea which was to use a real simple class to drive
> a mapPartitions... because logic in my head seems to suggest that I want to
> map my partitions...
>
> @SerialVersionUID(100L)
> class RunCrawl extends Serializable{
>   def mapCrawl(x: Iterator[(String, Iterable[Resource])], job:
> SparklerJob): Iterator[CrawlData] = {
> val m = 1000
> x.flatMap({case (grp, rs) => new FairFetcher(job, rs.iterator, m,
>   FetchFunction, ParseFunction, OutLinkFilterFunction,
> StatusUpdateSolrTransformer)})
>   }
>
>   def runCrawl(f: RDD[(String, Iterable[Resource])], job: SparklerJob):
> RDD[CrawlData] = {
> f.mapPartitions( x => mapCrawl(x, job))
>
>   }
>
> }
>
> That is what it looks like. But the task execution window in the cluster
> looks the same:
>
> https://pasteboard.co/K7WrBnV.png
>
> 1 task on a single node.
>
> I feel like I'm missing something obvious here about either
>
> a) how Spark works
> b) how it divides up partitions to tasks
> c) the fact it's a POJO and not a file of stuff.
>
> Or probably some of all 3.
>
> Tom
>
> On Wed, Jun 23, 2021 at 11:44 AM Tom Barber  wrote:
>
>> (I should point out that I'm diagnosing this by looking at the active
>> tasks https://pasteboard.co/K7VryDJ.png, if I'm reading it incorrectly,
>> let me know)
>>
>> On Wed, Jun 23, 2021 at 11:38 AM Tom Barber  wrote:
>>
>>> Uff hello fine people.
>>>
>>> So the cause of the above issue was, unsurprisingly, human error. I
>>> found a local[*] spark master config which gazumped my own one so
>>> mystery solved. But I have another question, that is still the crux of this
>>> problem:
>>>
>>> Here's a bit of trimmed code that I'm currently testing with. I
>>> deliberately stuck in a repartition(50), just to force it to do what I
>>> believed would chunk it up and distribute it. Which is all good.
>>>
>>> override def run(): Unit = {
>>> ...
>>>
>>> val rdd = new MemexCrawlDbRDD(sc, job, maxGroups = topG, topN = topN)
>>> val f = rdd.map(r => (r.getGroup, r))
>>>   .groupByKey().repartition(50);
>>>
>>> val c = f.getNumPartitions
>>>   val fetchedRdd = f.flatMap({ case (grp, rs) => new FairFetcher(job, 
>>> rs.iterator, localFetchDelay,
>>> FetchFunction, ParseFunction, OutLinkFilterFunction, 
>>> StatusUpdateSolrTransformer) })
>>>   .persist()
>>>
>>> val d = fetchedRdd.getNumPartitions
>>>
>>> ...
>>>
>>> val scoredRdd = score(fetchedRdd)
>>>
>>> ...
>>>
>>> }
>>>
>>> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>>>   val job = this.job.asInstanceOf[SparklerJob]
>>>
>>>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>>>
>>>   val scoreUpdateRdd: RDD[SolrInputDocument] = scoredRdd.map(d => 
>>> ScoreUpdateSolrTransformer(d))
>>>   val scoreUpdateFunc = new SolrStatusUpdate(job)
>>>   sc.runJob(scoreUpdateRdd, scoreUpdateFunc)
>>>
>>> }
>>>
>>>
>>> Basically for anyone new to this, the business logic lives inside the
>>> FairFetcher and I need that distributed over all the nodes in the Spark cluster.
>>>
>>> Here's a quick illustration of what I'm seeing:
>>> https://pasteboard.co/K7VovBO.png
>>>
>>> It chunks up the code and distributes the tasks across the cluster, but
>>> that occurs _prior_ to the business logic in the FlatMap being executed.
>>>
>>> So specifically, has anyone got any ideas about how to split that
>>> flatmap operation up so the RDD processing runs across the nodes, not
>>> limited to a single node?
>>>
>>> Thanks for all your help so far,
>>>
>>> Tom
>>>
>>> On Wed, Jun 9, 2021 at 8:08 PM Tom Barber  wrote:
>>>
 Ah no sorry, so in the load image, the crawl has just kicked off on the
 driver node, which is why it's flagged red and the load is spiking.
 https://pasteboard.co/K5QHOJN.png here's the cluster now it's been
 running a while. The red node is still (and is always every time I tested
 it) the driver node.

 Tom



 On Wed, Jun 9, 2021 at 8:03 PM Sean Owen  wrote:

> Where do you see that ... I see 3 executors busy at first. If that's
> the crawl then ?
>
> On Wed, Jun 9, 2021 at 1:59 PM Tom Barber  wrote:
>
>> Yeah :)
>>
>> But it's all running through the same node. So I can run multiple
>> tasks of the same type on the same node(the driver), but I can't run
>> multiple tasks on multiple nodes.
>>
>> On Wed, Jun 9, 2021 at 7:57 PM Sean Owen  wrote:
>>
>>> Wait. Isn't that what you were trying to parallelize in the first
>>> place?
>>>
>>> On Wed, Jun 9, 2021 at 1:49 PM Tom Barber  wrote:
>>>
 Yeah but that something else is the crawl being run, which is
 triggered from inside the RDDs, because the log output is slowly 
 outputting
 crawl data.


>> Spicule 

Re: Parquet Metadata

2021-06-23 Thread Sam
Hi, I only know about column comments, which you can add to each column and
in which you could put these key/values.

Thanks.
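
As a rough, untested sketch (column name and labels taken from your example): 
Spark lets you attach arbitrary key/value metadata to a column via StructField 
metadata, and it serializes that schema (including the metadata) into the 
Parquet footer, so a Spark reader gets it back:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.MetadataBuilder

val spark = SparkSession.builder().appName("parquet-column-labels").getOrCreate()
import spark.implicits._

// Hypothetical sample data with the technical column name
val df = Seq(("C-001", "Alice"), ("C-002", "Bob")).toDF("custnr", "name")

// Attach the translations as per-column metadata
val labels = new MetadataBuilder()
  .putString("EN", "Customer Number")
  .putString("DE", "Kundennummer")
  .build()

val withLabels = df.withColumn("custnr", col("custnr").as("custnr", labels))
withLabels.write.mode("overwrite").parquet("/tmp/customers_with_labels")

// Reading back with Spark, the labels are visible on the field's metadata
println(spark.read.parquet("/tmp/customers_with_labels").schema("custnr").metadata)

The caveat is that non-Spark Parquet readers would only see this inside 
Spark's schema JSON in the file's key/value metadata, not as first-class 
Parquet column metadata.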

On Wed, Jun 23, 2021 at 11:31 AM Bode, Meikel, NMA-CFD <
meikel.b...@bertelsmann.de> wrote:

> Hi folks,
>
>
>
> Maybe not the right audience, but maybe you have come across such a requirement.
>
> Is it possible to define a Parquet schema that contains technical column
> names and a list of translations of a certain column name into different
> languages?
>
>
>
> I give an example:
>
> Technical: “custnr” would translate to { EN:”Customer Number”,  DE:
> “Kundennummer”}
>
>
>
> We could of course deliver a metadata file containing such language
> mappings, but our question is whether we could embed that info into the
> Parquet metadata?
>
>
>
> Thanks a lot,
>
> Meikel
>
>
>


Parquet Metadata

2021-06-23 Thread Bode, Meikel, NMA-CFD
Hi folks,

Maybe not the right audience, but maybe you have come across such a requirement.
Is it possible to define a Parquet schema that contains technical column names 
and a list of translations of a certain column name into different languages?

I give an example:
Technical: "custnr" would translate to { EN:"Customer Number",  DE: 
"Kundennummer"}

We could of course deliver a metadata file containing such language mappings, 
but our question is whether we could embed that info into the Parquet 
metadata?

Thanks a lot,
Meikel



Re: Spark on Kubernetes scheduler variety

2021-06-23 Thread Mich Talebzadeh
Please allow me to be diverse and express a different point of view on
this roadmap.


I believe that, from a technical point of view, spending time, effort and
talent on batch scheduling on Kubernetes could be rewarding. However, if I
may say so, I doubt whether such an approach and the so-called democratization
of Spark on whatever platform really should be the main focus.

Having worked on Google Dataproc (a fully
managed and highly scalable service for running Apache Spark, Hadoop and,
more recently, other artefacts) for the past two years, and on Spark on
Kubernetes on-premise, I have come to the conclusion that Spark is not a
beast that one can fully commoditize, much like one can do with
Zookeeper, Kafka etc. There is always a struggle to make some niche areas
of Spark, like Spark Structured Streaming (SSS), work seamlessly and
effortlessly on these commercial whatever-as-a-Service platforms.


Moreover, Spark (and I stand corrected) already has a lot of resiliency and
redundancy built in from the ground up. It is truly an enterprise-class
product (requiring enterprise-class support) that will be difficult to
commoditize with Kubernetes while expecting the same performance. After all,
Kubernetes is aimed at efficient resource sharing and potential cost savings
for the mass market. In short, I can see commercial enterprises working on
these platforms, but maybe the great talents on the dev team should focus on
things like the perceived limitation of SSS in dealing with chained
aggregations (if I am correct, not yet supported on streaming datasets).


These are my opinions and they are not facts, just opinions so to speak :)


   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 18 Jun 2021 at 23:18, Holden Karau  wrote:

> I think these approaches are good, but there are limitations (eg dynamic
> scaling) without us making changes inside of the Spark Kube scheduler.
>
> Certainly whichever scheduler extensions we add support for we should
> collaborate with the people developing those extensions insofar as they are
> interested. The first place I checked was #sig-scheduling, which is
> fairly quiet on the Kubernetes Slack, but if there are more places to look
> for folks interested in batch scheduling on Kubernetes we should definitely
> give it a shot :)
>
> On Fri, Jun 18, 2021 at 1:41 AM Mich Talebzadeh 
> wrote:
>
>> Hi,
>>
>> Regarding your point and I quote
>>
>> "..  I know that one of the Spark on Kube operators
>> supports volcano/kube-batch so I was thinking that might be a place I would
>> start exploring..."
>>
>> There seems to be ongoing work on, say, Volcano as part of the Cloud Native
>> Computing Foundation (CNCF), for example through
>> https://github.com/volcano-sh/volcano
>>
> 
>>
>> There may be value-add in collaborating with such groups through CNCF in
>> order to have a collective approach to such work. There also seems to be
>> some work on Integration of Spark with Volcano for Batch Scheduling.
>> 
>>
>>
>>
>> What is not very clear is the degree of progress of these projects. You
>> may be kind enough to elaborate on the KPIs for each of these projects and where
>> you think your contribution is going to be.
>>
>>
>> HTH,
>>
>>
>> Mich
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Fri, 18 Jun 2021 at 00:44, Holden Karau  wrote:
>>
>>> Hi Folks,
>>>
>>> I'm continuing my adventures to make Spark on containers party and I
>>> was wondering if folks have experience with the different batch
>>> scheduler options that they prefer? I was thinking that, so we can
>>> better support dynamic allocation, it might make sense for us to
>>> support using different schedulers, and I wanted to see if there are
>>> any that the community is more interested in?
>>>
>>> I know that one of the Spark on Kube operators supports
>>> volcano/kube-batch, so I was thinking that might be a place I start
>>> exploring, but I also want to be open to other schedulers that folks
>>> might be interested in.
>>>
>>> Cheers,
>>>
>>> Holden :)

Re: Distributing a FlatMap across a Spark Cluster

2021-06-23 Thread Tom Barber
Okay, so I tried another idea, which was to use a really simple class to drive
a mapPartitions... because the logic in my head seems to suggest that I want to
map my partitions...

@SerialVersionUID(100L)
class RunCrawl extends Serializable{
  def mapCrawl(x: Iterator[(String, Iterable[Resource])], job:
SparklerJob): Iterator[CrawlData] = {
val m = 1000
x.flatMap({case (grp, rs) => new FairFetcher(job, rs.iterator, m,
  FetchFunction, ParseFunction, OutLinkFilterFunction,
StatusUpdateSolrTransformer)})
  }

  def runCrawl(f: RDD[(String, Iterable[Resource])], job: SparklerJob):
RDD[CrawlData] = {
f.mapPartitions( x => mapCrawl(x, job))

  }

}

That is what it looks like. But the task execution window in the cluster
looks the same:

https://pasteboard.co/K7WrBnV.png

1 task on a single node.

I feel like I'm missing something obvious here about either

a) how Spark works
b) how it divides up partitions to tasks
c) the fact it's a POJO and not a file of stuff.

Or probably some of all 3.
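
For what it's worth, here's a small self-contained sketch (plain Spark APIs, 
nothing Sparkler-specific) of the kind of check that can show which host each 
partition actually lands on:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("partition-locality-check").getOrCreate()
val sc = spark.sparkContext

// 50 partitions of dummy data, mirroring the repartition(50) in the real job
val probe = sc.parallelize(1 to 1000).repartition(50)
  .mapPartitionsWithIndex { (idx, it) =>
    // This runs on the executor, so the hostname tells us where the partition ran
    val host = java.net.InetAddress.getLocalHost.getHostName
    Iterator.single(s"partition $idx -> $host (${it.size} records)")
  }

probe.collect().sorted.foreach(println)

If that prints a spread of hosts, the partitions themselves are being 
distributed, and the pinning would be down to how the crawl data is grouped 
rather than how Spark assigns partitions.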

Tom

On Wed, Jun 23, 2021 at 11:44 AM Tom Barber  wrote:

> (I should point out that I'm diagnosing this by looking at the active
> tasks https://pasteboard.co/K7VryDJ.png, if I'm reading it incorrectly,
> let me know)
>
> On Wed, Jun 23, 2021 at 11:38 AM Tom Barber  wrote:
>
>> Uff hello fine people.
>>
>> So the cause of the above issue was, unsurprisingly, human error. I found
>> a local[*] spark master config which gazumped my own one so mystery
>> solved. But I have another question, that is still the crux of this problem:
>>
>> Here's a bit of trimmed code that I'm currently testing with. I
>> deliberately stuck in a repartition(50), just to force it to do what I
>> believed would chunk it up and distribute it. Which is all good.
>>
>> override def run(): Unit = {
>> ...
>>
>> val rdd = new MemexCrawlDbRDD(sc, job, maxGroups = topG, topN = topN)
>> val f = rdd.map(r => (r.getGroup, r))
>>   .groupByKey().repartition(50);
>>
>> val c = f.getNumPartitions
>>   val fetchedRdd = f.flatMap({ case (grp, rs) => new FairFetcher(job, 
>> rs.iterator, localFetchDelay,
>> FetchFunction, ParseFunction, OutLinkFilterFunction, 
>> StatusUpdateSolrTransformer) })
>>   .persist()
>>
>> val d = fetchedRdd.getNumPartitions
>>
>> ...
>>
>> val scoredRdd = score(fetchedRdd)
>>
>> ...
>>
>> }
>>
>> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>>   val job = this.job.asInstanceOf[SparklerJob]
>>
>>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>>
>>   val scoreUpdateRdd: RDD[SolrInputDocument] = scoredRdd.map(d => 
>> ScoreUpdateSolrTransformer(d))
>>   val scoreUpdateFunc = new SolrStatusUpdate(job)
>>   sc.runJob(scoreUpdateRdd, scoreUpdateFunc)
>>
>> }
>>
>>
>> Basically for anyone new to this, the business logic lives inside the
>> FairFetcher and I need that distributed over all the nodes in the Spark cluster.
>>
>> Here's a quick illustration of what I'm seeing:
>> https://pasteboard.co/K7VovBO.png
>>
>> It chunks up the code and distributes the tasks across the cluster, but
>> that occurs _prior_ to the business logic in the FlatMap being executed.
>>
>> So specifically, has anyone got any ideas about how to split that flatmap
>> operation up so the RDD processing runs across the nodes, not limited to a
>> single node?
>>
>> Thanks for all your help so far,
>>
>> Tom
>>
>> On Wed, Jun 9, 2021 at 8:08 PM Tom Barber  wrote:
>>
>>> Ah no sorry, so in the load image, the crawl has just kicked off on the
>>> driver node, which is why it's flagged red and the load is spiking.
>>> https://pasteboard.co/K5QHOJN.png here's the cluster now it's been
>>> running a while. The red node is still (and is always every time I tested
>>> it) the driver node.
>>>
>>> Tom
>>>
>>>
>>>
>>> On Wed, Jun 9, 2021 at 8:03 PM Sean Owen  wrote:
>>>
 Where do you see that ... I see 3 executors busy at first. If that's
 the crawl then ?

 On Wed, Jun 9, 2021 at 1:59 PM Tom Barber  wrote:

> Yeah :)
>
> But it's all running through the same node. So I can run multiple
> tasks of the same type on the same node(the driver), but I can't run
> multiple tasks on multiple nodes.
>
> On Wed, Jun 9, 2021 at 7:57 PM Sean Owen  wrote:
>
>> Wait. Isn't that what you were trying to parallelize in the first
>> place?
>>
>> On Wed, Jun 9, 2021 at 1:49 PM Tom Barber  wrote:
>>
>>> Yeah but that something else is the crawl being run, which is
>>> triggered from inside the RDDs, because the log output is slowly 
>>> outputting
>>> crawl data.
>>>
>>>
> Spicule Limited is registered in England & Wales. Company Number:
> 09954122. Registered office: First Floor, Telecom House, 125-135 Preston
> Road, Brighton, England, BN1 6AF. VAT No. 251478891.
>
>
> All engagements are subject to Spicule Terms and Conditions of
> Business. This email and its contents are intended solely for 

Re: Distributing a FlatMap across a Spark Cluster

2021-06-23 Thread Tom Barber
(I should point out that I'm diagnosing this by looking at the active tasks
https://pasteboard.co/K7VryDJ.png, if I'm reading it incorrectly, let me
know)

On Wed, Jun 23, 2021 at 11:38 AM Tom Barber  wrote:

> Uff hello fine people.
>
> So the cause of the above issue was, unsurprisingly, human error. I found
> a local[*] spark master config which gazumped my own one so mystery
> solved. But I have another question, that is still the crux of this problem:
>
> Here's a bit of trimmed code that I'm currently testing with. I
> deliberately stuck in a repartition(50), just to force it to do what I
> believed would chunk it up and distribute it. Which is all good.
>
> override def run(): Unit = {
> ...
>
> val rdd = new MemexCrawlDbRDD(sc, job, maxGroups = topG, topN = topN)
> val f = rdd.map(r => (r.getGroup, r))
>   .groupByKey().repartition(50);
>
> val c = f.getNumPartitions
>   val fetchedRdd = f.flatMap({ case (grp, rs) => new FairFetcher(job, 
> rs.iterator, localFetchDelay,
> FetchFunction, ParseFunction, OutLinkFilterFunction, 
> StatusUpdateSolrTransformer) })
>   .persist()
>
> val d = fetchedRdd.getNumPartitions
>
> ...
>
> val scoredRdd = score(fetchedRdd)
>
> ...
>
> }
>
> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>   val job = this.job.asInstanceOf[SparklerJob]
>
>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>
>   val scoreUpdateRdd: RDD[SolrInputDocument] = scoredRdd.map(d => 
> ScoreUpdateSolrTransformer(d))
>   val scoreUpdateFunc = new SolrStatusUpdate(job)
>   sc.runJob(scoreUpdateRdd, scoreUpdateFunc)
>
> }
>
>
> Basically for anyone new to this, the business logic lives inside the
> FairFetcher and I need that distributed over all the nodes in the Spark cluster.
>
> Here's a quick illustration of what I'm seeing:
> https://pasteboard.co/K7VovBO.png
>
> It chunks up the code and distributes the tasks across the cluster, but
> that occurs _prior_ to the business logic in the FlatMap being executed.
>
> So specifically, has anyone got any ideas about how to split that flatmap
> operation up so the RDD processing runs across the nodes, not limited to a
> single node?
>
> Thanks for all your help so far,
>
> Tom
>
> On Wed, Jun 9, 2021 at 8:08 PM Tom Barber  wrote:
>
>> Ah no sorry, so in the load image, the crawl has just kicked off on the
>> driver node, which is why it's flagged red and the load is spiking.
>> https://pasteboard.co/K5QHOJN.png here's the cluster now it's been
>> running a while. The red node is still (and is always every time I tested
>> it) the driver node.
>>
>> Tom
>>
>>
>>
>> On Wed, Jun 9, 2021 at 8:03 PM Sean Owen  wrote:
>>
>>> Where do you see that ... I see 3 executors busy at first. If that's the
>>> crawl then ?
>>>
>>> On Wed, Jun 9, 2021 at 1:59 PM Tom Barber  wrote:
>>>
 Yeah :)

 But it's all running through the same node. So I can run multiple tasks
 of the same type on the same node(the driver), but I can't run multiple
 tasks on multiple nodes.

 On Wed, Jun 9, 2021 at 7:57 PM Sean Owen  wrote:

> Wait. Isn't that what you were trying to parallelize in the first
> place?
>
> On Wed, Jun 9, 2021 at 1:49 PM Tom Barber  wrote:
>
>> Yeah but that something else is the crawl being run, which is
>> triggered from inside the RDDs, because the log output is slowly 
>> outputting
>> crawl data.
>>
>>
 Spicule Limited is registered in England & Wales. Company Number:
 09954122. Registered office: First Floor, Telecom House, 125-135 Preston
 Road, Brighton, England, BN1 6AF. VAT No. 251478891.


 All engagements are subject to Spicule Terms and Conditions of
 Business. This email and its contents are intended solely for the
 individual to whom it is addressed and may contain information that is
 confidential, privileged or otherwise protected from disclosure,
 distributing or copying. Any views or opinions presented in this email are
 solely those of the author and do not necessarily represent those of
 Spicule Limited. The company accepts no liability for any damage caused by
 any virus transmitted by this email. If you have received this message in
 error, please notify us immediately by reply email before deleting it from
 your system. Service of legal notice cannot be effected on Spicule Limited
 by email.

>>>

-- 


Spicule Limited is registered in England & Wales. Company Number: 
09954122. Registered office: First Floor, Telecom House, 125-135 Preston 
Road, Brighton, England, BN1 6AF. VAT No. 251478891.




All engagements 
are subject to Spicule Terms and Conditions of Business. This email and its 
contents are intended solely for the individual to whom it is addressed and 
may contain information that is confidential, privileged or otherwise 
protected from disclosure, distributing or copying. Any views or opinions 
presented in this 

Re: Distributing a FlatMap across a Spark Cluster

2021-06-23 Thread Tom Barber
Uff hello fine people.

So the cause of the above issue was, unsurprisingly, human error. I found a
local[*] Spark master config which gazumped my own one, so mystery
solved. But I have another question that is still the crux of this problem:

Here's a bit of trimmed code that I'm currently testing with. I
deliberately stuck in a repartition(50), just to force it to do what I
believed would chunk it up and distribute it. Which is all good.

override def run(): Unit = {
...

val rdd = new MemexCrawlDbRDD(sc, job, maxGroups = topG, topN = topN)
val f = rdd.map(r => (r.getGroup, r))
  .groupByKey().repartition(50);

val c = f.getNumPartitions
  val fetchedRdd = f.flatMap({ case (grp, rs) => new
FairFetcher(job, rs.iterator, localFetchDelay,
FetchFunction, ParseFunction, OutLinkFilterFunction,
StatusUpdateSolrTransformer) })
  .persist()

val d = fetchedRdd.getNumPartitions

...

val scoredRdd = score(fetchedRdd)

...

}

def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
  val job = this.job.asInstanceOf[SparklerJob]

  val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))

  val scoreUpdateRdd: RDD[SolrInputDocument] = scoredRdd.map(d =>
ScoreUpdateSolrTransformer(d))
  val scoreUpdateFunc = new SolrStatusUpdate(job)
  sc.runJob(scoreUpdateRdd, scoreUpdateFunc)

}


Basically for anyone new to this, the business logic lives inside the
FairFetcher and I need that distributed over all the nodes in the Spark cluster.

Here's a quick illustration of what I'm seeing:
https://pasteboard.co/K7VovBO.png

It chunks up the code and distributes the tasks across the cluster, but that
occurs _prior_ to the business logic in the FlatMap being executed.

So specifically, has anyone got any ideas about how to split that flatmap
operation up so the RDD processing runs across the nodes, not limited to a
single node?

Thanks for all your help so far,

Tom

On Wed, Jun 9, 2021 at 8:08 PM Tom Barber  wrote:

> Ah no sorry, so in the load image, the crawl has just kicked off on the
> driver node, which is why it's flagged red and the load is spiking.
> https://pasteboard.co/K5QHOJN.png here's the cluster now it's been running
> a while. The red node is still (and is always every time I tested it) the
> driver node.
>
> Tom
>
>
>
> On Wed, Jun 9, 2021 at 8:03 PM Sean Owen  wrote:
>
>> Where do you see that ... I see 3 executors busy at first. If that's the
>> crawl then ?
>>
>> On Wed, Jun 9, 2021 at 1:59 PM Tom Barber  wrote:
>>
>>> Yeah :)
>>>
>>> But it's all running through the same node. So I can run multiple tasks
>>> of the same type on the same node(the driver), but I can't run multiple
>>> tasks on multiple nodes.
>>>
>>> On Wed, Jun 9, 2021 at 7:57 PM Sean Owen  wrote:
>>>
 Wait. Isn't that what you were trying to parallelize in the first place?

 On Wed, Jun 9, 2021 at 1:49 PM Tom Barber  wrote:

> Yeah but that something else is the crawl being run, which is
> triggered from inside the RDDs, because the log output is slowly 
> outputting
> crawl data.
>
>
>>> Spicule Limited is registered in England & Wales. Company Number:
>>> 09954122. Registered office: First Floor, Telecom House, 125-135 Preston
>>> Road, Brighton, England, BN1 6AF. VAT No. 251478891.
>>>
>>>
>>> All engagements are subject to Spicule Terms and Conditions of Business.
>>> This email and its contents are intended solely for the individual to whom
>>> it is addressed and may contain information that is confidential,
>>> privileged or otherwise protected from disclosure, distributing or copying.
>>> Any views or opinions presented in this email are solely those of the
>>> author and do not necessarily represent those of Spicule Limited. The
>>> company accepts no liability for any damage caused by any virus transmitted
>>> by this email. If you have received this message in error, please notify us
>>> immediately by reply email before deleting it from your system. Service of
>>> legal notice cannot be effected on Spicule Limited by email.
>>>
>>

-- 


Spicule Limited is registered in England & Wales. Company Number: 
09954122. Registered office: First Floor, Telecom House, 125-135 Preston 
Road, Brighton, England, BN1 6AF. VAT No. 251478891.




All engagements 
are subject to Spicule Terms and Conditions of Business. This email and its 
contents are intended solely for the individual to whom it is addressed and 
may contain information that is confidential, privileged or otherwise 
protected from disclosure, distributing or copying. Any views or opinions 
presented in this email are solely those of the author and do not 
necessarily represent those of Spicule Limited. The company accepts no 
liability for any damage caused by any virus transmitted by this email. If 
you have received this message in error, please notify us immediately by 
reply email before deleting it from your system. Service of legal notice 
cannot be effected on Spicule Limited