flatMap is supposed to return a Seq, not an Iterator. You are returning a class that 
implements Iterator, and I have a hunch that's what's causing the confusion. 
As written, flatMap is returning an RDD[FairFetcher], not an RDD[CrawlData]. Do you 
intend it to be RDD[CrawlData]? You might want to call toSeq on the FairFetcher.
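
Something along these lines is what I mean - an untested sketch, assuming 
FairFetcher implements Iterator[CrawlData]:

val fetchedRdd = rdd.map(r => (r.getGroup, r))
  .groupByKey()
  .flatMap({ case (grp, rs) =>
    // materialise the FairFetcher iterator into a Seq before handing it to flatMap
    new FairFetcher(job, rs.iterator, localFetchDelay,
      FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer)
      .toSeq
  })
  .persist()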

On 6/8/21, 10:10 PM, "Tom Barber" <magicaltr...@apache.org> wrote:


    For anyone interested, here are the execution logs up to the point where it 
actually kicks off the workload in question: 
https://gist.github.com/buggtb/a9e0445f24182bc8eedfe26c0f07a473

    On 2021/06/09 01:52:39, Tom Barber <magicaltr...@apache.org> wrote:
    > ExecutorID says driver, and looking at the IP addresses it's running on, 
it's not any of the worker IPs.
    >
    > I forcibly told it to create 50 partitions, but they'd all end up running in 
the same place.
    >
    > Working on some other ideas, I set spark.task.cpus to 16 to match the 
nodes, whilst still forcing it to 50 partitions:
    >
    > val m = 50
    >
    > val fetchedRdd = rdd.map(r => (r.getGroup, r))
    >   .groupByKey(m)
    >   .flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator, localFetchDelay,
    >     FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer) })
    >   .persist()
    >
    > That sort of thing. But the tasks are still pinned to the driver executor 
and none of the workers, so I no longer saturate the master node, but I also 
have 3 workers just sitting there doing nothing.
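    >
    > (For reference, that spark.task.cpus change was applied roughly like this - 
    > paraphrasing, so treat it as a sketch rather than the exact code:)
    >
    > import org.apache.spark.{SparkConf, SparkContext}
    >
    > // hypothetical app setup; the real job is launched via spark-submit
    > val conf = new SparkConf()
    >   .setAppName("sparkler-crawl")       // placeholder name
    >   .set("spark.task.cpus", "16")       // one task per 16-core worker
    > val sc = new SparkContext(conf)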
    >
    > On 2021/06/09 01:26:50, Sean Owen <sro...@gmail.com> wrote:
    > > Are you sure it's on the driver, or just one executor?
    > > How many partitions does the groupByKey produce? That would limit your
    > > parallelism no matter what if it's a small number.
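    > >
    > > Something like this (untested sketch) would tell you quickly:
    > >
    > > val grouped = rdd.map(r => (r.getGroup, r)).groupByKey()
    > > println(grouped.getNumPartitions)                // partitions after the groupByKey
    > > println(rdd.map(_.getGroup).distinct().count())  // distinct keys = max useful parallelism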
    > >
    > > On Tue, Jun 8, 2021 at 8:07 PM Tom Barber <magicaltr...@apache.org> 
wrote:
    > >
    > > > Hi folks,
    > > >
    > > > Hopefully someone with more Spark experience than me can explain this 
a
    > > > bit.
    > > >
    > > > I don't know if this is possible, impossible, or just an old design
    > > > that could be better.
    > > >
    > > > I'm running Sparkler as a spark-submit job on a Databricks Spark cluster,
    > > > and it's getting to this point in the code (
    > > > https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L222-L226
    > > > )
    > > >
    > > > val fetchedRdd = rdd.map(r => (r.getGroup, r))
    > > >   .groupByKey()
    > > >   .flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator, localFetchDelay,
    > > >     FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer) })
    > > >   .persist()
    > > >
    > > > This basically takes the RDD, runs a web-based crawl over each group in
    > > > the RDD, and returns the results. But when Spark executes it, it runs all
    > > > the crawls on the driver node and doesn't distribute them.
    > > >
    > > > The key is pretty static in these tests, so I have also tried forcing the
    > > > partition count (50 on a cluster with 16 cores per node) and also
    > > > repartitioning, but every time all the jobs are scheduled to run on one node.
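    > > >
    > > > (For reference, the forced-partition / repartition attempt looked roughly
    > > > like this - paraphrasing from memory, so treat it as a sketch:)
    > > >
    > > > val fetchedRdd = rdd.map(r => (r.getGroup, r))
    > > >   .groupByKey(50)                   // force the partition count
    > > >   .flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator, localFetchDelay,
    > > >     FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer) })
    > > >   .repartition(50)                  // and/or an explicit repartition
    > > >   .persist()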
    > > >
    > > > What can I do to distribute the tasks better? The processing of the data
    > > > in the RDD isn't the bottleneck; the fetching of the crawl data is, but
    > > > that fetching only happens after the code has been assigned to a node.
    > > >
    > > > Thanks
    > > >
    > > > Tom
    > > >
    > > >



---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
