Re: Distributing a FlatMap across a Spark Cluster

2021-06-23 Thread Tom Barber
Looks like repartitioning was my friend, seems to be distributed across the
cluster now.

All good. Thanks!
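For reference, a minimal sketch of the pattern under discussion. The exact
placement of the repartition in the final fix isn't spelled out in this
message, so the snippet below is an assumption based on the earlier code in
this thread, not the definitive change:

// Sketch only: repartition the grouped RDD before the flatMap so the
// (group, records) pairs are shuffled across many partitions, and the flatMap
// that drives FairFetcher runs as that many parallel tasks. Note that with a
// single distinct group key there is still only one element to spread, so the
// number of distinct keys matters as much as the repartition itself.
val fetchedRdd = rdd.map(r => (r.getGroup, r))
  .groupByKey()
  .repartition(50)
  .flatMap { case (_, rs) =>
    new FairFetcher(job, rs.iterator, localFetchDelay,
      FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer)
  }
  .persist()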


On Wed, Jun 23, 2021 at 2:18 PM Tom Barber  wrote:

> Okay so I tried another idea which was to use a real simple class to drive
> a mapPartitions... because logic in my head seems to suggest that I want to
> map my partitions...
>
> @SerialVersionUID(100L)
> class RunCrawl extends Serializable{
>   def mapCrawl(x: Iterator[(String, Iterable[Resource])], job:
> SparklerJob): Iterator[CrawlData] = {
> val m = 1000
> x.flatMap({case (grp, rs) => new FairFetcher(job, rs.iterator, m,
>   FetchFunction, ParseFunction, OutLinkFilterFunction,
> StatusUpdateSolrTransformer)})
>   }
>
>   def runCrawl(f: RDD[(String, Iterable[Resource])], job: SparklerJob):
> RDD[CrawlData] = {
> f.mapPartitions( x => mapCrawl(x, job))
>
>   }
>
> }
>
> That is what it looks like. But the task execution window in the cluster
> looks the same:
>
> https://pasteboard.co/K7WrBnV.png
>
> 1 task on a single node.
>
> I feel like I'm missing something obvious here about either
>
> a) how Spark works
> b) how it divides up partitions to tasks
> c) the fact it's a POJO and not a file of stuff.
>
> Or probably some of all 3.
>
> Tom
>
> On Wed, Jun 23, 2021 at 11:44 AM Tom Barber  wrote:
>
>> (I should point out that I'm diagnosing this by looking at the active
>> tasks https://pasteboard.co/K7VryDJ.png, if I'm reading it incorrectly,
>> let me know)
>>
>> On Wed, Jun 23, 2021 at 11:38 AM Tom Barber  wrote:
>>
>>> Uff hello fine people.
>>>
>>> So the cause of the above issue was, unsurprisingly, human error. I
>>> found a local[*] spark master config which gazumped my own one so
>>> mystery solved. But I have another question, that is still the crux of this
>>> problem:
>>>
>>> Here's a bit of trimmed code, that I'm currently testing with. I
>>> deliberately stuck in a repartition(50), just to force it to, what I
>>> believe was chunk it up and distribute it. Which is all good.
>>>
>>> override def run(): Unit = {
>>> ...
>>>
>>> val rdd = new MemexCrawlDbRDD(sc, job, maxGroups = topG, topN = topN)
>>> val f = rdd.map(r => (r.getGroup, r))
>>>   .groupByKey().repartition(50);
>>>
>>> val c = f.getNumPartitions
>>>   val fetchedRdd = f.flatMap({ case (grp, rs) => new FairFetcher(job, 
>>> rs.iterator, localFetchDelay,
>>> FetchFunction, ParseFunction, OutLinkFilterFunction, 
>>> StatusUpdateSolrTransformer) })
>>>   .persist()
>>>
>>> val d = fetchedRdd.getNumPartitions
>>>
>>> ...
>>>
>>> val scoredRdd = score(fetchedRdd)
>>>
>>> ...
>>>
>>> }
>>>
>>> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>>>   val job = this.job.asInstanceOf[SparklerJob]
>>>
>>>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>>>
>>>   val scoreUpdateRdd: RDD[SolrInputDocument] = scoredRdd.map(d => 
>>> ScoreUpdateSolrTransformer(d))
>>>   val scoreUpdateFunc = new SolrStatusUpdate(job)
>>>   sc.runJob(scoreUpdateRdd, scoreUpdateFunc)
>>>
>>> }
>>>
>>>
>>> Basically for anyone new to this, the business logic lives inside the
>>> FairFetcher and I need that distributed over all the nodes in spark cluster.
>>>
>>> Here's a quick illustration of what I'm seeing:
>>> https://pasteboard.co/K7VovBO.png
>>>
>>> It chunks up the code and distributes the tasks across the cluster, but
>>> that occurs _prior_ to the business logic in the FlatMap being executed.
>>>
>>> So specifically, has anyone got any ideas about how to split that
>>> flatmap operation up so the RDD processing runs across the nodes, not
>>> limited to a single node?
>>>
>>> Thanks for all your help so far,
>>>
>>> Tom
>>>
>>> On Wed, Jun 9, 2021 at 8:08 PM Tom Barber  wrote:
>>>
 Ah no sorry, so in the load image, the crawl has just kicked off on the
 driver node which is why its flagged red and the load is spiking.
 https://pasteboard.co/K5QHOJN.png here's the cluster now its been
 running a while. The red node is still (and is always every time I tested
 it) the driver node.

 Tom



 On Wed, Jun 9, 2021 at 8:03 PM Sean Owen  wrote:

> Where do you see that ... I see 3 executors busy at first. If that's
> the crawl then ?
>
> On Wed, Jun 9, 2021 at 1:59 PM Tom Barber  wrote:
>
>> Yeah :)
>>
>> But it's all running through the same node. So I can run multiple
>> tasks of the same type on the same node(the driver), but I can't run
>> multiple tasks on multiple nodes.
>>
>> On Wed, Jun 9, 2021 at 7:57 PM Sean Owen  wrote:
>>
>>> Wait. Isn't that what you were trying to parallelize in the first
>>> place?
>>>
>>> On Wed, Jun 9, 2021 at 1:49 PM Tom Barber  wrote:
>>>
 Yeah but that something else is the crawl being run, which is
 triggered from inside the RDDs, because the log output is slowly 
 outputting
 crawl data.



Re: Distributing a FlatMap across a Spark Cluster

2021-06-23 Thread Tom Barber
Okay so I tried another idea which was to use a real simple class to drive
a mapPartitions... because logic in my head seems to suggest that I want to
map my partitions...

@SerialVersionUID(100L)
class RunCrawl extends Serializable {

  def mapCrawl(x: Iterator[(String, Iterable[Resource])], job: SparklerJob): Iterator[CrawlData] = {
    val m = 1000
    x.flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator, m,
      FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer) })
  }

  def runCrawl(f: RDD[(String, Iterable[Resource])], job: SparklerJob): RDD[CrawlData] = {
    f.mapPartitions(x => mapCrawl(x, job))
  }

}

That is what it looks like. But the task execution window in the cluster
looks the same:

https://pasteboard.co/K7WrBnV.png

1 task on a single node.

I feel like I'm missing something obvious here about either

a) how Spark works
b) how it divides up partitions to tasks
c) the fact it's a POJO and not a file of stuff.

Or probably some of all 3.
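(A quick diagnostic sketch, not part of the Sparkler code, that would settle
part of this: count how many grouped records actually land in each partition
of the RDD handed to runCrawl. mapPartitions can only produce one non-empty
task per non-empty partition, so if everything sits in one partition, one task
is all you can get.)

// Hypothetical check: emit one (partitionIndex, recordCount) pair per partition.
val perPartition = f
  .mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size)))
  .collect()
perPartition.foreach { case (idx, n) => println(s"partition $idx -> $n groups") }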

Tom

On Wed, Jun 23, 2021 at 11:44 AM Tom Barber  wrote:

> (I should point out that I'm diagnosing this by looking at the active
> tasks https://pasteboard.co/K7VryDJ.png, if I'm reading it incorrectly,
> let me know)
>
> On Wed, Jun 23, 2021 at 11:38 AM Tom Barber  wrote:
>
>> Uff hello fine people.
>>
>> So the cause of the above issue was, unsurprisingly, human error. I found
>> a local[*] spark master config which gazumped my own one so mystery
>> solved. But I have another question, that is still the crux of this problem:
>>
>> Here's a bit of trimmed code, that I'm currently testing with. I
>> deliberately stuck in a repartition(50), just to force it to, what I
>> believe was chunk it up and distribute it. Which is all good.
>>
>> override def run(): Unit = {
>> ...
>>
>> val rdd = new MemexCrawlDbRDD(sc, job, maxGroups = topG, topN = topN)
>> val f = rdd.map(r => (r.getGroup, r))
>>   .groupByKey().repartition(50);
>>
>> val c = f.getNumPartitions
>>   val fetchedRdd = f.flatMap({ case (grp, rs) => new FairFetcher(job, 
>> rs.iterator, localFetchDelay,
>> FetchFunction, ParseFunction, OutLinkFilterFunction, 
>> StatusUpdateSolrTransformer) })
>>   .persist()
>>
>> val d = fetchedRdd.getNumPartitions
>>
>> ...
>>
>> val scoredRdd = score(fetchedRdd)
>>
>> ...
>>
>> }
>>
>> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>>   val job = this.job.asInstanceOf[SparklerJob]
>>
>>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>>
>>   val scoreUpdateRdd: RDD[SolrInputDocument] = scoredRdd.map(d => 
>> ScoreUpdateSolrTransformer(d))
>>   val scoreUpdateFunc = new SolrStatusUpdate(job)
>>   sc.runJob(scoreUpdateRdd, scoreUpdateFunc)
>>
>> }
>>
>>
>> Basically for anyone new to this, the business logic lives inside the
>> FairFetcher and I need that distributed over all the nodes in spark cluster.
>>
>> Here's a quick illustration of what I'm seeing:
>> https://pasteboard.co/K7VovBO.png
>>
>> It chunks up the code and distributes the tasks across the cluster, but
>> that occurs _prior_ to the business logic in the FlatMap being executed.
>>
>> So specifically, has anyone got any ideas about how to split that flatmap
>> operation up so the RDD processing runs across the nodes, not limited to a
>> single node?
>>
>> Thanks for all your help so far,
>>
>> Tom
>>
>> On Wed, Jun 9, 2021 at 8:08 PM Tom Barber  wrote:
>>
>>> Ah no sorry, so in the load image, the crawl has just kicked off on the
>>> driver node which is why its flagged red and the load is spiking.
>>> https://pasteboard.co/K5QHOJN.png here's the cluster now its been
>>> running a while. The red node is still (and is always every time I tested
>>> it) the driver node.
>>>
>>> Tom
>>>
>>>
>>>
>>> On Wed, Jun 9, 2021 at 8:03 PM Sean Owen  wrote:
>>>
 Where do you see that ... I see 3 executors busy at first. If that's
 the crawl then ?

 On Wed, Jun 9, 2021 at 1:59 PM Tom Barber  wrote:

> Yeah :)
>
> But it's all running through the same node. So I can run multiple
> tasks of the same type on the same node(the driver), but I can't run
> multiple tasks on multiple nodes.
>
> On Wed, Jun 9, 2021 at 7:57 PM Sean Owen  wrote:
>
>> Wait. Isn't that what you were trying to parallelize in the first
>> place?
>>
>> On Wed, Jun 9, 2021 at 1:49 PM Tom Barber  wrote:
>>
>>> Yeah but that something else is the crawl being run, which is
>>> triggered from inside the RDDs, because the log output is slowly 
>>> outputting
>>> crawl data.
>>>
>>>

Re: Distributing a FlatMap across a Spark Cluster

2021-06-23 Thread Tom Barber
(I should point out that I'm diagnosing this by looking at the active tasks
https://pasteboard.co/K7VryDJ.png, if I'm reading it incorrectly, let me
know)

On Wed, Jun 23, 2021 at 11:38 AM Tom Barber  wrote:

> Uff hello fine people.
>
> So the cause of the above issue was, unsurprisingly, human error. I found
> a local[*] spark master config which gazumped my own one so mystery
> solved. But I have another question, that is still the crux of this problem:
>
> Here's a bit of trimmed code, that I'm currently testing with. I
> deliberately stuck in a repartition(50), just to force it to, what I
> believe was chunk it up and distribute it. Which is all good.
>
> override def run(): Unit = {
> ...
>
> val rdd = new MemexCrawlDbRDD(sc, job, maxGroups = topG, topN = topN)
> val f = rdd.map(r => (r.getGroup, r))
>   .groupByKey().repartition(50);
>
> val c = f.getNumPartitions
>   val fetchedRdd = f.flatMap({ case (grp, rs) => new FairFetcher(job, 
> rs.iterator, localFetchDelay,
> FetchFunction, ParseFunction, OutLinkFilterFunction, 
> StatusUpdateSolrTransformer) })
>   .persist()
>
> val d = fetchedRdd.getNumPartitions
>
> ...
>
> val scoredRdd = score(fetchedRdd)
>
> ...
>
> }
>
> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>   val job = this.job.asInstanceOf[SparklerJob]
>
>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>
>   val scoreUpdateRdd: RDD[SolrInputDocument] = scoredRdd.map(d => 
> ScoreUpdateSolrTransformer(d))
>   val scoreUpdateFunc = new SolrStatusUpdate(job)
>   sc.runJob(scoreUpdateRdd, scoreUpdateFunc)
>
> }
>
>
> Basically for anyone new to this, the business logic lives inside the
> FairFetcher and I need that distributed over all the nodes in spark cluster.
>
> Here's a quick illustration of what I'm seeing:
> https://pasteboard.co/K7VovBO.png
>
> It chunks up the code and distributes the tasks across the cluster, but
> that occurs _prior_ to the business logic in the FlatMap being executed.
>
> So specifically, has anyone got any ideas about how to split that flatmap
> operation up so the RDD processing runs across the nodes, not limited to a
> single node?
>
> Thanks for all your help so far,
>
> Tom
>
> On Wed, Jun 9, 2021 at 8:08 PM Tom Barber  wrote:
>
>> Ah no sorry, so in the load image, the crawl has just kicked off on the
>> driver node which is why its flagged red and the load is spiking.
>> https://pasteboard.co/K5QHOJN.png here's the cluster now its been
>> running a while. The red node is still (and is always every time I tested
>> it) the driver node.
>>
>> Tom
>>
>>
>>
>> On Wed, Jun 9, 2021 at 8:03 PM Sean Owen  wrote:
>>
>>> Where do you see that ... I see 3 executors busy at first. If that's the
>>> crawl then ?
>>>
>>> On Wed, Jun 9, 2021 at 1:59 PM Tom Barber  wrote:
>>>
 Yeah :)

 But it's all running through the same node. So I can run multiple tasks
 of the same type on the same node(the driver), but I can't run multiple
 tasks on multiple nodes.

 On Wed, Jun 9, 2021 at 7:57 PM Sean Owen  wrote:

> Wait. Isn't that what you were trying to parallelize in the first
> place?
>
> On Wed, Jun 9, 2021 at 1:49 PM Tom Barber  wrote:
>
>> Yeah but that something else is the crawl being run, which is
>> triggered from inside the RDDs, because the log output is slowly 
>> outputting
>> crawl data.
>>
>>

>>>


Re: Distributing a FlatMap across a Spark Cluster

2021-06-23 Thread Tom Barber
Uff hello fine people.

So the cause of the above issue was, unsurprisingly, human error. I found a
local[*] Spark master config which gazumped my own one, so mystery solved.
But I have another question, which is still the crux of this problem:

Here's a bit of trimmed code that I'm currently testing with. I deliberately
stuck in a repartition(50) just to force it to chunk the work up and
distribute it, which I believe it does. Which is all good.

override def run(): Unit = {
...

val rdd = new MemexCrawlDbRDD(sc, job, maxGroups = topG, topN = topN)
val f = rdd.map(r => (r.getGroup, r))
  .groupByKey()
  .repartition(50)

val c = f.getNumPartitions
val fetchedRdd = f.flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator, localFetchDelay,
    FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer) })
  .persist()

val d = fetchedRdd.getNumPartitions

...

val scoredRdd = score(fetchedRdd)

...

}

def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
  val job = this.job.asInstanceOf[SparklerJob]

  val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))

  val scoreUpdateRdd: RDD[SolrInputDocument] = scoredRdd.map(d =>
ScoreUpdateSolrTransformer(d))
  val scoreUpdateFunc = new SolrStatusUpdate(job)
  sc.runJob(scoreUpdateRdd, scoreUpdateFunc)

}


Basically, for anyone new to this, the business logic lives inside the
FairFetcher and I need that distributed over all the nodes in the Spark cluster.

Here's a quick illustration of what I'm seeing:
https://pasteboard.co/K7VovBO.png

It chunks up the code and distributes the tasks across the cluster, but that
occurs _prior_ to the business logic in the FlatMap being executed.

So specifically, has anyone got any ideas about how to split that flatmap
operation up so the RDD processing runs across the nodes, not limited to a
single node?
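(One variation worth noting, sketched under the assumption that the group keys
are genuinely distinct: groupByKey can be handed the partition count directly,
so the grouped pairs are already spread out before the flatMap runs, rather
than repartitioning afterwards.)

// Sketch: ask groupByKey for 50 partitions up front. Each (group, records) pair
// is placed by hashing its key, so this only helps if there is more than one
// distinct group key; with a single key everything still lands in one partition.
val grouped = rdd.map(r => (r.getGroup, r)).groupByKey(50)

val fetchedRdd = grouped.flatMap { case (_, rs) =>
  new FairFetcher(job, rs.iterator, localFetchDelay,
    FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer)
}.persist()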

Thanks for all your help so far,

Tom

On Wed, Jun 9, 2021 at 8:08 PM Tom Barber  wrote:

> Ah no sorry, so in the load image, the crawl has just kicked off on the
> driver node which is why its flagged red and the load is spiking.
> https://pasteboard.co/K5QHOJN.png here's the cluster now its been running
> a while. The red node is still (and is always every time I tested it) the
> driver node.
>
> Tom
>
>
>
> On Wed, Jun 9, 2021 at 8:03 PM Sean Owen  wrote:
>
>> Where do you see that ... I see 3 executors busy at first. If that's the
>> crawl then ?
>>
>> On Wed, Jun 9, 2021 at 1:59 PM Tom Barber  wrote:
>>
>>> Yeah :)
>>>
>>> But it's all running through the same node. So I can run multiple tasks
>>> of the same type on the same node(the driver), but I can't run multiple
>>> tasks on multiple nodes.
>>>
>>> On Wed, Jun 9, 2021 at 7:57 PM Sean Owen  wrote:
>>>
 Wait. Isn't that what you were trying to parallelize in the first place?

 On Wed, Jun 9, 2021 at 1:49 PM Tom Barber  wrote:

> Yeah but that something else is the crawl being run, which is
> triggered from inside the RDDs, because the log output is slowly 
> outputting
> crawl data.
>
>
>>>
>>


Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Tom Barber
Ah no sorry, so in the load image, the crawl has just kicked off on the driver
node, which is why it's flagged red and the load is spiking.
https://pasteboard.co/K5QHOJN.png here's the cluster now it's been running a
while. The red node is still (and is always, every time I tested it) the
driver node.

Tom



On Wed, Jun 9, 2021 at 8:03 PM Sean Owen  wrote:

> Where do you see that ... I see 3 executors busy at first. If that's the
> crawl then ?
>
> On Wed, Jun 9, 2021 at 1:59 PM Tom Barber  wrote:
>
>> Yeah :)
>>
>> But it's all running through the same node. So I can run multiple tasks
>> of the same type on the same node(the driver), but I can't run multiple
>> tasks on multiple nodes.
>>
>> On Wed, Jun 9, 2021 at 7:57 PM Sean Owen  wrote:
>>
>>> Wait. Isn't that what you were trying to parallelize in the first place?
>>>
>>> On Wed, Jun 9, 2021 at 1:49 PM Tom Barber  wrote:
>>>
 Yeah but that something else is the crawl being run, which is triggered
 from inside the RDDs, because the log output is slowly outputting crawl
 data.


>>
>



Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Sean Owen
Where do you see that ... I see 3 executors busy at first. If that's the
crawl then ?

On Wed, Jun 9, 2021 at 1:59 PM Tom Barber  wrote:

> Yeah :)
>
> But it's all running through the same node. So I can run multiple tasks of
> the same type on the same node(the driver), but I can't run multiple tasks
> on multiple nodes.
>
> On Wed, Jun 9, 2021 at 7:57 PM Sean Owen  wrote:
>
>> Wait. Isn't that what you were trying to parallelize in the first place?
>>
>> On Wed, Jun 9, 2021 at 1:49 PM Tom Barber  wrote:
>>
>>> Yeah but that something else is the crawl being run, which is triggered
>>> from inside the RDDs, because the log output is slowly outputting crawl
>>> data.
>>>
>>>
>


Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Tom Barber
Yeah :)

But it's all running through the same node. So I can run multiple tasks of
the same type on the same node (the driver), but I can't run multiple tasks
on multiple nodes.

On Wed, Jun 9, 2021 at 7:57 PM Sean Owen  wrote:

> Wait. Isn't that what you were trying to parallelize in the first place?
>
> On Wed, Jun 9, 2021 at 1:49 PM Tom Barber  wrote:
>
>> Yeah but that something else is the crawl being run, which is triggered
>> from inside the RDDs, because the log output is slowly outputting crawl
>> data.
>>
>>



Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Tom Barber
Yeah but that something else is the crawl being run, which is triggered
from inside the RDDs, because the log output is slowly outputting crawl
data.

On Wed, 9 Jun 2021, 19:47 Sean Owen,  wrote:

> That looks like you did some work on the cluster, and now it's stuck doing
> something else on the driver - not doing everything on 1 machine.
>
> On Wed, Jun 9, 2021 at 12:43 PM Tom Barber  wrote:
>
>> And also as this morning: https://pasteboard.co/K5Q9aEf.png
>>
>> Removing the cpu pins gives me more tasks but as you can see here:
>>
>> https://pasteboard.co/K5Q9GO0.png
>>
>> It just loads up a single server.
>>
>>



Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Sean Owen
That looks like you did some work on the cluster, and now it's stuck doing
something else on the driver - not doing everything on 1 machine.

On Wed, Jun 9, 2021 at 12:43 PM Tom Barber  wrote:

> And also as this morning: https://pasteboard.co/K5Q9aEf.png
>
> Removing the cpu pins gives me more tasks but as you can see here:
>
> https://pasteboard.co/K5Q9GO0.png
>
> It just loads up a single server.
>
>


Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Tom Barber
No, this is an on-demand Databricks cluster.

On Wed, Jun 9, 2021 at 6:54 PM Mich Talebzadeh 
wrote:

>
>
> Are you running this in Managed Instance Group (MIG)?
>
> https://cloud.google.com/compute/docs/instance-groups
>
>
>
>
>
>
> On Wed, 9 Jun 2021 at 18:43, Tom Barber  wrote:
>
>> And also as this morning: https://pasteboard.co/K5Q9aEf.png
>>
>> Removing the cpu pins gives me more tasks but as you can see here:
>>
>> https://pasteboard.co/K5Q9GO0.png
>>
>> It just loads up a single server.
>>
>> On Wed, Jun 9, 2021 at 6:32 PM Tom Barber  wrote:
>>
>>> Thanks Chris
>>>
>>> All the code I have on both sides is as modern as it allows. Running
>>> Spark 3.1.1 and Scala 2.12.
>>>
>>> I stuck some logging in to check reality:
>>>
>>> LOG.info("GROUP COUNT: " + fetchedgrp.count())
>>> val cgrp = fetchedgrp.collect()
>>> cgrp.foreach(f => {
>>>   LOG.info("Out1 :" + f._1)
>>>   f._2.foreach(u => {
>>> LOG.info("ID:" + u.getId)
>>> LOG.info("GROUP:" + u.getGroup)
>>>   })
>>> })
>>> LOG.info("PARTITION COUNT:" + fetchedgrp.getNumPartitions)
>>> val fetchedRdd = fetchedgrp.flatMap({ case (grp, rs) => new 
>>> FairFetcher(job, rs.iterator, localFetchDelay,
>>> FetchFunction, ParseFunction, OutLinkFilterFunction, 
>>> StatusUpdateSolrTransformer) })
>>>   .persist()
>>>
>>> LOG.info("FETCHED PARTITIONS: " + fetchedRdd.getNumPartitions)
>>> LOG.info("CoUNT: " + fetchedRdd.count())
>>>
>>>
>>> It says I have 5000 groups, which makes sense as its defined in my
>>> command line and both sides claim to have 50 partitions which also makes
>>> sense as I define that in my code as well.
>>>
>>> Then it starts the crawl at the final count line as I guess it needs to
>>> materialize things and so at that point I don't know what the count would
>>> return, but everything else checks out.
>>>
>>> I'll poke around in the other hints you suggested later, thanks for the
>>> help.
>>>
>>> Tom
>>>
>>> On Wed, Jun 9, 2021 at 5:49 PM Chris Martin 
>>> wrote:
>>>
 Hmm then my guesses are (in order of decreasing probability:

 * Whatever class makes up fetchedRdd (MemexDeepCrawlDbRDD?) isn't
 compatible with the lastest spark release.
 * You've got 16 threads per task on a 16 core machine.  Should be fine,
 but I wonder if it's confusing things as you don't also set
 spark.executor.cores and Databricks might also default that to 1.
 * There's some custom partitioner in play which is causing everything
 to go to the same partition.
 * The group keys are all hashing to the same value (it's difficult to
 see how this would be the case if the group keys are genuinely different,
 but maybe there's something else going on).

 My hints:

 1. Make sure you're using a recent version of sparkler
 2. Try repartition with a custom partitioner that you know will end
 things to different partitions
 3. Try either removing "spark.task.cpus":"16"  or setting
 spark.executor.cores to 1.
 4. print out the group keys and see if there's any weird pattern to
 them.
 5. See if the same thing happens in spark local.

 If you have a reproducible example you can post publically then I'm
 happy to  take a look.

 Chris

 On Wed, Jun 9, 2021 at 5:17 PM Tom Barber  wrote:

> Yeah to test that I just set the group key to the ID in the record
> which is a solr supplied UUID, which means effectively you end up with 
> 4000
> groups now.
>
> On Wed, Jun 9, 2021 at 5:13 PM Chris Martin 
> wrote:
>
>> One thing I would check is this line:
>>
>> val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>
>> how many distinct groups do you ended up with?  If there's just one
>> then I think you might see the behaviour you observe.
>>
>> Chris
>>
>>
>> On Wed, Jun 9, 2021 at 4:17 PM Tom Barber  wrote:
>>
>>> Also just to follow up on that slightly, I did also try off the back
>>> of another comment:
>>>
>>> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>>>   val job = this.job.asInstanceOf[SparklerJob]
>>>
>>>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>>>
>>>   val scoreUpdateRdd: RDD[SolrInputDocument] = 
>>> scoredRdd.repartition(50).map(d => ScoreUpdateSolrTransformer(d))
>>>
>>>
>>> Where I repartitioned that scoredRdd map out of interest, it then
>>> triggers the FairFetcher function there, instead of in the runJob(), but
>>> still on a single executor.

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Mich Talebzadeh
Are you running this in Managed Instance Group (MIG)?

https://cloud.google.com/compute/docs/instance-groups






On Wed, 9 Jun 2021 at 18:43, Tom Barber  wrote:

> And also as this morning: https://pasteboard.co/K5Q9aEf.png
>
> Removing the cpu pins gives me more tasks but as you can see here:
>
> https://pasteboard.co/K5Q9GO0.png
>
> It just loads up a single server.
>
> On Wed, Jun 9, 2021 at 6:32 PM Tom Barber  wrote:
>
>> Thanks Chris
>>
>> All the code I have on both sides is as modern as it allows. Running
>> Spark 3.1.1 and Scala 2.12.
>>
>> I stuck some logging in to check reality:
>>
>> LOG.info("GROUP COUNT: " + fetchedgrp.count())
>> val cgrp = fetchedgrp.collect()
>> cgrp.foreach(f => {
>>   LOG.info("Out1 :" + f._1)
>>   f._2.foreach(u => {
>> LOG.info("ID:" + u.getId)
>> LOG.info("GROUP:" + u.getGroup)
>>   })
>> })
>> LOG.info("PARTITION COUNT:" + fetchedgrp.getNumPartitions)
>> val fetchedRdd = fetchedgrp.flatMap({ case (grp, rs) => new FairFetcher(job, 
>> rs.iterator, localFetchDelay,
>> FetchFunction, ParseFunction, OutLinkFilterFunction, 
>> StatusUpdateSolrTransformer) })
>>   .persist()
>>
>> LOG.info("FETCHED PARTITIONS: " + fetchedRdd.getNumPartitions)
>> LOG.info("CoUNT: " + fetchedRdd.count())
>>
>>
>> It says I have 5000 groups, which makes sense as its defined in my
>> command line and both sides claim to have 50 partitions which also makes
>> sense as I define that in my code as well.
>>
>> Then it starts the crawl at the final count line as I guess it needs to
>> materialize things and so at that point I don't know what the count would
>> return, but everything else checks out.
>>
>> I'll poke around in the other hints you suggested later, thanks for the
>> help.
>>
>> Tom
>>
>> On Wed, Jun 9, 2021 at 5:49 PM Chris Martin 
>> wrote:
>>
>>> Hmm then my guesses are (in order of decreasing probability:
>>>
>>> * Whatever class makes up fetchedRdd (MemexDeepCrawlDbRDD?) isn't
>>> compatible with the lastest spark release.
>>> * You've got 16 threads per task on a 16 core machine.  Should be fine,
>>> but I wonder if it's confusing things as you don't also set
>>> spark.executor.cores and Databricks might also default that to 1.
>>> * There's some custom partitioner in play which is causing everything to
>>> go to the same partition.
>>> * The group keys are all hashing to the same value (it's difficult to
>>> see how this would be the case if the group keys are genuinely different,
>>> but maybe there's something else going on).
>>>
>>> My hints:
>>>
>>> 1. Make sure you're using a recent version of sparkler
>>> 2. Try repartition with a custom partitioner that you know will end
>>> things to different partitions
>>> 3. Try either removing "spark.task.cpus":"16"  or setting
>>> spark.executor.cores to 1.
>>> 4. print out the group keys and see if there's any weird pattern to them.
>>> 5. See if the same thing happens in spark local.
>>>
>>> If you have a reproducible example you can post publically then I'm
>>> happy to  take a look.
>>>
>>> Chris
>>>
>>> On Wed, Jun 9, 2021 at 5:17 PM Tom Barber  wrote:
>>>
 Yeah to test that I just set the group key to the ID in the record
 which is a solr supplied UUID, which means effectively you end up with 4000
 groups now.

 On Wed, Jun 9, 2021 at 5:13 PM Chris Martin 
 wrote:

> One thing I would check is this line:
>
> val fetchedRdd = rdd.map(r => (r.getGroup, r))
>
> how many distinct groups do you ended up with?  If there's just one
> then I think you might see the behaviour you observe.
>
> Chris
>
>
> On Wed, Jun 9, 2021 at 4:17 PM Tom Barber  wrote:
>
>> Also just to follow up on that slightly, I did also try off the back
>> of another comment:
>>
>> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>>   val job = this.job.asInstanceOf[SparklerJob]
>>
>>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>>
>>   val scoreUpdateRdd: RDD[SolrInputDocument] = 
>> scoredRdd.repartition(50).map(d => ScoreUpdateSolrTransformer(d))
>>
>>
>> Where I repartitioned that scoredRdd map out of interest, it then
>> triggers the FairFetcher function there, instead of in the runJob(), but
>> still on a single executor 
>>
>> Tom
>>
>> On Wed, Jun 9, 2021 at 4:11 PM Tom Barber  wrote:
>>
>>>
>>> Okay so what happens is that the crawler reads a bunch of solr data,
>>> we're not talking GB's, just a list of JSON, and turns that into a bunch of
>>> RDD's that end up in that flatmap that I linked to first.

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Tom Barber
And also as this morning: https://pasteboard.co/K5Q9aEf.png

Removing the cpu pins gives me more tasks but as you can see here:

https://pasteboard.co/K5Q9GO0.png

It just loads up a single server.

On Wed, Jun 9, 2021 at 6:32 PM Tom Barber  wrote:

> Thanks Chris
>
> All the code I have on both sides is as modern as it allows. Running Spark
> 3.1.1 and Scala 2.12.
>
> I stuck some logging in to check reality:
>
> LOG.info("GROUP COUNT: " + fetchedgrp.count())
> val cgrp = fetchedgrp.collect()
> cgrp.foreach(f => {
>   LOG.info("Out1 :" + f._1)
>   f._2.foreach(u => {
> LOG.info("ID:" + u.getId)
> LOG.info("GROUP:" + u.getGroup)
>   })
> })
> LOG.info("PARTITION COUNT:" + fetchedgrp.getNumPartitions)
> val fetchedRdd = fetchedgrp.flatMap({ case (grp, rs) => new FairFetcher(job, 
> rs.iterator, localFetchDelay,
> FetchFunction, ParseFunction, OutLinkFilterFunction, 
> StatusUpdateSolrTransformer) })
>   .persist()
>
> LOG.info("FETCHED PARTITIONS: " + fetchedRdd.getNumPartitions)
> LOG.info("CoUNT: " + fetchedRdd.count())
>
>
> It says I have 5000 groups, which makes sense as its defined in my command
> line and both sides claim to have 50 partitions which also makes sense as I
> define that in my code as well.
>
> Then it starts the crawl at the final count line as I guess it needs to
> materialize things and so at that point I don't know what the count would
> return, but everything else checks out.
>
> I'll poke around in the other hints you suggested later, thanks for the
> help.
>
> Tom
>
> On Wed, Jun 9, 2021 at 5:49 PM Chris Martin  wrote:
>
>> Hmm then my guesses are (in order of decreasing probability:
>>
>> * Whatever class makes up fetchedRdd (MemexDeepCrawlDbRDD?) isn't
>> compatible with the lastest spark release.
>> * You've got 16 threads per task on a 16 core machine.  Should be fine,
>> but I wonder if it's confusing things as you don't also set
>> spark.executor.cores and Databricks might also default that to 1.
>> * There's some custom partitioner in play which is causing everything to
>> go to the same partition.
>> * The group keys are all hashing to the same value (it's difficult to see
>> how this would be the case if the group keys are genuinely different, but
>> maybe there's something else going on).
>>
>> My hints:
>>
>> 1. Make sure you're using a recent version of sparkler
>> 2. Try repartition with a custom partitioner that you know will end
>> things to different partitions
>> 3. Try either removing "spark.task.cpus":"16"  or setting
>> spark.executor.cores to 1.
>> 4. print out the group keys and see if there's any weird pattern to them.
>> 5. See if the same thing happens in spark local.
>>
>> If you have a reproducible example you can post publically then I'm happy
>> to  take a look.
>>
>> Chris
>>
>> On Wed, Jun 9, 2021 at 5:17 PM Tom Barber  wrote:
>>
>>> Yeah to test that I just set the group key to the ID in the record which
>>> is a solr supplied UUID, which means effectively you end up with 4000
>>> groups now.
>>>
>>> On Wed, Jun 9, 2021 at 5:13 PM Chris Martin 
>>> wrote:
>>>
 One thing I would check is this line:

 val fetchedRdd = rdd.map(r => (r.getGroup, r))

 how many distinct groups do you ended up with?  If there's just one
 then I think you might see the behaviour you observe.

 Chris


 On Wed, Jun 9, 2021 at 4:17 PM Tom Barber  wrote:

> Also just to follow up on that slightly, I did also try off the back
> of another comment:
>
> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>   val job = this.job.asInstanceOf[SparklerJob]
>
>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>
>   val scoreUpdateRdd: RDD[SolrInputDocument] = 
> scoredRdd.repartition(50).map(d => ScoreUpdateSolrTransformer(d))
>
>
> Where I repartitioned that scoredRdd map out of interest, it then
> triggers the FairFetcher function there, instead of in the runJob(), but
> still on a single executor 
>
> Tom
>
> On Wed, Jun 9, 2021 at 4:11 PM Tom Barber  wrote:
>
>>
>> Okay so what happens is that the crawler reads a bunch of solr data,
>> we're not talking GB's just a list of JSON and turns that into a bunch of
>> RDD's that end up in that flatmap that I linked to first.
>>
>> The fair fetcher is an interface to a pluggable backend that
>> basically takes some of the fields and goes and crawls websites listed in
>> them looking for information. We wrote this code 6 years ago for a DARPA
>> project tracking down criminals on the web. Now I'm reusing it but trying
>> to force it to scale out a bit more.
>>
>> Say I have 4000 urls I want to crawl and a 3 node Spark cluster, I
>> want to push down 1 URL (a few more wont hurt, but crawling 50 urls in
>> parallel on one node makes my cluster sad) to each executor and have it 
>> run
>> a crawl, then move on and get another one and so on.

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Tom Barber
Thanks Chris

All the code I have on both sides is as modern as it allows. Running Spark
3.1.1 and Scala 2.12.

I stuck some logging in to check reality:

LOG.info("GROUP COUNT: " + fetchedgrp.count())
val cgrp = fetchedgrp.collect()
cgrp.foreach(f => {
  LOG.info("Out1 :" + f._1)
  f._2.foreach(u => {
LOG.info("ID:" + u.getId)
LOG.info("GROUP:" + u.getGroup)
  })
})
LOG.info("PARTITION COUNT:" + fetchedgrp.getNumPartitions)
val fetchedRdd = fetchedgrp.flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator, localFetchDelay,
    FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer) })
  .persist()

LOG.info("FETCHED PARTITIONS: " + fetchedRdd.getNumPartitions)
LOG.info("CoUNT: " + fetchedRdd.count())


It says I have 5000 groups, which makes sense as it's defined in my command
line, and both sides claim to have 50 partitions, which also makes sense as I
define that in my code as well.

Then it starts the crawl at the final count line as I guess it needs to
materialize things and so at that point I don't know what the count would
return, but everything else checks out.
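(A lighter-weight version of the check above, as a sketch: log only each group
key and the number of records in it, rather than collecting every Resource
back to the driver with collect().)

// Sketch: per-group record counts without pulling the full records to the driver.
fetchedgrp.map { case (g, rs) => (g, rs.size) }
  .collect()
  .foreach { case (g, n) => LOG.info(s"group $g -> $n records") }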

I'll poke around in the other hints you suggested later, thanks for the
help.

Tom

On Wed, Jun 9, 2021 at 5:49 PM Chris Martin  wrote:

> Hmm then my guesses are (in order of decreasing probability:
>
> * Whatever class makes up fetchedRdd (MemexDeepCrawlDbRDD?) isn't
> compatible with the lastest spark release.
> * You've got 16 threads per task on a 16 core machine.  Should be fine,
> but I wonder if it's confusing things as you don't also set
> spark.executor.cores and Databricks might also default that to 1.
> * There's some custom partitioner in play which is causing everything to
> go to the same partition.
> * The group keys are all hashing to the same value (it's difficult to see
> how this would be the case if the group keys are genuinely different, but
> maybe there's something else going on).
>
> My hints:
>
> 1. Make sure you're using a recent version of sparkler
> 2. Try repartition with a custom partitioner that you know will end things
> to different partitions
> 3. Try either removing "spark.task.cpus":"16"  or setting
> spark.executor.cores to 1.
> 4. print out the group keys and see if there's any weird pattern to them.
> 5. See if the same thing happens in spark local.
>
> If you have a reproducible example you can post publically then I'm happy
> to  take a look.
>
> Chris
>
> On Wed, Jun 9, 2021 at 5:17 PM Tom Barber  wrote:
>
>> Yeah to test that I just set the group key to the ID in the record which
>> is a solr supplied UUID, which means effectively you end up with 4000
>> groups now.
>>
>> On Wed, Jun 9, 2021 at 5:13 PM Chris Martin 
>> wrote:
>>
>>> One thing I would check is this line:
>>>
>>> val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>>
>>> how many distinct groups do you ended up with?  If there's just one then
>>> I think you might see the behaviour you observe.
>>>
>>> Chris
>>>
>>>
>>> On Wed, Jun 9, 2021 at 4:17 PM Tom Barber  wrote:
>>>
 Also just to follow up on that slightly, I did also try off the back of
 another comment:

 def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
   val job = this.job.asInstanceOf[SparklerJob]

   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))

   val scoreUpdateRdd: RDD[SolrInputDocument] = 
 scoredRdd.repartition(50).map(d => ScoreUpdateSolrTransformer(d))


 Where I repartitioned that scoredRdd map out of interest, it then
 triggers the FairFetcher function there, instead of in the runJob(), but
 still on a single executor 

 Tom

 On Wed, Jun 9, 2021 at 4:11 PM Tom Barber  wrote:

>
> Okay so what happens is that the crawler reads a bunch of solr data,
> we're not talking GB's just a list of JSON and turns that into a bunch of
> RDD's that end up in that flatmap that I linked to first.
>
> The fair fetcher is an interface to a pluggable backend that basically
> takes some of the fields and goes and crawls websites listed in them
> looking for information. We wrote this code 6 years ago for a DARPA 
> project
> tracking down criminals on the web. Now I'm reusing it but trying to force
> it to scale out a bit more.
>
> Say I have 4000 urls I want to crawl and a 3 node Spark cluster, I
> want to push down 1 URL (a few more wont hurt, but crawling 50 urls in
> parallel on one node makes my cluster sad) to each executor and have it 
> run
> a crawl, then move on and get another one and so on. That way you're not
> saturating a node trying to look up all of them and you could add more
> nodes for greater capacity pretty quickly. Once the website has been
> captured, you can then "score" it for want of a better term to determine
> its usefulness, which is where the map is being triggered.
>
> In answer to your questions Sean, no action seems triggered until you end up
> in the score block and the sc.runJob(), because that's literally the next
> line of functionality as Kafka isn't enabled.

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Chris Martin
Hmm, then my guesses are (in order of decreasing probability):

* Whatever class makes up fetchedRdd (MemexDeepCrawlDbRDD?) isn't
compatible with the latest Spark release.
* You've got 16 threads per task on a 16 core machine.  Should be fine, but
I wonder if it's confusing things as you don't also set
spark.executor.cores and Databricks might also default that to 1.
* There's some custom partitioner in play which is causing everything to go
to the same partition.
* The group keys are all hashing to the same value (it's difficult to see
how this would be the case if the group keys are genuinely different, but
maybe there's something else going on).

My hints:

1. Make sure you're using a recent version of sparkler
2. Try repartition with a custom partitioner that you know will send things
to different partitions (see the sketch below).
3. Try either removing "spark.task.cpus":"16"  or setting
spark.executor.cores to 1.
4. print out the group keys and see if there's any weird pattern to them.
5. See if the same thing happens in spark local.

If you have a reproducible example you can post publicly then I'm happy
to take a look.
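(For hint 2, a minimal sketch of a partitioner that is guaranteed to scatter
records, under the assumption that each record is first keyed by a unique Long
from zipWithIndex; the class and value names are illustrative, not from the
Sparkler code base.)

import org.apache.spark.Partitioner

// Deterministic per key, but because every key is unique the records end up
// spread evenly across all partitions regardless of their group value.
class IndexPartitioner(parts: Int) extends Partitioner {
  override def numPartitions: Int = parts
  override def getPartition(key: Any): Int = (key.asInstanceOf[Long] % parts).toInt
}

// usage sketch:
// val indexed   = rdd.zipWithIndex().map(_.swap)             // RDD[(Long, Resource)]
// val scattered = indexed.partitionBy(new IndexPartitioner(50))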

Chris

On Wed, Jun 9, 2021 at 5:17 PM Tom Barber  wrote:

> Yeah to test that I just set the group key to the ID in the record which
> is a solr supplied UUID, which means effectively you end up with 4000
> groups now.
>
> On Wed, Jun 9, 2021 at 5:13 PM Chris Martin  wrote:
>
>> One thing I would check is this line:
>>
>> val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>
>> how many distinct groups do you ended up with?  If there's just one then
>> I think you might see the behaviour you observe.
>>
>> Chris
>>
>>
>> On Wed, Jun 9, 2021 at 4:17 PM Tom Barber  wrote:
>>
>>> Also just to follow up on that slightly, I did also try off the back of
>>> another comment:
>>>
>>> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>>>   val job = this.job.asInstanceOf[SparklerJob]
>>>
>>>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>>>
>>>   val scoreUpdateRdd: RDD[SolrInputDocument] = 
>>> scoredRdd.repartition(50).map(d => ScoreUpdateSolrTransformer(d))
>>>
>>>
>>> Where I repartitioned that scoredRdd map out of interest, it then
>>> triggers the FairFetcher function there, instead of in the runJob(), but
>>> still on a single executor 
>>>
>>> Tom
>>>
>>> On Wed, Jun 9, 2021 at 4:11 PM Tom Barber  wrote:
>>>

 Okay so what happens is that the crawler reads a bunch of solr data,
 we're not talking GB's just a list of JSON and turns that into a bunch of
 RDD's that end up in that flatmap that I linked to first.

 The fair fetcher is an interface to a pluggable backend that basically
 takes some of the fields and goes and crawls websites listed in them
 looking for information. We wrote this code 6 years ago for a DARPA project
 tracking down criminals on the web. Now I'm reusing it but trying to force
 it to scale out a bit more.

 Say I have 4000 urls I want to crawl and a 3 node Spark cluster, I want
 to push down 1 URL (a few more wont hurt, but crawling 50 urls in parallel
 on one node makes my cluster sad) to each executor and have it run a crawl,
 then move on and get another one and so on. That way you're not saturating
 a node trying to look up all of them and you could add more nodes for
 greater capacity pretty quickly. Once the website has been captured, you
 can then "score" it for want of a better term to determine its usefulness,
 which is where the map is being triggered.

 In answer to your questions Sean, no action seems triggered until you
 end up in the score block and the sc.runJob() because thats literally the
 next line of functionality as Kafka isn't enabled.

 val fetchedRdd = rdd.map(r => (r.getGroup, r))
   .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job, 
 rs.iterator, localFetchDelay,
 FetchFunction, ParseFunction, OutLinkFilterFunction, 
 StatusUpdateSolrTransformer).toSeq })
   .persist()

 if (kafkaEnable) {
   storeContentKafka(kafkaListeners, kafkaTopic.format(jobId), fetchedRdd)
 }
 val scoredRdd = score(fetchedRdd)


 That if block is disabled so the score function runs. Inside of that:

 def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
   val job = this.job.asInstanceOf[SparklerJob]

   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))

   val scoreUpdateRdd: RDD[SolrInputDocument] = scoredRdd.map(d => 
 ScoreUpdateSolrTransformer(d))
   val scoreUpdateFunc = new SolrStatusUpdate(job)
   sc.runJob(scoreUpdateRdd, scoreUpdateFunc)
 


 When its doing stuff in the SparkUI I can see that its waiting on the
 sc.runJob() line, so thats the execution point.


 Tom

 On Wed, Jun 9, 2021 at 3:59 PM Sean Owen  wrote:

> persist() doesn't even persist by itself - just sets it to be persisted when
> it's executed.

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Tom Barber
Yeah, to test that I just set the group key to the ID in the record, which is
a Solr-supplied UUID, which means you effectively end up with 4000 groups
now.
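(In code terms that change is roughly the one-liner below, a sketch using the
getId accessor that shows up in the logging elsewhere in this thread; the rest
of the pipeline stays the same.)

// Sketch: key by the per-record Solr UUID instead of the shared "seed" group,
// so groupByKey produces ~4000 single-record groups instead of one big one.
val keyedById = rdd.map(r => (r.getId, r))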

On Wed, Jun 9, 2021 at 5:13 PM Chris Martin  wrote:

> One thing I would check is this line:
>
> val fetchedRdd = rdd.map(r => (r.getGroup, r))
>
> how many distinct groups do you ended up with?  If there's just one then I
> think you might see the behaviour you observe.
>
> Chris
>
>
> On Wed, Jun 9, 2021 at 4:17 PM Tom Barber  wrote:
>
>> Also just to follow up on that slightly, I did also try off the back of
>> another comment:
>>
>> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>>   val job = this.job.asInstanceOf[SparklerJob]
>>
>>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>>
>>   val scoreUpdateRdd: RDD[SolrInputDocument] = 
>> scoredRdd.repartition(50).map(d => ScoreUpdateSolrTransformer(d))
>>
>>
>> Where I repartitioned that scoredRdd map out of interest, it then
>> triggers the FairFetcher function there, instead of in the runJob(), but
>> still on a single executor 
>>
>> Tom
>>
>> On Wed, Jun 9, 2021 at 4:11 PM Tom Barber  wrote:
>>
>>>
>>> Okay so what happens is that the crawler reads a bunch of solr data,
>>> we're not talking GB's just a list of JSON and turns that into a bunch of
>>> RDD's that end up in that flatmap that I linked to first.
>>>
>>> The fair fetcher is an interface to a pluggable backend that basically
>>> takes some of the fields and goes and crawls websites listed in them
>>> looking for information. We wrote this code 6 years ago for a DARPA project
>>> tracking down criminals on the web. Now I'm reusing it but trying to force
>>> it to scale out a bit more.
>>>
>>> Say I have 4000 urls I want to crawl and a 3 node Spark cluster, I want
>>> to push down 1 URL (a few more wont hurt, but crawling 50 urls in parallel
>>> on one node makes my cluster sad) to each executor and have it run a crawl,
>>> then move on and get another one and so on. That way you're not saturating
>>> a node trying to look up all of them and you could add more nodes for
>>> greater capacity pretty quickly. Once the website has been captured, you
>>> can then "score" it for want of a better term to determine its usefulness,
>>> which is where the map is being triggered.
>>>
>>> In answer to your questions Sean, no action seems triggered until you
>>> end up in the score block and the sc.runJob() because thats literally the
>>> next line of functionality as Kafka isn't enabled.
>>>
>>> val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>>   .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job, 
>>> rs.iterator, localFetchDelay,
>>> FetchFunction, ParseFunction, OutLinkFilterFunction, 
>>> StatusUpdateSolrTransformer).toSeq })
>>>   .persist()
>>>
>>> if (kafkaEnable) {
>>>   storeContentKafka(kafkaListeners, kafkaTopic.format(jobId), fetchedRdd)
>>> }
>>> val scoredRdd = score(fetchedRdd)
>>>
>>>
>>> That if block is disabled so the score function runs. Inside of that:
>>>
>>> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>>>   val job = this.job.asInstanceOf[SparklerJob]
>>>
>>>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>>>
>>>   val scoreUpdateRdd: RDD[SolrInputDocument] = scoredRdd.map(d => 
>>> ScoreUpdateSolrTransformer(d))
>>>   val scoreUpdateFunc = new SolrStatusUpdate(job)
>>>   sc.runJob(scoreUpdateRdd, scoreUpdateFunc)
>>> 
>>>
>>>
>>> When its doing stuff in the SparkUI I can see that its waiting on the
>>> sc.runJob() line, so thats the execution point.
>>>
>>>
>>> Tom
>>>
>>> On Wed, Jun 9, 2021 at 3:59 PM Sean Owen  wrote:
>>>
 persist() doesn't even persist by itself - just sets it to be persisted
 when it's executed.
 key doesn't matter here, nor partitioning, if this code is trying to
 run things on the driver inadvertently.
 I don't quite grok what the OSS code you linked to is doing, but it's
 running some supplied functions very directly and at a low-level with
 sc.runJob, which might be part of how this can do something unusual.
 How do you trigger any action? what happens after persist()

 On Wed, Jun 9, 2021 at 9:48 AM Tom Barber  wrote:

> Thanks Mich,
>
> The key on the first iteration is just a string that says "seed", so
> it is indeed on the first crawl the same across all of the groups. Further
> iterations would be different, but I'm not there yet. I was under the
> impression that a repartition would distribute the tasks. Is that not the
> case?
>
> Thanks
>
> Tom
>
> On Wed, Jun 9, 2021 at 3:44 PM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi Tom,
>>
>> Persist() here simply means persist to memory). That is all. You can
>> check UI tab on storage
>>
>>
>> https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
>>
>> So I gather the code is stuck from your link 

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Chris Martin
One thing I would check is this line:

val fetchedRdd = rdd.map(r => (r.getGroup, r))

how many distinct groups do you end up with? If there's just one, then I
think you might see the behaviour you observe.

Chris
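
A quick way to check that, reusing the rdd/getGroup names from the snippet above (an illustrative sketch, not Sparkler code):

val groupCount = rdd.map(_.getGroup).distinct().count()
println(s"distinct groups: $groupCount")  // if this prints 1, groupByKey collapses the whole crawl into one group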


On Wed, Jun 9, 2021 at 4:17 PM Tom Barber  wrote:

> Also just to follow up on that slightly, I did also try off the back of
> another comment:
>
> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>   val job = this.job.asInstanceOf[SparklerJob]
>
>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>
>   val scoreUpdateRdd: RDD[SolrInputDocument] = 
> scoredRdd.repartition(50).map(d => ScoreUpdateSolrTransformer(d))
>
>
> Where I repartitioned that scoredRdd map out of interest, it then triggers
> the FairFetcher function there, instead of in the runJob(), but still on a
> single executor 
>
> Tom
>
> On Wed, Jun 9, 2021 at 4:11 PM Tom Barber  wrote:
>
>>
>> Okay so what happens is that the crawler reads a bunch of solr data,
>> we're not talking GB's just a list of JSON and turns that into a bunch of
>> RDD's that end up in that flatmap that I linked to first.
>>
>> The fair fetcher is an interface to a pluggable backend that basically
>> takes some of the fields and goes and crawls websites listed in them
>> looking for information. We wrote this code 6 years ago for a DARPA project
>> tracking down criminals on the web. Now I'm reusing it but trying to force
>> it to scale out a bit more.
>>
>> Say I have 4000 urls I want to crawl and a 3 node Spark cluster, I want
>> to push down 1 URL (a few more wont hurt, but crawling 50 urls in parallel
>> on one node makes my cluster sad) to each executor and have it run a crawl,
>> then move on and get another one and so on. That way you're not saturating
>> a node trying to look up all of them and you could add more nodes for
>> greater capacity pretty quickly. Once the website has been captured, you
>> can then "score" it for want of a better term to determine its usefulness,
>> which is where the map is being triggered.
>>
>> In answer to your questions Sean, no action seems triggered until you end
>> up in the score block and the sc.runJob() because thats literally the next
>> line of functionality as Kafka isn't enabled.
>>
>> val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>   .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job, 
>> rs.iterator, localFetchDelay,
>> FetchFunction, ParseFunction, OutLinkFilterFunction, 
>> StatusUpdateSolrTransformer).toSeq })
>>   .persist()
>>
>> if (kafkaEnable) {
>>   storeContentKafka(kafkaListeners, kafkaTopic.format(jobId), fetchedRdd)
>> }
>> val scoredRdd = score(fetchedRdd)
>>
>>
>> That if block is disabled so the score function runs. Inside of that:
>>
>> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>>   val job = this.job.asInstanceOf[SparklerJob]
>>
>>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>>
>>   val scoreUpdateRdd: RDD[SolrInputDocument] = scoredRdd.map(d => 
>> ScoreUpdateSolrTransformer(d))
>>   val scoreUpdateFunc = new SolrStatusUpdate(job)
>>   sc.runJob(scoreUpdateRdd, scoreUpdateFunc)
>> 
>>
>>
>> When its doing stuff in the SparkUI I can see that its waiting on the
>> sc.runJob() line, so thats the execution point.
>>
>>
>> Tom
>>
>> On Wed, Jun 9, 2021 at 3:59 PM Sean Owen  wrote:
>>
>>> persist() doesn't even persist by itself - just sets it to be persisted
>>> when it's executed.
>>> key doesn't matter here, nor partitioning, if this code is trying to run
>>> things on the driver inadvertently.
>>> I don't quite grok what the OSS code you linked to is doing, but it's
>>> running some supplied functions very directly and at a low-level with
>>> sc.runJob, which might be part of how this can do something unusual.
>>> How do you trigger any action? what happens after persist()
>>>
>>> On Wed, Jun 9, 2021 at 9:48 AM Tom Barber  wrote:
>>>
 Thanks Mich,

 The key on the first iteration is just a string that says "seed", so it
 is indeed on the first crawl the same across all of the groups. Further
 iterations would be different, but I'm not there yet. I was under the
 impression that a repartition would distribute the tasks. Is that not the
 case?

 Thanks

 Tom

 On Wed, Jun 9, 2021 at 3:44 PM Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

> Hi Tom,
>
> Persist() here simply means persist to memory). That is all. You can
> check UI tab on storage
>
>
> https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
>
> So I gather the code is stuck from your link in the driver. You stated
> that you tried repartition() but it did not do anything,
>
> Further you stated :
>
> " The key is pretty static in these tests, so I have also tried
> forcing the partition count (50 on a 16 core per node cluster) and also
> repartitioning, but every time all the jobs are scheduled 

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Tom Barber
@sam:

def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
  val job = this.job.asInstanceOf[SparklerJob]

  val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
  val m = 50
  val repRdd = scoredRdd.repartition(m).cache()
  repRdd.take(1)

  val scoreUpdateRdd: RDD[SolrInputDocument] = repRdd.map(d =>
ScoreUpdateSolrTransformer(d))

I did that, but the crawl still executes on that single repartition executor
(which, I should have pointed out, I already knew).

Tom

On Wed, Jun 9, 2021 at 4:37 PM Tom Barber  wrote:

> Sorry Sam, I missed that earlier, I'll give it a spin.
>
>
> To everyone involved, this code is old, and not written by me. If you all
> go "oooh, you want to distribute the crawls over the cluster, you don't
> want to do it like that, you should look at XYZ instead" feel free to punt
> different ways of doing this across, I'm happy to refactor the code to
> modernize it/follow better practices.
>
> On Wed, Jun 9, 2021 at 4:25 PM Sam  wrote:
>
>> Like I said In my previous email, can you try this and let me know how
>> many tasks you see?
>>
>> val repRdd = scoredRdd.repartition(50).cache()
>> repRdd.take(1)
>> Then map operation on repRdd here.
>>
>> I’ve done similar map operations in the past and this works.
>>
>> Thanks.
>>
>> On Wed, Jun 9, 2021 at 11:17 AM Tom Barber  wrote:
>>
>>> Also just to follow up on that slightly, I did also try off the back of
>>> another comment:
>>>
>>> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>>>   val job = this.job.asInstanceOf[SparklerJob]
>>>
>>>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>>>
>>>   val scoreUpdateRdd: RDD[SolrInputDocument] = 
>>> scoredRdd.repartition(50).map(d => ScoreUpdateSolrTransformer(d))
>>>
>>>
>>> Where I repartitioned that scoredRdd map out of interest, it then
>>> triggers the FairFetcher function there, instead of in the runJob(), but
>>> still on a single executor 
>>>
>>> Tom
>>>
>>> On Wed, Jun 9, 2021 at 4:11 PM Tom Barber  wrote:
>>>

 Okay so what happens is that the crawler reads a bunch of solr data,
 we're not talking GB's just a list of JSON and turns that into a bunch of
 RDD's that end up in that flatmap that I linked to first.

 The fair fetcher is an interface to a pluggable backend that basically
 takes some of the fields and goes and crawls websites listed in them
 looking for information. We wrote this code 6 years ago for a DARPA project
 tracking down criminals on the web. Now I'm reusing it but trying to force
 it to scale out a bit more.

 Say I have 4000 urls I want to crawl and a 3 node Spark cluster, I want
 to push down 1 URL (a few more wont hurt, but crawling 50 urls in parallel
 on one node makes my cluster sad) to each executor and have it run a crawl,
 then move on and get another one and so on. That way you're not saturating
 a node trying to look up all of them and you could add more nodes for
 greater capacity pretty quickly. Once the website has been captured, you
 can then "score" it for want of a better term to determine its usefulness,
 which is where the map is being triggered.

 In answer to your questions Sean, no action seems triggered until you
 end up in the score block and the sc.runJob() because thats literally the
 next line of functionality as Kafka isn't enabled.

 val fetchedRdd = rdd.map(r => (r.getGroup, r))
   .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job, 
 rs.iterator, localFetchDelay,
 FetchFunction, ParseFunction, OutLinkFilterFunction, 
 StatusUpdateSolrTransformer).toSeq })
   .persist()

 if (kafkaEnable) {
   storeContentKafka(kafkaListeners, kafkaTopic.format(jobId), fetchedRdd)
 }
 val scoredRdd = score(fetchedRdd)


 That if block is disabled so the score function runs. Inside of that:

 def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
   val job = this.job.asInstanceOf[SparklerJob]

   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))

   val scoreUpdateRdd: RDD[SolrInputDocument] = scoredRdd.map(d => 
 ScoreUpdateSolrTransformer(d))
   val scoreUpdateFunc = new SolrStatusUpdate(job)
   sc.runJob(scoreUpdateRdd, scoreUpdateFunc)
 


 When its doing stuff in the SparkUI I can see that its waiting on the
 sc.runJob() line, so thats the execution point.


 Tom

 On Wed, Jun 9, 2021 at 3:59 PM Sean Owen  wrote:

> persist() doesn't even persist by itself - just sets it to be
> persisted when it's executed.
> key doesn't matter here, nor partitioning, if this code is trying to
> run things on the driver inadvertently.
> I don't quite grok what the OSS code you linked to is doing, but it's
> running some supplied functions very directly and at a low-level with
> sc.runJob, which might be part of how this can do something unusual.

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Tom Barber
Sorry Sam, I missed that earlier, I'll give it a spin.


To everyone involved: this code is old and not written by me. If you all
go "oooh, you want to distribute the crawls over the cluster, you don't
want to do it like that, you should look at XYZ instead", feel free to punt
different ways of doing this across; I'm happy to refactor the code to
modernize it and follow better practices.

On Wed, Jun 9, 2021 at 4:25 PM Sam  wrote:

> Like I said In my previous email, can you try this and let me know how
> many tasks you see?
>
> val repRdd = scoredRdd.repartition(50).cache()
> repRdd.take(1)
> Then map operation on repRdd here.
>
> I’ve done similar map operations in the past and this works.
>
> Thanks.
>
> On Wed, Jun 9, 2021 at 11:17 AM Tom Barber  wrote:
>
>> Also just to follow up on that slightly, I did also try off the back of
>> another comment:
>>
>> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>>   val job = this.job.asInstanceOf[SparklerJob]
>>
>>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>>
>>   val scoreUpdateRdd: RDD[SolrInputDocument] = 
>> scoredRdd.repartition(50).map(d => ScoreUpdateSolrTransformer(d))
>>
>>
>> Where I repartitioned that scoredRdd map out of interest, it then
>> triggers the FairFetcher function there, instead of in the runJob(), but
>> still on a single executor 
>>
>> Tom
>>
>> On Wed, Jun 9, 2021 at 4:11 PM Tom Barber  wrote:
>>
>>>
>>> Okay so what happens is that the crawler reads a bunch of solr data,
>>> we're not talking GB's just a list of JSON and turns that into a bunch of
>>> RDD's that end up in that flatmap that I linked to first.
>>>
>>> The fair fetcher is an interface to a pluggable backend that basically
>>> takes some of the fields and goes and crawls websites listed in them
>>> looking for information. We wrote this code 6 years ago for a DARPA project
>>> tracking down criminals on the web. Now I'm reusing it but trying to force
>>> it to scale out a bit more.
>>>
>>> Say I have 4000 urls I want to crawl and a 3 node Spark cluster, I want
>>> to push down 1 URL (a few more wont hurt, but crawling 50 urls in parallel
>>> on one node makes my cluster sad) to each executor and have it run a crawl,
>>> then move on and get another one and so on. That way you're not saturating
>>> a node trying to look up all of them and you could add more nodes for
>>> greater capacity pretty quickly. Once the website has been captured, you
>>> can then "score" it for want of a better term to determine its usefulness,
>>> which is where the map is being triggered.
>>>
>>> In answer to your questions Sean, no action seems triggered until you
>>> end up in the score block and the sc.runJob() because thats literally the
>>> next line of functionality as Kafka isn't enabled.
>>>
>>> val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>>   .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job, 
>>> rs.iterator, localFetchDelay,
>>> FetchFunction, ParseFunction, OutLinkFilterFunction, 
>>> StatusUpdateSolrTransformer).toSeq })
>>>   .persist()
>>>
>>> if (kafkaEnable) {
>>>   storeContentKafka(kafkaListeners, kafkaTopic.format(jobId), fetchedRdd)
>>> }
>>> val scoredRdd = score(fetchedRdd)
>>>
>>>
>>> That if block is disabled so the score function runs. Inside of that:
>>>
>>> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>>>   val job = this.job.asInstanceOf[SparklerJob]
>>>
>>>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>>>
>>>   val scoreUpdateRdd: RDD[SolrInputDocument] = scoredRdd.map(d => 
>>> ScoreUpdateSolrTransformer(d))
>>>   val scoreUpdateFunc = new SolrStatusUpdate(job)
>>>   sc.runJob(scoreUpdateRdd, scoreUpdateFunc)
>>> 
>>>
>>>
>>> When its doing stuff in the SparkUI I can see that its waiting on the
>>> sc.runJob() line, so thats the execution point.
>>>
>>>
>>> Tom
>>>
>>> On Wed, Jun 9, 2021 at 3:59 PM Sean Owen  wrote:
>>>
 persist() doesn't even persist by itself - just sets it to be persisted
 when it's executed.
 key doesn't matter here, nor partitioning, if this code is trying to
 run things on the driver inadvertently.
 I don't quite grok what the OSS code you linked to is doing, but it's
 running some supplied functions very directly and at a low-level with
 sc.runJob, which might be part of how this can do something unusual.
 How do you trigger any action? what happens after persist()

 On Wed, Jun 9, 2021 at 9:48 AM Tom Barber  wrote:

> Thanks Mich,
>
> The key on the first iteration is just a string that says "seed", so
> it is indeed on the first crawl the same across all of the groups. Further
> iterations would be different, but I'm not there yet. I was under the
> impression that a repartition would distribute the tasks. Is that not the
> case?
>
> Thanks
>
> Tom
>
> On Wed, Jun 9, 2021 at 3:44 PM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi Tom,

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Sam
Like I said in my previous email, can you try this and let me know how many
tasks you see?

val repRdd = scoredRdd.repartition(50).cache()
repRdd.take(1)
Then run the map operation on repRdd here.

I’ve done similar map operations in the past and this works.

Thanks.
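
To make the task count visible, the suggestion assembled end to end might look like this (a sketch using the thread's variable names):

val repRdd = scoredRdd.repartition(50).cache()
repRdd.take(1)                    // small action to force the shuffle; check how many tasks that stage shows in the UI
println(repRdd.getNumPartitions)  // should print 50; each partition becomes one task in the following map stage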

On Wed, Jun 9, 2021 at 11:17 AM Tom Barber  wrote:

> Also just to follow up on that slightly, I did also try off the back of
> another comment:
>
> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>   val job = this.job.asInstanceOf[SparklerJob]
>
>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>
>   val scoreUpdateRdd: RDD[SolrInputDocument] = 
> scoredRdd.repartition(50).map(d => ScoreUpdateSolrTransformer(d))
>
>
> Where I repartitioned that scoredRdd map out of interest, it then triggers
> the FairFetcher function there, instead of in the runJob(), but still on a
> single executor 
>
> Tom
>
> On Wed, Jun 9, 2021 at 4:11 PM Tom Barber  wrote:
>
>>
>> Okay so what happens is that the crawler reads a bunch of solr data,
>> we're not talking GB's just a list of JSON and turns that into a bunch of
>> RDD's that end up in that flatmap that I linked to first.
>>
>> The fair fetcher is an interface to a pluggable backend that basically
>> takes some of the fields and goes and crawls websites listed in them
>> looking for information. We wrote this code 6 years ago for a DARPA project
>> tracking down criminals on the web. Now I'm reusing it but trying to force
>> it to scale out a bit more.
>>
>> Say I have 4000 urls I want to crawl and a 3 node Spark cluster, I want
>> to push down 1 URL (a few more wont hurt, but crawling 50 urls in parallel
>> on one node makes my cluster sad) to each executor and have it run a crawl,
>> then move on and get another one and so on. That way you're not saturating
>> a node trying to look up all of them and you could add more nodes for
>> greater capacity pretty quickly. Once the website has been captured, you
>> can then "score" it for want of a better term to determine its usefulness,
>> which is where the map is being triggered.
>>
>> In answer to your questions Sean, no action seems triggered until you end
>> up in the score block and the sc.runJob() because thats literally the next
>> line of functionality as Kafka isn't enabled.
>>
>> val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>   .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job, 
>> rs.iterator, localFetchDelay,
>> FetchFunction, ParseFunction, OutLinkFilterFunction, 
>> StatusUpdateSolrTransformer).toSeq })
>>   .persist()
>>
>> if (kafkaEnable) {
>>   storeContentKafka(kafkaListeners, kafkaTopic.format(jobId), fetchedRdd)
>> }
>> val scoredRdd = score(fetchedRdd)
>>
>>
>> That if block is disabled so the score function runs. Inside of that:
>>
>> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>>   val job = this.job.asInstanceOf[SparklerJob]
>>
>>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>>
>>   val scoreUpdateRdd: RDD[SolrInputDocument] = scoredRdd.map(d => 
>> ScoreUpdateSolrTransformer(d))
>>   val scoreUpdateFunc = new SolrStatusUpdate(job)
>>   sc.runJob(scoreUpdateRdd, scoreUpdateFunc)
>> 
>>
>>
>> When its doing stuff in the SparkUI I can see that its waiting on the
>> sc.runJob() line, so thats the execution point.
>>
>>
>> Tom
>>
>> On Wed, Jun 9, 2021 at 3:59 PM Sean Owen  wrote:
>>
>>> persist() doesn't even persist by itself - just sets it to be persisted
>>> when it's executed.
>>> key doesn't matter here, nor partitioning, if this code is trying to run
>>> things on the driver inadvertently.
>>> I don't quite grok what the OSS code you linked to is doing, but it's
>>> running some supplied functions very directly and at a low-level with
>>> sc.runJob, which might be part of how this can do something unusual.
>>> How do you trigger any action? what happens after persist()
>>>
>>> On Wed, Jun 9, 2021 at 9:48 AM Tom Barber  wrote:
>>>
 Thanks Mich,

 The key on the first iteration is just a string that says "seed", so it
 is indeed on the first crawl the same across all of the groups. Further
 iterations would be different, but I'm not there yet. I was under the
 impression that a repartition would distribute the tasks. Is that not the
 case?

 Thanks

 Tom

 On Wed, Jun 9, 2021 at 3:44 PM Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

> Hi Tom,
>
> Persist() here simply means persist to memory). That is all. You can
> check UI tab on storage
>
>
> https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
>
> So I gather the code is stuck from your link in the driver. You stated
> that you tried repartition() but it did not do anything,
>
> Further you stated :
>
> " The key is pretty static in these tests, so I have also tried
> forcing the partition count (50 on a 16 core per node cluster) and also
> 

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Tom Barber
Also just to follow up on that slightly, I did also try off the back of
another comment:

def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
  val job = this.job.asInstanceOf[SparklerJob]

  val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))

  val scoreUpdateRdd: RDD[SolrInputDocument] =
scoredRdd.repartition(50).map(d => ScoreUpdateSolrTransformer(d))


Where I repartitioned that scoredRdd map out of interest, it then triggers
the FairFetcher function there, instead of in the runJob(), but still on a
single executor 

Tom

On Wed, Jun 9, 2021 at 4:11 PM Tom Barber  wrote:

>
> Okay so what happens is that the crawler reads a bunch of solr data, we're
> not talking GB's just a list of JSON and turns that into a bunch of RDD's
> that end up in that flatmap that I linked to first.
>
> The fair fetcher is an interface to a pluggable backend that basically
> takes some of the fields and goes and crawls websites listed in them
> looking for information. We wrote this code 6 years ago for a DARPA project
> tracking down criminals on the web. Now I'm reusing it but trying to force
> it to scale out a bit more.
>
> Say I have 4000 urls I want to crawl and a 3 node Spark cluster, I want to
> push down 1 URL (a few more wont hurt, but crawling 50 urls in parallel on
> one node makes my cluster sad) to each executor and have it run a crawl,
> then move on and get another one and so on. That way you're not saturating
> a node trying to look up all of them and you could add more nodes for
> greater capacity pretty quickly. Once the website has been captured, you
> can then "score" it for want of a better term to determine its usefulness,
> which is where the map is being triggered.
>
> In answer to your questions Sean, no action seems triggered until you end
> up in the score block and the sc.runJob() because thats literally the next
> line of functionality as Kafka isn't enabled.
>
> val fetchedRdd = rdd.map(r => (r.getGroup, r))
>   .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job, 
> rs.iterator, localFetchDelay,
> FetchFunction, ParseFunction, OutLinkFilterFunction, 
> StatusUpdateSolrTransformer).toSeq })
>   .persist()
>
> if (kafkaEnable) {
>   storeContentKafka(kafkaListeners, kafkaTopic.format(jobId), fetchedRdd)
> }
> val scoredRdd = score(fetchedRdd)
>
>
> That if block is disabled so the score function runs. Inside of that:
>
> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>   val job = this.job.asInstanceOf[SparklerJob]
>
>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>
>   val scoreUpdateRdd: RDD[SolrInputDocument] = scoredRdd.map(d => 
> ScoreUpdateSolrTransformer(d))
>   val scoreUpdateFunc = new SolrStatusUpdate(job)
>   sc.runJob(scoreUpdateRdd, scoreUpdateFunc)
> 
>
>
> When its doing stuff in the SparkUI I can see that its waiting on the
> sc.runJob() line, so thats the execution point.
>
>
> Tom
>
> On Wed, Jun 9, 2021 at 3:59 PM Sean Owen  wrote:
>
>> persist() doesn't even persist by itself - just sets it to be persisted
>> when it's executed.
>> key doesn't matter here, nor partitioning, if this code is trying to run
>> things on the driver inadvertently.
>> I don't quite grok what the OSS code you linked to is doing, but it's
>> running some supplied functions very directly and at a low-level with
>> sc.runJob, which might be part of how this can do something unusual.
>> How do you trigger any action? what happens after persist()
>>
>> On Wed, Jun 9, 2021 at 9:48 AM Tom Barber  wrote:
>>
>>> Thanks Mich,
>>>
>>> The key on the first iteration is just a string that says "seed", so it
>>> is indeed on the first crawl the same across all of the groups. Further
>>> iterations would be different, but I'm not there yet. I was under the
>>> impression that a repartition would distribute the tasks. Is that not the
>>> case?
>>>
>>> Thanks
>>>
>>> Tom
>>>
>>> On Wed, Jun 9, 2021 at 3:44 PM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 Hi Tom,

 Persist() here simply means persist to memory). That is all. You can
 check UI tab on storage


 https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence

 So I gather the code is stuck from your link in the driver. You stated
 that you tried repartition() but it did not do anything,

 Further you stated :

 " The key is pretty static in these tests, so I have also tried
 forcing the partition count (50 on a 16 core per node cluster) and also
 repartitioning, but every time all the jobs are scheduled to run on one
 node."


 What is the key?


 HTH


view my Linkedin profile
 



 *Disclaimer:* Use it at your own risk. Any and all responsibility for
 any loss, damage or destruction of data or any other property which may
 arise from relying on this email's technical 

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Tom Barber
Okay, so what happens is that the crawler reads a bunch of Solr data (we're
not talking GBs, just a list of JSON) and turns that into a bunch of RDDs
that end up in that flatMap that I linked to first.

The fair fetcher is an interface to a pluggable backend that basically
takes some of the fields and goes and crawls websites listed in them
looking for information. We wrote this code 6 years ago for a DARPA project
tracking down criminals on the web. Now I'm reusing it but trying to force
it to scale out a bit more.

Say I have 4000 URLs I want to crawl and a 3-node Spark cluster. I want to
push down 1 URL (a few more won't hurt, but crawling 50 URLs in parallel on
one node makes my cluster sad) to each executor and have it run a crawl,
then move on and get another one and so on. That way you're not saturating
a node trying to look up all of them, and you could add more nodes for
greater capacity pretty quickly. Once the website has been captured, you
can then "score" it, for want of a better term, to determine its usefulness,
which is where the map is being triggered.
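
As a rough illustration of that shape (allUrls and crawl are hypothetical stand-ins here, not the FairFetcher pipeline):

val urls = sc.parallelize(allUrls, numSlices = 400)   // allUrls: Seq[String], a stand-in for the seed list
val captured = urls.map(u => crawl(u))                // crawl() is a stand-in for "fetch one URL"
captured.count()                                      // some action, so the fetches run as executor tasks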

In answer to your questions, Sean: no action seems to be triggered until you
end up in the score block and the sc.runJob(), because that's literally the
next line of functionality, as Kafka isn't enabled.

val fetchedRdd = rdd.map(r => (r.getGroup, r))
  .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator, localFetchDelay,
    FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer).toSeq })
  .persist()

if (kafkaEnable) {
  storeContentKafka(kafkaListeners, kafkaTopic.format(jobId), fetchedRdd)
}
val scoredRdd = score(fetchedRdd)


That if block is disabled, so the score function runs. Inside of that:

def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
  val job = this.job.asInstanceOf[SparklerJob]

  val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))

  val scoreUpdateRdd: RDD[SolrInputDocument] = scoredRdd.map(d => ScoreUpdateSolrTransformer(d))
  val scoreUpdateFunc = new SolrStatusUpdate(job)
  sc.runJob(scoreUpdateRdd, scoreUpdateFunc)



When it's doing stuff, I can see in the Spark UI that it's waiting on the
sc.runJob() line, so that's the execution point.


Tom

On Wed, Jun 9, 2021 at 3:59 PM Sean Owen  wrote:

> persist() doesn't even persist by itself - just sets it to be persisted
> when it's executed.
> key doesn't matter here, nor partitioning, if this code is trying to run
> things on the driver inadvertently.
> I don't quite grok what the OSS code you linked to is doing, but it's
> running some supplied functions very directly and at a low-level with
> sc.runJob, which might be part of how this can do something unusual.
> How do you trigger any action? what happens after persist()
>
> On Wed, Jun 9, 2021 at 9:48 AM Tom Barber  wrote:
>
>> Thanks Mich,
>>
>> The key on the first iteration is just a string that says "seed", so it
>> is indeed on the first crawl the same across all of the groups. Further
>> iterations would be different, but I'm not there yet. I was under the
>> impression that a repartition would distribute the tasks. Is that not the
>> case?
>>
>> Thanks
>>
>> Tom
>>
>> On Wed, Jun 9, 2021 at 3:44 PM Mich Talebzadeh 
>> wrote:
>>
>>> Hi Tom,
>>>
>>> Persist() here simply means persist to memory). That is all. You can
>>> check UI tab on storage
>>>
>>>
>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
>>>
>>> So I gather the code is stuck from your link in the driver. You stated
>>> that you tried repartition() but it did not do anything,
>>>
>>> Further you stated :
>>>
>>> " The key is pretty static in these tests, so I have also tried forcing
>>> the partition count (50 on a 16 core per node cluster) and also
>>> repartitioning, but every time all the jobs are scheduled to run on one
>>> node."
>>>
>>>
>>> What is the key?
>>>
>>>
>>> HTH
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Wed, 9 Jun 2021 at 15:23, Tom Barber  wrote:
>>>
 Interesting Sean thanks for that insight, I wasn't aware of that fact,
 I assume the .persist() at the end of that line doesn't do it?

 I believe, looking at the output in the SparkUI, it gets to
 https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L254
 and calls the context runJob.

 On Wed, Jun 9, 2021 at 2:07 PM Sean Owen  wrote:

> All these configurations don't matter at all if this is executing on
> the driver.
> 

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Sean Owen
persist() doesn't even persist by itself; it just marks the RDD to be persisted
when it's executed.
The key doesn't matter here, nor does partitioning, if this code is
inadvertently trying to run things on the driver.
I don't quite grok what the OSS code you linked to is doing, but it's
running some supplied functions very directly and at a low level with
sc.runJob, which might be part of how this can do something unusual.
How do you trigger any action? What happens after persist()?
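
A minimal sketch of that laziness, borrowing the fetchedRdd name from the snippets above:

val cached = fetchedRdd.persist()   // lazy: nothing is computed or cached yet
cached.count()                      // an action is what actually runs the DAG and fills the cache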

On Wed, Jun 9, 2021 at 9:48 AM Tom Barber  wrote:

> Thanks Mich,
>
> The key on the first iteration is just a string that says "seed", so it is
> indeed on the first crawl the same across all of the groups. Further
> iterations would be different, but I'm not there yet. I was under the
> impression that a repartition would distribute the tasks. Is that not the
> case?
>
> Thanks
>
> Tom
>
> On Wed, Jun 9, 2021 at 3:44 PM Mich Talebzadeh 
> wrote:
>
>> Hi Tom,
>>
>> Persist() here simply means persist to memory). That is all. You can
>> check UI tab on storage
>>
>>
>> https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
>>
>> So I gather the code is stuck from your link in the driver. You stated
>> that you tried repartition() but it did not do anything,
>>
>> Further you stated :
>>
>> " The key is pretty static in these tests, so I have also tried forcing
>> the partition count (50 on a 16 core per node cluster) and also
>> repartitioning, but every time all the jobs are scheduled to run on one
>> node."
>>
>>
>> What is the key?
>>
>>
>> HTH
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Wed, 9 Jun 2021 at 15:23, Tom Barber  wrote:
>>
>>> Interesting Sean thanks for that insight, I wasn't aware of that fact, I
>>> assume the .persist() at the end of that line doesn't do it?
>>>
>>> I believe, looking at the output in the SparkUI, it gets to
>>> https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L254
>>> and calls the context runJob.
>>>
>>> On Wed, Jun 9, 2021 at 2:07 PM Sean Owen  wrote:
>>>
 All these configurations don't matter at all if this is executing on
 the driver.
 Returning an Iterator in flatMap is fine though it 'delays' execution
 until that iterator is evaluated by something, which is normally fine.
 Does creating this FairFetcher do anything by itself? you're just
 returning an iterator that creates them here.
 How do you actually trigger an action here? the code snippet itself
 doesn't trigger anything.
 I think we need more info about what else is happening in the code.

 On Wed, Jun 9, 2021 at 6:30 AM Tom Barber  wrote:

> Yeah so if I update the FairFetcher to return a seq it makes no real
> difference.
>
> Here's an image of what I'm seeing just for reference:
> https://pasteboard.co/K5NFrz7.png
>
> Because this is databricks I don't have an actual spark submit command
> but it looks like this:
>
> curl  -d
> '{"new_cluster":{"spark_conf":{"spark.executor.extraJavaOptions":"-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/",
> "spark.task.cpus":"16"},
> "spark_version":"8.3.x-scala2.12","aws_attributes":{"availability":"SPOT_WITH_FALLBACK","first_on_demand":1,"zone_id":"us-west-2c"},"node_type_id":"c5d.4xlarge","init_scripts":[{"dbfs":{"destination":"dbfs:/FileStore/crawlinit.sh"}}],"num_workers":3},"spark_submit_task":{"parameters":["--driver-java-options",
> "-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/", "--driver-memory", 
> "10g",
> "--executor-memory", "10g",
> "--class","edu.usc.irds.sparkler.Main","dbfs:/FileStore/bcf/sparkler7.jar","crawl","-id","mytestcrawl11",
> "-tn", "5000", "-co",
> "{\"plugins.active\":[\"urlfilter-regex\",\"urlfilter-samehost\",\"fetcher-chrome\"],\"plugins\":{\"fetcher.chrome\":{\"chrome.dns\":\"local\"}}}"]},"run_name":"testsubmi3t"}'
>
> I deliberately pinned spark.task.cpus to 16 to stop it swamping the
> driver trying to run all the tasks in parallel on the one node, but again
> I've got 50 tasks queued up all running on the single node.
>
> On Wed, Jun 9, 2021 at 12:01 PM Tom Barber  wrote:
>
>> I've not run it yet, but I've stuck a toSeq on the end, but in
>> reality a Seq just inherits Iterator, right?
>>
>> Flatmap does return a RDD[CrawlData] unless my IDE is lying to me.
>>
>> Tom
>>
>> On Wed, Jun 9, 2021 at 10:54 AM Tom Barber  wrote:
>>
>>> 

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Tom Barber
Thanks Mich,

The key on the first iteration is just a string that says "seed", so on the
first crawl it is indeed the same across all of the groups. Further
iterations would be different, but I'm not there yet. I was under the
impression that a repartition would distribute the tasks. Is that not the
case?
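
That single shared key is what caps the parallelism after groupByKey; a toy sketch of the effect (not Sparkler code):

val resources = sc.parallelize(1 to 4000).map(i => ("seed", s"url-$i"))
val grouped = resources.groupByKey(50)                                    // asks for 50 partitions...
val nonEmpty = grouped.mapPartitions(it => Iterator(it.size)).collect().count(_ > 0)
println(nonEmpty)                                                         // ...but only 1 holds data, so one task gets all the downstream work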

Thanks

Tom

On Wed, Jun 9, 2021 at 3:44 PM Mich Talebzadeh 
wrote:

> Hi Tom,
>
> Persist() here simply means persist to memory). That is all. You can check
> UI tab on storage
>
>
> https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
>
> So I gather the code is stuck from your link in the driver. You stated
> that you tried repartition() but it did not do anything,
>
> Further you stated :
>
> " The key is pretty static in these tests, so I have also tried forcing
> the partition count (50 on a 16 core per node cluster) and also
> repartitioning, but every time all the jobs are scheduled to run on one
> node."
>
>
> What is the key?
>
>
> HTH
>
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 9 Jun 2021 at 15:23, Tom Barber  wrote:
>
>> Interesting Sean thanks for that insight, I wasn't aware of that fact, I
>> assume the .persist() at the end of that line doesn't do it?
>>
>> I believe, looking at the output in the SparkUI, it gets to
>> https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L254
>> and calls the context runJob.
>>
>> On Wed, Jun 9, 2021 at 2:07 PM Sean Owen  wrote:
>>
>>> All these configurations don't matter at all if this is executing on the
>>> driver.
>>> Returning an Iterator in flatMap is fine though it 'delays' execution
>>> until that iterator is evaluated by something, which is normally fine.
>>> Does creating this FairFetcher do anything by itself? you're just
>>> returning an iterator that creates them here.
>>> How do you actually trigger an action here? the code snippet itself
>>> doesn't trigger anything.
>>> I think we need more info about what else is happening in the code.
>>>
>>> On Wed, Jun 9, 2021 at 6:30 AM Tom Barber  wrote:
>>>
 Yeah so if I update the FairFetcher to return a seq it makes no real
 difference.

 Here's an image of what I'm seeing just for reference:
 https://pasteboard.co/K5NFrz7.png

 Because this is databricks I don't have an actual spark submit command
 but it looks like this:

 curl  -d
 '{"new_cluster":{"spark_conf":{"spark.executor.extraJavaOptions":"-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/",
 "spark.task.cpus":"16"},
 "spark_version":"8.3.x-scala2.12","aws_attributes":{"availability":"SPOT_WITH_FALLBACK","first_on_demand":1,"zone_id":"us-west-2c"},"node_type_id":"c5d.4xlarge","init_scripts":[{"dbfs":{"destination":"dbfs:/FileStore/crawlinit.sh"}}],"num_workers":3},"spark_submit_task":{"parameters":["--driver-java-options",
 "-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/", "--driver-memory", "10g",
 "--executor-memory", "10g",
 "--class","edu.usc.irds.sparkler.Main","dbfs:/FileStore/bcf/sparkler7.jar","crawl","-id","mytestcrawl11",
 "-tn", "5000", "-co",
 "{\"plugins.active\":[\"urlfilter-regex\",\"urlfilter-samehost\",\"fetcher-chrome\"],\"plugins\":{\"fetcher.chrome\":{\"chrome.dns\":\"local\"}}}"]},"run_name":"testsubmi3t"}'

 I deliberately pinned spark.task.cpus to 16 to stop it swamping the
 driver trying to run all the tasks in parallel on the one node, but again
 I've got 50 tasks queued up all running on the single node.

 On Wed, Jun 9, 2021 at 12:01 PM Tom Barber  wrote:

> I've not run it yet, but I've stuck a toSeq on the end, but in reality
> a Seq just inherits Iterator, right?
>
> Flatmap does return a RDD[CrawlData] unless my IDE is lying to me.
>
> Tom
>
> On Wed, Jun 9, 2021 at 10:54 AM Tom Barber  wrote:
>
>> Interesting Jayesh, thanks, I will test.
>>
>> All this code is inherited and it runs, but I don't think its been
>> tested in a distributed context for about 5 years, but yeah I need to get
>> this pushed down, so I'm happy to try anything! :)
>>
>> Tom
>>
>> On Wed, Jun 9, 2021 at 3:37 AM Lalwani, Jayesh 
>> wrote:
>>
>>> flatMap is supposed to return Seq, not Iterator. You are returning a
>>> class that implements Iterator. I have a hunch that's what's causing the
>>> confusion. flatMap is returning a RDD[FairFetcher] not RDD[CrawlData]. 
>>> Do
>>> you intend it to be RDD[CrawlData]? You 

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Mich Talebzadeh
Hi Tom,

Persist() here simply means persist to memory. That is all. You can check
the Storage tab in the UI:

https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
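
For reference, persist() with no arguments defaults to MEMORY_ONLY; being explicit about the level looks like this (a sketch, reusing the thread's fetchedRdd name):

import org.apache.spark.storage.StorageLevel

val cached = fetchedRdd.persist(StorageLevel.MEMORY_AND_DISK)  // shows up under the Storage tab once an action has run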

So I gather from your link that the code is stuck in the driver. You stated
that you tried repartition() but it did not do anything.

Further you stated:

" The key is pretty static in these tests, so I have also tried forcing the
partition count (50 on a 16 core per node cluster) and also repartitioning,
but every time all the jobs are scheduled to run on one node."


What is the key?


HTH

On Wed, 9 Jun 2021 at 15:23, Tom Barber  wrote:

> Interesting Sean thanks for that insight, I wasn't aware of that fact, I
> assume the .persist() at the end of that line doesn't do it?
>
> I believe, looking at the output in the SparkUI, it gets to
> https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L254
> and calls the context runJob.
>
> On Wed, Jun 9, 2021 at 2:07 PM Sean Owen  wrote:
>
>> All these configurations don't matter at all if this is executing on the
>> driver.
>> Returning an Iterator in flatMap is fine though it 'delays' execution
>> until that iterator is evaluated by something, which is normally fine.
>> Does creating this FairFetcher do anything by itself? you're just
>> returning an iterator that creates them here.
>> How do you actually trigger an action here? the code snippet itself
>> doesn't trigger anything.
>> I think we need more info about what else is happening in the code.
>>
>> On Wed, Jun 9, 2021 at 6:30 AM Tom Barber  wrote:
>>
>>> Yeah so if I update the FairFetcher to return a seq it makes no real
>>> difference.
>>>
>>> Here's an image of what I'm seeing just for reference:
>>> https://pasteboard.co/K5NFrz7.png
>>>
>>> Because this is databricks I don't have an actual spark submit command
>>> but it looks like this:
>>>
>>> curl  -d
>>> '{"new_cluster":{"spark_conf":{"spark.executor.extraJavaOptions":"-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/",
>>> "spark.task.cpus":"16"},
>>> "spark_version":"8.3.x-scala2.12","aws_attributes":{"availability":"SPOT_WITH_FALLBACK","first_on_demand":1,"zone_id":"us-west-2c"},"node_type_id":"c5d.4xlarge","init_scripts":[{"dbfs":{"destination":"dbfs:/FileStore/crawlinit.sh"}}],"num_workers":3},"spark_submit_task":{"parameters":["--driver-java-options",
>>> "-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/", "--driver-memory", "10g",
>>> "--executor-memory", "10g",
>>> "--class","edu.usc.irds.sparkler.Main","dbfs:/FileStore/bcf/sparkler7.jar","crawl","-id","mytestcrawl11",
>>> "-tn", "5000", "-co",
>>> "{\"plugins.active\":[\"urlfilter-regex\",\"urlfilter-samehost\",\"fetcher-chrome\"],\"plugins\":{\"fetcher.chrome\":{\"chrome.dns\":\"local\"}}}"]},"run_name":"testsubmi3t"}'
>>>
>>> I deliberately pinned spark.task.cpus to 16 to stop it swamping the
>>> driver trying to run all the tasks in parallel on the one node, but again
>>> I've got 50 tasks queued up all running on the single node.
>>>
>>> On Wed, Jun 9, 2021 at 12:01 PM Tom Barber  wrote:
>>>
 I've not run it yet, but I've stuck a toSeq on the end, but in reality
 a Seq just inherits Iterator, right?

 Flatmap does return a RDD[CrawlData] unless my IDE is lying to me.

 Tom

 On Wed, Jun 9, 2021 at 10:54 AM Tom Barber  wrote:

> Interesting Jayesh, thanks, I will test.
>
> All this code is inherited and it runs, but I don't think its been
> tested in a distributed context for about 5 years, but yeah I need to get
> this pushed down, so I'm happy to try anything! :)
>
> Tom
>
> On Wed, Jun 9, 2021 at 3:37 AM Lalwani, Jayesh 
> wrote:
>
>> flatMap is supposed to return Seq, not Iterator. You are returning a
>> class that implements Iterator. I have a hunch that's what's causing the
>> confusion. flatMap is returning a RDD[FairFetcher] not RDD[CrawlData]. Do
>> you intend it to be RDD[CrawlData]? You might want to call toSeq on
>> FairFetcher.
>>
>> On 6/8/21, 10:10 PM, "Tom Barber"  wrote:
>>
>> CAUTION: This email originated from outside of the organization.
>> Do not click links or open attachments unless you can confirm the sender
>> and know the content is safe.
>>
>>
>>
>> For anyone interested here's the execution logs up until the
>> point where it actually kicks off the workload in question:
>> 

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Tom Barber
Interesting, Sean, thanks for that insight. I wasn't aware of that fact; I
assume the .persist() at the end of that line doesn't do it?

I believe, looking at the output in the Spark UI, it gets to
https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L254
and calls the context runJob.

On Wed, Jun 9, 2021 at 2:07 PM Sean Owen  wrote:

> All these configurations don't matter at all if this is executing on the
> driver.
> Returning an Iterator in flatMap is fine though it 'delays' execution
> until that iterator is evaluated by something, which is normally fine.
> Does creating this FairFetcher do anything by itself? you're just
> returning an iterator that creates them here.
> How do you actually trigger an action here? the code snippet itself
> doesn't trigger anything.
> I think we need more info about what else is happening in the code.
>
> On Wed, Jun 9, 2021 at 6:30 AM Tom Barber  wrote:
>
>> Yeah so if I update the FairFetcher to return a seq it makes no real
>> difference.
>>
>> Here's an image of what I'm seeing just for reference:
>> https://pasteboard.co/K5NFrz7.png
>>
>> Because this is databricks I don't have an actual spark submit command
>> but it looks like this:
>>
>> curl  -d
>> '{"new_cluster":{"spark_conf":{"spark.executor.extraJavaOptions":"-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/",
>> "spark.task.cpus":"16"},
>> "spark_version":"8.3.x-scala2.12","aws_attributes":{"availability":"SPOT_WITH_FALLBACK","first_on_demand":1,"zone_id":"us-west-2c"},"node_type_id":"c5d.4xlarge","init_scripts":[{"dbfs":{"destination":"dbfs:/FileStore/crawlinit.sh"}}],"num_workers":3},"spark_submit_task":{"parameters":["--driver-java-options",
>> "-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/", "--driver-memory", "10g",
>> "--executor-memory", "10g",
>> "--class","edu.usc.irds.sparkler.Main","dbfs:/FileStore/bcf/sparkler7.jar","crawl","-id","mytestcrawl11",
>> "-tn", "5000", "-co",
>> "{\"plugins.active\":[\"urlfilter-regex\",\"urlfilter-samehost\",\"fetcher-chrome\"],\"plugins\":{\"fetcher.chrome\":{\"chrome.dns\":\"local\"}}}"]},"run_name":"testsubmi3t"}'
>>
>> I deliberately pinned spark.task.cpus to 16 to stop it swamping the
>> driver trying to run all the tasks in parallel on the one node, but again
>> I've got 50 tasks queued up all running on the single node.
>>
>> On Wed, Jun 9, 2021 at 12:01 PM Tom Barber  wrote:
>>
>>> I've not run it yet, but I've stuck a toSeq on the end, but in reality a
>>> Seq just inherits Iterator, right?
>>>
>>> Flatmap does return a RDD[CrawlData] unless my IDE is lying to me.
>>>
>>> Tom
>>>
>>> On Wed, Jun 9, 2021 at 10:54 AM Tom Barber  wrote:
>>>
 Interesting Jayesh, thanks, I will test.

 All this code is inherited and it runs, but I don't think its been
 tested in a distributed context for about 5 years, but yeah I need to get
 this pushed down, so I'm happy to try anything! :)

 Tom

 On Wed, Jun 9, 2021 at 3:37 AM Lalwani, Jayesh 
 wrote:

> flatMap is supposed to return Seq, not Iterator. You are returning a
> class that implements Iterator. I have a hunch that's what's causing the
> confusion. flatMap is returning a RDD[FairFetcher] not RDD[CrawlData]. Do
> you intend it to be RDD[CrawlData]? You might want to call toSeq on
> FairFetcher.
>
> On 6/8/21, 10:10 PM, "Tom Barber"  wrote:
>
> CAUTION: This email originated from outside of the organization.
> Do not click links or open attachments unless you can confirm the sender
> and know the content is safe.
>
>
>
> For anyone interested here's the execution logs up until the point
> where it actually kicks off the workload in question:
> https://gist.github.com/buggtb/a9e0445f24182bc8eedfe26c0f07a473
>
> On 2021/06/09 01:52:39, Tom Barber 
> wrote:
> > ExecutorID says driver, and looking at the IP addresses its
> running on its not any of the worker ip's.
> >
> > I forcibly told it to create 50, but they'd all end up running
> in the same place.
> >
> > Working on some other ideas, I set spark.task.cpus to 16 to
> match the nodes whilst still forcing it to 50 partitions
> >
> > val m = 50
> >
> > val fetchedRdd = rdd.map(r => (r.getGroup, r))
> > .groupByKey(m).flatMap({ case (grp, rs) => new
> FairFetcher(job, rs.iterator, localFetchDelay,
> >   FetchFunction, ParseFunction, OutLinkFilterFunction,
> StatusUpdateSolrTransformer) })
> > .persist()
> >
> > that sort of thing. But still the tasks are pinned to the driver
> executor and none of the workers, so I no longer saturate the master node,
> but I also have 3 workers just sat there doing nothing.
> >
> > On 2021/06/09 01:26:50, Sean Owen  wrote:
> > 

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Sean Owen
All these configurations don't matter at all if this is executing on the
driver.
Returning an Iterator in flatMap is fine, though it 'delays' execution until
that iterator is evaluated by something, which is normally fine.
Does creating this FairFetcher do anything by itself? You're just returning
an iterator that creates them here.
How do you actually trigger an action here? The code snippet itself doesn't
trigger anything.
I think we need more info about what else is happening in the code.
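
A toy illustration of that laziness, unrelated to the Sparkler code itself:

val ids = sc.parallelize(Seq("a", "b", "c"), 3)
val pages = ids.flatMap { id =>
  Iterator.fill(2) { println(s"fetching $id"); s"$id-page" }  // building the Iterator does no work yet
}
pages.count()  // only now are the iterators consumed, inside executor tasks (the printlns land in executor logs)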

On Wed, Jun 9, 2021 at 6:30 AM Tom Barber  wrote:

> Yeah so if I update the FairFetcher to return a seq it makes no real
> difference.
>
> Here's an image of what I'm seeing just for reference:
> https://pasteboard.co/K5NFrz7.png
>
> Because this is databricks I don't have an actual spark submit command but
> it looks like this:
>
> curl  -d
> '{"new_cluster":{"spark_conf":{"spark.executor.extraJavaOptions":"-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/",
> "spark.task.cpus":"16"},
> "spark_version":"8.3.x-scala2.12","aws_attributes":{"availability":"SPOT_WITH_FALLBACK","first_on_demand":1,"zone_id":"us-west-2c"},"node_type_id":"c5d.4xlarge","init_scripts":[{"dbfs":{"destination":"dbfs:/FileStore/crawlinit.sh"}}],"num_workers":3},"spark_submit_task":{"parameters":["--driver-java-options",
> "-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/", "--driver-memory", "10g",
> "--executor-memory", "10g",
> "--class","edu.usc.irds.sparkler.Main","dbfs:/FileStore/bcf/sparkler7.jar","crawl","-id","mytestcrawl11",
> "-tn", "5000", "-co",
> "{\"plugins.active\":[\"urlfilter-regex\",\"urlfilter-samehost\",\"fetcher-chrome\"],\"plugins\":{\"fetcher.chrome\":{\"chrome.dns\":\"local\"}}}"]},"run_name":"testsubmi3t"}'
>
> I deliberately pinned spark.task.cpus to 16 to stop it swamping the driver
> trying to run all the tasks in parallel on the one node, but again I've got
> 50 tasks queued up all running on the single node.
>
> On Wed, Jun 9, 2021 at 12:01 PM Tom Barber  wrote:
>
>> I've not run it yet, but I've stuck a toSeq on the end, but in reality a
>> Seq just inherits Iterator, right?
>>
>> Flatmap does return a RDD[CrawlData] unless my IDE is lying to me.
>>
>> Tom
>>
>> On Wed, Jun 9, 2021 at 10:54 AM Tom Barber  wrote:
>>
>>> Interesting Jayesh, thanks, I will test.
>>>
>>> All this code is inherited and it runs, but I don't think its been
>>> tested in a distributed context for about 5 years, but yeah I need to get
>>> this pushed down, so I'm happy to try anything! :)
>>>
>>> Tom
>>>
>>> On Wed, Jun 9, 2021 at 3:37 AM Lalwani, Jayesh 
>>> wrote:
>>>
 flatMap is supposed to return Seq, not Iterator. You are returning a
 class that implements Iterator. I have a hunch that's what's causing the
 confusion. flatMap is returning a RDD[FairFetcher] not RDD[CrawlData]. Do
 you intend it to be RDD[CrawlData]? You might want to call toSeq on
 FairFetcher.

 On 6/8/21, 10:10 PM, "Tom Barber"  wrote:

 CAUTION: This email originated from outside of the organization. Do
 not click links or open attachments unless you can confirm the sender and
 know the content is safe.



 For anyone interested here's the execution logs up until the point
 where it actually kicks off the workload in question:
 https://gist.github.com/buggtb/a9e0445f24182bc8eedfe26c0f07a473

 On 2021/06/09 01:52:39, Tom Barber  wrote:
 > ExecutorID says driver, and looking at the IP addresses its
 running on its not any of the worker ip's.
 >
 > I forcibly told it to create 50, but they'd all end up running in
 the same place.
 >
 > Working on some other ideas, I set spark.task.cpus to 16 to match
 the nodes whilst still forcing it to 50 partitions
 >
 > val m = 50
 >
 > val fetchedRdd = rdd.map(r => (r.getGroup, r))
 > .groupByKey(m).flatMap({ case (grp, rs) => new
 FairFetcher(job, rs.iterator, localFetchDelay,
 >   FetchFunction, ParseFunction, OutLinkFilterFunction,
 StatusUpdateSolrTransformer) })
 > .persist()
 >
 > that sort of thing. But still the tasks are pinned to the driver
 executor and none of the workers, so I no longer saturate the master node,
 but I also have 3 workers just sat there doing nothing.
 >
 > On 2021/06/09 01:26:50, Sean Owen  wrote:
 > > Are you sure it's on the driver? or just 1 executor?
 > > how many partitions does the groupByKey produce? that would
 limit your
 > > parallelism no matter what if it's a small number.
 > >
 > > On Tue, Jun 8, 2021 at 8:07 PM Tom Barber <
 magicaltr...@apache.org> wrote:
 > >
 > > > Hi folks,
 > > >
 > > > Hopefully someone with more Spark experience than me can
 explain this a
 > > > bit.
 > > >
 > > > I dont' know if this is 

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Tom Barber
Yeah, so if I update the FairFetcher to return a Seq it makes no real
difference.

Here's an image of what I'm seeing, just for reference:
https://pasteboard.co/K5NFrz7.png

Because this is Databricks I don't have an actual spark-submit command, but
it looks like this:

curl  -d
'{"new_cluster":{"spark_conf":{"spark.executor.extraJavaOptions":"-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/",
"spark.task.cpus":"16"},
"spark_version":"8.3.x-scala2.12","aws_attributes":{"availability":"SPOT_WITH_FALLBACK","first_on_demand":1,"zone_id":"us-west-2c"},"node_type_id":"c5d.4xlarge","init_scripts":[{"dbfs":{"destination":"dbfs:/FileStore/crawlinit.sh"}}],"num_workers":3},"spark_submit_task":{"parameters":["--driver-java-options",
"-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/", "--driver-memory", "10g",
"--executor-memory", "10g",
"--class","edu.usc.irds.sparkler.Main","dbfs:/FileStore/bcf/sparkler7.jar","crawl","-id","mytestcrawl11",
"-tn", "5000", "-co",
"{\"plugins.active\":[\"urlfilter-regex\",\"urlfilter-samehost\",\"fetcher-chrome\"],\"plugins\":{\"fetcher.chrome\":{\"chrome.dns\":\"local\"}}}"]},"run_name":"testsubmi3t"}'

I deliberately pinned spark.task.cpus to 16 to stop it swamping the driver
trying to run all the tasks in parallel on the one node, but again I've got
50 tasks queued up all running on the single node.
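
For context, that pinning is just this one setting, shown here programmatically (a sketch; the 16-core figure comes from the node type above):

import org.apache.spark.SparkConf

val conf = new SparkConf().set("spark.task.cpus", "16")  // with 16-core executors, at most one task in flight per executor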

On Wed, Jun 9, 2021 at 12:01 PM Tom Barber  wrote:

> I've not run it yet, but I've stuck a toSeq on the end, but in reality a
> Seq just inherits Iterator, right?
>
> Flatmap does return a RDD[CrawlData] unless my IDE is lying to me.
>
> Tom
>
> On Wed, Jun 9, 2021 at 10:54 AM Tom Barber  wrote:
>
>> Interesting Jayesh, thanks, I will test.
>>
>> All this code is inherited and it runs, but I don't think its been tested
>> in a distributed context for about 5 years, but yeah I need to get this
>> pushed down, so I'm happy to try anything! :)
>>
>> Tom
>>
>> On Wed, Jun 9, 2021 at 3:37 AM Lalwani, Jayesh 
>> wrote:
>>
>>> flatMap is supposed to return Seq, not Iterator. You are returning a
>>> class that implements Iterator. I have a hunch that's what's causing the
>>> confusion. flatMap is returning a RDD[FairFetcher] not RDD[CrawlData]. Do
>>> you intend it to be RDD[CrawlData]? You might want to call toSeq on
>>> FairFetcher.
>>>
>>> On 6/8/21, 10:10 PM, "Tom Barber"  wrote:
>>>
>>> CAUTION: This email originated from outside of the organization. Do
>>> not click links or open attachments unless you can confirm the sender and
>>> know the content is safe.
>>>
>>>
>>>
>>> For anyone interested here's the execution logs up until the point
>>> where it actually kicks off the workload in question:
>>> https://gist.github.com/buggtb/a9e0445f24182bc8eedfe26c0f07a473
>>>
>>> On 2021/06/09 01:52:39, Tom Barber  wrote:
>>> > ExecutorID says driver, and looking at the IP addresses its
>>> running on its not any of the worker ip's.
>>> >
>>> > I forcibly told it to create 50, but they'd all end up running in
>>> the same place.
>>> >
>>> > Working on some other ideas, I set spark.task.cpus to 16 to match
>>> the nodes whilst still forcing it to 50 partitions
>>> >
>>> > val m = 50
>>> >
>>> > val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>> > .groupByKey(m).flatMap({ case (grp, rs) => new
>>> FairFetcher(job, rs.iterator, localFetchDelay,
>>> >   FetchFunction, ParseFunction, OutLinkFilterFunction,
>>> StatusUpdateSolrTransformer) })
>>> > .persist()
>>> >
>>> > that sort of thing. But still the tasks are pinned to the driver
>>> executor and none of the workers, so I no longer saturate the master node,
>>> but I also have 3 workers just sat there doing nothing.
>>> >
>>> > On 2021/06/09 01:26:50, Sean Owen  wrote:
>>> > > Are you sure it's on the driver? or just 1 executor?
>>> > > how many partitions does the groupByKey produce? that would
>>> limit your
>>> > > parallelism no matter what if it's a small number.
>>> > >
>>> > > On Tue, Jun 8, 2021 at 8:07 PM Tom Barber <
>>> magicaltr...@apache.org> wrote:
>>> > >
>>> > > > Hi folks,
>>> > > >
>>> > > > Hopefully someone with more Spark experience than me can
>>> explain this a
>>> > > > bit.
>>> > > >
>>> > > > I dont' know if this is possible, impossible or just an old
>>> design that
>>> > > > could be better.
>>> > > >
>>> > > > I'm running Sparkler as a spark-submit job on a databricks
>>> spark cluster
>>> > > > and its getting to this point in the code(
>>> > > >
>>> https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L222-L226
>>> > > > )
>>> > > >
>>> > > > val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>> > > > .groupByKey()
>>> > > > .flatMap({ case (grp, rs) => new FairFetcher(job,
>>> rs.iterator,
>>> > > > localFetchDelay,
>>> > > 

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Tom Barber
I've not run it yet, but I've stuck a toSeq on the end, but in reality a
Seq just inherits Iterator, right?

flatMap does return an RDD[CrawlData] unless my IDE is lying to me.
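
As a sanity check on the types (standalone sketch, nothing Sparkler-specific,
and based on my reading of the Scala 2.12 RDD API): RDD.flatMap takes a
function returning a TraversableOnce, and both Iterator and Seq satisfy that,
so either shape should compile and flatten to the same result:

import org.apache.spark.sql.SparkSession

// Standalone illustration: flatMap accepts either an Iterator or a Seq,
// because both are TraversableOnce, and the flattened output is identical.
val spark = SparkSession.builder().master("local[2]").appName("flatmap-shapes").getOrCreate()
val nums = spark.sparkContext.parallelize(Seq(1, 2, 3))

val viaIterator = nums.flatMap(n => Iterator(n, n * 10)) // function returns Iterator[Int]
val viaSeq      = nums.flatMap(n => Seq(n, n * 10))      // function returns Seq[Int]

assert(viaIterator.collect().sorted.sameElements(viaSeq.collect().sorted))
spark.stop()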

Tom

On Wed, Jun 9, 2021 at 10:54 AM Tom Barber  wrote:

> Interesting Jayesh, thanks, I will test.
>
> All this code is inherited and it runs, but I don't think its been tested
> in a distributed context for about 5 years, but yeah I need to get this
> pushed down, so I'm happy to try anything! :)
>
> Tom
>
> On Wed, Jun 9, 2021 at 3:37 AM Lalwani, Jayesh 
> wrote:
>
>> flatMap is supposed to return Seq, not Iterator. You are returning a
>> class that implements Iterator. I have a hunch that's what's causing the
>> confusion. flatMap is returning a RDD[FairFetcher] not RDD[CrawlData]. Do
>> you intend it to be RDD[CrawlData]? You might want to call toSeq on
>> FairFetcher.
>>
>> On 6/8/21, 10:10 PM, "Tom Barber"  wrote:
>>
>> For anyone interested here's the execution logs up until the point
>> where it actually kicks off the workload in question:
>> https://gist.github.com/buggtb/a9e0445f24182bc8eedfe26c0f07a473
>>
>> On 2021/06/09 01:52:39, Tom Barber  wrote:
>> > ExecutorID says driver, and looking at the IP addresses its running
>> on its not any of the worker ip's.
>> >
>> > I forcibly told it to create 50, but they'd all end up running in
>> the same place.
>> >
>> > Working on some other ideas, I set spark.task.cpus to 16 to match
>> the nodes whilst still forcing it to 50 partitions
>> >
>> > val m = 50
>> >
>> > val fetchedRdd = rdd.map(r => (r.getGroup, r))
>> > .groupByKey(m).flatMap({ case (grp, rs) => new
>> FairFetcher(job, rs.iterator, localFetchDelay,
>> >   FetchFunction, ParseFunction, OutLinkFilterFunction,
>> StatusUpdateSolrTransformer) })
>> > .persist()
>> >
>> > that sort of thing. But still the tasks are pinned to the driver
>> executor and none of the workers, so I no longer saturate the master node,
>> but I also have 3 workers just sat there doing nothing.
>> >
>> > On 2021/06/09 01:26:50, Sean Owen  wrote:
>> > > Are you sure it's on the driver? or just 1 executor?
>> > > how many partitions does the groupByKey produce? that would limit
>> your
>> > > parallelism no matter what if it's a small number.
>> > >
>> > > On Tue, Jun 8, 2021 at 8:07 PM Tom Barber <
>> magicaltr...@apache.org> wrote:
>> > >
>> > > > Hi folks,
>> > > >
>> > > > Hopefully someone with more Spark experience than me can
>> explain this a
>> > > > bit.
>> > > >
>> > > > I dont' know if this is possible, impossible or just an old
>> design that
>> > > > could be better.
>> > > >
>> > > > I'm running Sparkler as a spark-submit job on a databricks
>> spark cluster
>> > > > and its getting to this point in the code(
>> > > >
>> https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L222-L226
>> > > > )
>> > > >
>> > > > val fetchedRdd = rdd.map(r => (r.getGroup, r))
>> > > > .groupByKey()
>> > > > .flatMap({ case (grp, rs) => new FairFetcher(job,
>> rs.iterator,
>> > > > localFetchDelay,
>> > > >   FetchFunction, ParseFunction, OutLinkFilterFunction,
>> > > > StatusUpdateSolrTransformer) })
>> > > > .persist()
>> > > >
>> > > > This basically takes the RDD and then runs a web based crawl
>> over each RDD
>> > > > and returns the results. But when Spark executes it, it runs
>> all the crawls
>> > > > on the driver node and doesn't distribute them.
>> > > >
>> > > > The key is pretty static in these tests, so I have also tried
>> forcing the
>> > > > partition count (50 on a 16 core per node cluster) and also
>> repartitioning,
>> > > > but every time all the jobs are scheduled to run on one node.
>> > > >
>> > > > What can I do better to distribute the tasks? Because the
>> processing of
>> > > > the data in the RDD isn't the bottleneck, the fetching of the
>> crawl data is
>> > > > the bottleneck, but that happens after the code has been
>> assigned to a node.
>> > > >
>> > > > Thanks
>> > > >
>> > > > Tom
>> > > >
>> > > >
>> > > >
>> -
>> > > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> > > >
>> > > >
>> > >
>> >
>> >
>> -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>> >
>>
>> -

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Tom Barber
Interesting Jayesh, thanks, I will test.

All this code is inherited and it runs, but I don't think it's been tested
in a distributed context for about 5 years. But yeah, I need to get this
pushed down, so I'm happy to try anything! :)

Tom

On Wed, Jun 9, 2021 at 3:37 AM Lalwani, Jayesh  wrote:

> flatMap is supposed to return Seq, not Iterator. You are returning a class
> that implements Iterator. I have a hunch that's what's causing the
> confusion. flatMap is returning a RDD[FairFetcher] not RDD[CrawlData]. Do
> you intend it to be RDD[CrawlData]? You might want to call toSeq on
> FairFetcher.
>
> On 6/8/21, 10:10 PM, "Tom Barber"  wrote:
>
> For anyone interested here's the execution logs up until the point
> where it actually kicks off the workload in question:
> https://gist.github.com/buggtb/a9e0445f24182bc8eedfe26c0f07a473
>
> On 2021/06/09 01:52:39, Tom Barber  wrote:
> > ExecutorID says driver, and looking at the IP addresses its running
> on its not any of the worker ip's.
> >
> > I forcibly told it to create 50, but they'd all end up running in
> the same place.
> >
> > Working on some other ideas, I set spark.task.cpus to 16 to match
> the nodes whilst still forcing it to 50 partitions
> >
> > val m = 50
> >
> > val fetchedRdd = rdd.map(r => (r.getGroup, r))
> > .groupByKey(m).flatMap({ case (grp, rs) => new
> FairFetcher(job, rs.iterator, localFetchDelay,
> >   FetchFunction, ParseFunction, OutLinkFilterFunction,
> StatusUpdateSolrTransformer) })
> > .persist()
> >
> > that sort of thing. But still the tasks are pinned to the driver
> executor and none of the workers, so I no longer saturate the master node,
> but I also have 3 workers just sat there doing nothing.
> >
> > On 2021/06/09 01:26:50, Sean Owen  wrote:
> > > Are you sure it's on the driver? or just 1 executor?
> > > how many partitions does the groupByKey produce? that would limit
> your
> > > parallelism no matter what if it's a small number.
> > >
> > > On Tue, Jun 8, 2021 at 8:07 PM Tom Barber 
> wrote:
> > >
> > > > Hi folks,
> > > >
> > > > Hopefully someone with more Spark experience than me can explain
> this a
> > > > bit.
> > > >
> > > > I dont' know if this is possible, impossible or just an old
> design that
> > > > could be better.
> > > >
> > > > I'm running Sparkler as a spark-submit job on a databricks spark
> cluster
> > > > and its getting to this point in the code(
> > > >
> https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L222-L226
> > > > )
> > > >
> > > > val fetchedRdd = rdd.map(r => (r.getGroup, r))
> > > > .groupByKey()
> > > > .flatMap({ case (grp, rs) => new FairFetcher(job,
> rs.iterator,
> > > > localFetchDelay,
> > > >   FetchFunction, ParseFunction, OutLinkFilterFunction,
> > > > StatusUpdateSolrTransformer) })
> > > > .persist()
> > > >
> > > > This basically takes the RDD and then runs a web based crawl
> over each RDD
> > > > and returns the results. But when Spark executes it, it runs all
> the crawls
> > > > on the driver node and doesn't distribute them.
> > > >
> > > > The key is pretty static in these tests, so I have also tried
> forcing the
> > > > partition count (50 on a 16 core per node cluster) and also
> repartitioning,
> > > > but every time all the jobs are scheduled to run on one node.
> > > >
> > > > What can I do better to distribute the tasks? Because the
> processing of
> > > > the data in the RDD isn't the bottleneck, the fetching of the
> crawl data is
> > > > the bottleneck, but that happens after the code has been
> assigned to a node.
> > > >
> > > > Thanks
> > > >
> > > > Tom
> > > >
> > > >
> > > >
> -
> > > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> > > >
> > > >
> > >
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>


Re: Distributing a FlatMap across a Spark Cluster

2021-06-08 Thread Lalwani, Jayesh
flatMap is supposed to return a Seq, not an Iterator. You are returning a class that
implements Iterator. I have a hunch that's what's causing the confusion.
flatMap is returning an RDD[FairFetcher], not an RDD[CrawlData]. Do you intend it to
be RDD[CrawlData]? You might want to call toSeq on FairFetcher.
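
Something along these lines is what I have in mind (sketch only; I'm reusing the
identifiers from your snippet, so FairFetcher, job, localFetchDelay and the plugin
functions are assumptions on my side rather than verified Sparkler API):

val fetchedRdd = rdd.map(r => (r.getGroup, r))
  .groupByKey()
  .flatMap({ case (grp, rs) =>
    // Materialise the fetch results eagerly instead of handing Spark the
    // FairFetcher iterator itself.
    new FairFetcher(job, rs.iterator, localFetchDelay,
      FetchFunction, ParseFunction, OutLinkFilterFunction,
      StatusUpdateSolrTransformer).toSeq
  })
  .persist()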

On 6/8/21, 10:10 PM, "Tom Barber"  wrote:

For anyone interested here's the execution logs up until the point where it 
actually kicks off the workload in question: 
https://gist.github.com/buggtb/a9e0445f24182bc8eedfe26c0f07a473

On 2021/06/09 01:52:39, Tom Barber  wrote:
> ExecutorID says driver, and looking at the IP addresses its running on 
its not any of the worker ip's.
>
> I forcibly told it to create 50, but they'd all end up running in the 
same place.
>
> Working on some other ideas, I set spark.task.cpus to 16 to match the 
nodes whilst still forcing it to 50 partitions
>
> val m = 50
>
> val fetchedRdd = rdd.map(r => (r.getGroup, r))
> .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job, 
rs.iterator, localFetchDelay,
>   FetchFunction, ParseFunction, OutLinkFilterFunction, 
StatusUpdateSolrTransformer) })
> .persist()
>
> that sort of thing. But still the tasks are pinned to the driver executor 
and none of the workers, so I no longer saturate the master node, but I also 
have 3 workers just sat there doing nothing.
>
> On 2021/06/09 01:26:50, Sean Owen  wrote:
> > Are you sure it's on the driver? or just 1 executor?
> > how many partitions does the groupByKey produce? that would limit your
> > parallelism no matter what if it's a small number.
> >
> > On Tue, Jun 8, 2021 at 8:07 PM Tom Barber  
wrote:
> >
> > > Hi folks,
> > >
> > > Hopefully someone with more Spark experience than me can explain this 
a
> > > bit.
> > >
> > > I dont' know if this is possible, impossible or just an old design 
that
> > > could be better.
> > >
> > > I'm running Sparkler as a spark-submit job on a databricks spark 
cluster
> > > and its getting to this point in the code(
> > > 
https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L222-L226
> > > )
> > >
> > > val fetchedRdd = rdd.map(r => (r.getGroup, r))
> > > .groupByKey()
> > > .flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator,
> > > localFetchDelay,
> > >   FetchFunction, ParseFunction, OutLinkFilterFunction,
> > > StatusUpdateSolrTransformer) })
> > > .persist()
> > >
> > > This basically takes the RDD and then runs a web based crawl over 
each RDD
> > > and returns the results. But when Spark executes it, it runs all the 
crawls
> > > on the driver node and doesn't distribute them.
> > >
> > > The key is pretty static in these tests, so I have also tried forcing 
the
> > > partition count (50 on a 16 core per node cluster) and also 
repartitioning,
> > > but every time all the jobs are scheduled to run on one node.
> > >
> > > What can I do better to distribute the tasks? Because the processing 
of
> > > the data in the RDD isn't the bottleneck, the fetching of the crawl 
data is
> > > the bottleneck, but that happens after the code has been assigned to 
a node.
> > >
> > > Thanks
> > >
> > > Tom
> > >
> > >
> > > -
> > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> > >
> > >
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Distributing a FlatMap across a Spark Cluster

2021-06-08 Thread Sean Owen
Really weird. flatMap definitely doesn't happen on the driver. My only
long-shot theory, which I haven't thought through, is: what is FairFetcher
doing with 'job'? It kind of looks like it's submitting a (driver) Job
directly into its own scheduler or something, which could explain it, but
maybe that's totally wrong, and I'd much more expect that to fail if
executed this way.
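
The sort of thing I mean, as a completely made-up illustration rather than anything
taken from Sparkler: a transformation whose closure reaches back into the
SparkContext to submit work. Spark won't ship that closure to executors, so logic
written that way either fails or has to stay on the driver:

import org.apache.spark.SparkException
import org.apache.spark.sql.SparkSession

// Made-up illustration of the anti-pattern, not Sparkler code.
val spark = SparkSession.builder().master("local[*]").appName("nested-job-antipattern").getOrCreate()
val sc = spark.sparkContext
val rdd = sc.parallelize(1 to 10)

// Don't do this: the closure captures the driver-only SparkContext to try to
// submit work from inside a transformation. Spark rejects the closure with
// "Task not serializable" because a SparkContext can't be shipped to executors.
try {
  rdd.map(n => sc.parallelize(Seq(n)).count()).collect()
} catch {
  case e: SparkException => println(s"as expected: ${e.getMessage}")
}
spark.stop()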

On Tue, Jun 8, 2021 at 8:53 PM Tom Barber  wrote:

> ExecutorID says driver, and looking at the IP addresses its running on its
> not any of the worker ip's.
>
> I forcibly told it to create 50, but they'd all end up running in the same
> place.
>
> Working on some other ideas, I set spark.task.cpus to 16 to match the
> nodes whilst still forcing it to 50 partitions
>
> val m = 50
>
> val fetchedRdd = rdd.map(r => (r.getGroup, r))
> .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job,
> rs.iterator, localFetchDelay,
>   FetchFunction, ParseFunction, OutLinkFilterFunction,
> StatusUpdateSolrTransformer) })
> .persist()
>
> that sort of thing. But still the tasks are pinned to the driver executor
> and none of the workers, so I no longer saturate the master node, but I
> also have 3 workers just sat there doing nothing.
>
> On 2021/06/09 01:26:50, Sean Owen  wrote:
> > Are you sure it's on the driver? or just 1 executor?
> > how many partitions does the groupByKey produce? that would limit your
> > parallelism no matter what if it's a small number.
> >
> > On Tue, Jun 8, 2021 at 8:07 PM Tom Barber 
> wrote:
> >
> > > Hi folks,
> > >
> > > Hopefully someone with more Spark experience than me can explain this a
> > > bit.
> > >
> > > I dont' know if this is possible, impossible or just an old design that
> > > could be better.
> > >
> > > I'm running Sparkler as a spark-submit job on a databricks spark
> cluster
> > > and its getting to this point in the code(
> > >
> https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L222-L226
> > > )
> > >
> > > val fetchedRdd = rdd.map(r => (r.getGroup, r))
> > > .groupByKey()
> > > .flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator,
> > > localFetchDelay,
> > >   FetchFunction, ParseFunction, OutLinkFilterFunction,
> > > StatusUpdateSolrTransformer) })
> > > .persist()
> > >
> > > This basically takes the RDD and then runs a web based crawl over each
> RDD
> > > and returns the results. But when Spark executes it, it runs all the
> crawls
> > > on the driver node and doesn't distribute them.
> > >
> > > The key is pretty static in these tests, so I have also tried forcing
> the
> > > partition count (50 on a 16 core per node cluster) and also
> repartitioning,
> > > but every time all the jobs are scheduled to run on one node.
> > >
> > > What can I do better to distribute the tasks? Because the processing of
> > > the data in the RDD isn't the bottleneck, the fetching of the crawl
> data is
> > > the bottleneck, but that happens after the code has been assigned to a
> node.
> > >
> > > Thanks
> > >
> > > Tom
> > >
> > >
> > > -
> > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> > >
> > >
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Distributing a FlatMap across a Spark Cluster

2021-06-08 Thread Tom Barber
For anyone interested here's the execution logs up until the point where it 
actually kicks off the workload in question: 
https://gist.github.com/buggtb/a9e0445f24182bc8eedfe26c0f07a473

On 2021/06/09 01:52:39, Tom Barber  wrote: 
> ExecutorID says driver, and looking at the IP addresses its running on its 
> not any of the worker ip's.
> 
> I forcibly told it to create 50, but they'd all end up running in the same 
> place. 
> 
> Working on some other ideas, I set spark.task.cpus to 16 to match the nodes 
> whilst still forcing it to 50 partitions
> 
> val m = 50
> 
> val fetchedRdd = rdd.map(r => (r.getGroup, r))
> .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job, 
> rs.iterator, localFetchDelay,
>   FetchFunction, ParseFunction, OutLinkFilterFunction, 
> StatusUpdateSolrTransformer) })
> .persist()
> 
> that sort of thing. But still the tasks are pinned to the driver executor and 
> none of the workers, so I no longer saturate the master node, but I also have 
> 3 workers just sat there doing nothing.
> 
> On 2021/06/09 01:26:50, Sean Owen  wrote: 
> > Are you sure it's on the driver? or just 1 executor?
> > how many partitions does the groupByKey produce? that would limit your
> > parallelism no matter what if it's a small number.
> > 
> > On Tue, Jun 8, 2021 at 8:07 PM Tom Barber  wrote:
> > 
> > > Hi folks,
> > >
> > > Hopefully someone with more Spark experience than me can explain this a
> > > bit.
> > >
> > > I dont' know if this is possible, impossible or just an old design that
> > > could be better.
> > >
> > > I'm running Sparkler as a spark-submit job on a databricks spark cluster
> > > and its getting to this point in the code(
> > > https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L222-L226
> > > )
> > >
> > > val fetchedRdd = rdd.map(r => (r.getGroup, r))
> > > .groupByKey()
> > > .flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator,
> > > localFetchDelay,
> > >   FetchFunction, ParseFunction, OutLinkFilterFunction,
> > > StatusUpdateSolrTransformer) })
> > > .persist()
> > >
> > > This basically takes the RDD and then runs a web based crawl over each RDD
> > > and returns the results. But when Spark executes it, it runs all the 
> > > crawls
> > > on the driver node and doesn't distribute them.
> > >
> > > The key is pretty static in these tests, so I have also tried forcing the
> > > partition count (50 on a 16 core per node cluster) and also 
> > > repartitioning,
> > > but every time all the jobs are scheduled to run on one node.
> > >
> > > What can I do better to distribute the tasks? Because the processing of
> > > the data in the RDD isn't the bottleneck, the fetching of the crawl data 
> > > is
> > > the bottleneck, but that happens after the code has been assigned to a 
> > > node.
> > >
> > > Thanks
> > >
> > > Tom
> > >
> > >
> > > -
> > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> > >
> > >
> > 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Distributing a FlatMap across a Spark Cluster

2021-06-08 Thread Tom Barber
ExecutorID says driver, and looking at the IP addresses it's running on, it's not
any of the worker IPs.

I forcibly told it to create 50, but they'd all end up running in the same 
place. 

Working on some other ideas, I set spark.task.cpus to 16 to match the nodes
whilst still forcing it to 50 partitions:

val m = 50

val fetchedRdd = rdd.map(r => (r.getGroup, r))
  .groupByKey(m)
  .flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator, localFetchDelay,
    FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer) })
  .persist()

That sort of thing. But the tasks are still pinned to the driver executor and none of
the workers, so I no longer saturate the master node, but I also have 3 workers just
sitting there doing nothing.
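
Next thing I'll probably try is logging where each group is actually fetched from,
straight from inside the flatMap (sketch only, reusing the identifiers above; the
TaskContext/SparkEnv calls are just my assumption about the simplest way to get at
the executor id):

import org.apache.spark.{SparkEnv, TaskContext}

val fetchedRdd = rdd.map(r => (r.getGroup, r))
  .groupByKey(m)
  .flatMap({ case (grp, rs) =>
    // Print which partition/executor this group is being crawled on; the output
    // lands in that executor's stdout log, so driver vs worker is easy to tell apart.
    println(s"group=$grp partition=${TaskContext.get().partitionId()} executor=${SparkEnv.get.executorId}")
    new FairFetcher(job, rs.iterator, localFetchDelay,
      FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer)
  })
  .persist()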

On 2021/06/09 01:26:50, Sean Owen  wrote: 
> Are you sure it's on the driver? or just 1 executor?
> how many partitions does the groupByKey produce? that would limit your
> parallelism no matter what if it's a small number.
> 
> On Tue, Jun 8, 2021 at 8:07 PM Tom Barber  wrote:
> 
> > Hi folks,
> >
> > Hopefully someone with more Spark experience than me can explain this a
> > bit.
> >
> > I dont' know if this is possible, impossible or just an old design that
> > could be better.
> >
> > I'm running Sparkler as a spark-submit job on a databricks spark cluster
> > and its getting to this point in the code(
> > https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L222-L226
> > )
> >
> > val fetchedRdd = rdd.map(r => (r.getGroup, r))
> > .groupByKey()
> > .flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator,
> > localFetchDelay,
> >   FetchFunction, ParseFunction, OutLinkFilterFunction,
> > StatusUpdateSolrTransformer) })
> > .persist()
> >
> > This basically takes the RDD and then runs a web based crawl over each RDD
> > and returns the results. But when Spark executes it, it runs all the crawls
> > on the driver node and doesn't distribute them.
> >
> > The key is pretty static in these tests, so I have also tried forcing the
> > partition count (50 on a 16 core per node cluster) and also repartitioning,
> > but every time all the jobs are scheduled to run on one node.
> >
> > What can I do better to distribute the tasks? Because the processing of
> > the data in the RDD isn't the bottleneck, the fetching of the crawl data is
> > the bottleneck, but that happens after the code has been assigned to a node.
> >
> > Thanks
> >
> > Tom
> >
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
> >
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Distributing a FlatMap across a Spark Cluster

2021-06-08 Thread Sean Owen
Are you sure it's on the driver, or just 1 executor?
How many partitions does the groupByKey produce? That would limit your
parallelism no matter what if it's a small number.
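
A quick standalone way to see what groupByKey defaults to versus an explicit
partition count (nothing Sparkler-specific here):

import org.apache.spark.sql.SparkSession

// groupByKey() with no argument falls back to the default partitioner, which
// typically follows the parent RDD's partition count / default parallelism,
// while groupByKey(n) forces n shuffle partitions.
val spark = SparkSession.builder().master("local[4]").appName("partition-check").getOrCreate()
val sc = spark.sparkContext

val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)), numSlices = 4)

println(s"default parallelism: ${sc.defaultParallelism}")
println(s"groupByKey():        ${pairs.groupByKey().getNumPartitions}")
println(s"groupByKey(50):      ${pairs.groupByKey(50).getNumPartitions}")
spark.stop()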

On Tue, Jun 8, 2021 at 8:07 PM Tom Barber  wrote:

> Hi folks,
>
> Hopefully someone with more Spark experience than me can explain this a
> bit.
>
> I dont' know if this is possible, impossible or just an old design that
> could be better.
>
> I'm running Sparkler as a spark-submit job on a databricks spark cluster
> and its getting to this point in the code(
> https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L222-L226
> )
>
> val fetchedRdd = rdd.map(r => (r.getGroup, r))
> .groupByKey()
> .flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator,
> localFetchDelay,
>   FetchFunction, ParseFunction, OutLinkFilterFunction,
> StatusUpdateSolrTransformer) })
> .persist()
>
> This basically takes the RDD and then runs a web based crawl over each RDD
> and returns the results. But when Spark executes it, it runs all the crawls
> on the driver node and doesn't distribute them.
>
> The key is pretty static in these tests, so I have also tried forcing the
> partition count (50 on a 16 core per node cluster) and also repartitioning,
> but every time all the jobs are scheduled to run on one node.
>
> What can I do better to distribute the tasks? Because the processing of
> the data in the RDD isn't the bottleneck, the fetching of the crawl data is
> the bottleneck, but that happens after the code has been assigned to a node.
>
> Thanks
>
> Tom
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Distributing a FlatMap across a Spark Cluster

2021-06-08 Thread Tom Barber
Hi folks, 

Hopefully someone with more Spark experience than me can explain this a bit.

I don't know if this is possible, impossible, or just an old design that could
be better.

I'm running Sparkler as a spark-submit job on a Databricks Spark cluster and
it's getting to this point in the code
(https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L222-L226):

val fetchedRdd = rdd.map(r => (r.getGroup, r))
  .groupByKey()
  .flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator, localFetchDelay,
    FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer) })
  .persist()

This basically takes the RDD and then runs a web-based crawl over each group in the
RDD and returns the results. But when Spark executes it, it runs all the crawls on the
driver node and doesn't distribute them.

The key is pretty static in these tests, so I have also tried forcing the partition
count (50 on a cluster with 16 cores per node) and also repartitioning, but every time
all the jobs are scheduled to run on one node.

What can I do better to distribute the tasks? The processing of the data in the RDD
isn't the bottleneck; the fetching of the crawl data is, and that happens after the
code has been assigned to a node.
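
For reference, the kind of variation I've been trying looks roughly like this (same
identifiers as the snippet above; the 50 is just the partition count from my tests):

val fetchedRdd = rdd.map(r => (r.getGroup, r))
  .groupByKey(50)    // force the shuffle to 50 partitions instead of the default
  .repartition(50)   // and/or explicitly spread the grouped data before the crawl
  .flatMap({ case (grp, rs) =>
    new FairFetcher(job, rs.iterator, localFetchDelay,
      FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer)
  })
  .persist()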

Thanks

Tom


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org