Just to make sure I'm understanding the question:

You have some input which cannot be split, and you want to read it all in,
redistribute all the records across your available workers, and perform
some processing on those records in parallel, yes?

I think there's some confusion here between the Beam concept of `Partition`
and the Spark concept. In Beam, the Partition transform relates to the
*logical* structure of a PCollection, which is unrelated to the *physical*
materialization of that PCollection. In Spark, from my high-level reading,
it looks like a partition relates to the physical distribution of the data.
Beam doesn't provide direct control over the partitioning of the data -
generally, runners should try to distribute the data evenly among all
available workers, and processing will be performed in parallel across
those workers.
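
For illustration, `Partition` in Beam just splits one PCollection into a
list of PCollections - a purely logical split. A sketch reusing your
UntypedOccurrence type (the hash-based partitioning function here is
hypothetical, just to show the shape of the API):

PCollectionList<UntypedOccurrence> parts = verbatimRecords.apply(
    Partition.of(3, (UntypedOccurrence rec, int numPartitions) ->
        // logical routing only; says nothing about which worker
        // physically holds the element
        Math.abs(rec.hashCode()) % numPartitions));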

If my understanding of the question is accurate, apply
`Reshuffle.viaRandomKey()` to your input (`verbatimRecords`). This should
redistribute the data arbitrarily across all of your workers/partitions,
and downstream processing should then be performed in parallel across your
data without any additional explicit configuration.
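
To make that concrete, an untested sketch reusing the names from your
snippet (verbatimRecords, ParseToSOLRDoc, conn):

PCollection<UntypedOccurrence> reshuffled =
    verbatimRecords.apply("Redistribute", Reshuffle.viaRandomKey());

PCollection<SolrInputDocument> inputDocs = reshuffled.apply(
    "Convert to SOLR", ParDo.of(new ParseToSOLRDoc()));

inputDocs.apply(SolrIO.write().to("solr-load")
    .withConnectionConfiguration(conn));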


On Tue, Oct 3, 2017 at 8:52 AM, Eugene Kirpichov <[email protected]>
wrote:

> Hmm, partition + flatten is semantically a no-op (at most it may or may
> not cause the intermediate dataset to be materialized; there are no
> guarantees) - why do you need the Partition in the first place?
>
> On Tue, Oct 3, 2017 at 6:18 AM Tim Robertson <[email protected]>
> wrote:
>
>>
>> Answering my own question.
>>
>> After a partition you "flatten" it like so (in Spark this then runs on the
>> partitions in parallel tasks):
>>
>> PCollection<SolrInputDocument> inputDocs = partitioned
>>     .apply(Flatten.pCollections())
>>     .apply("Convert to SOLR", ParDo.of(new ParseToSOLRDoc()));
>>
>> On Tue, Oct 3, 2017 at 2:48 PM, Tim Robertson <[email protected]>
>> wrote:
>>
>>> Hi folks,
>>>
>>> I feel a little daft asking this, and suspect I am missing the obvious...
>>>
>>> Can someone please tell me how I can do a ParDo following a Partition?
>>>
>>> In Spark I'd just repartition(...) and then map(), but I don't see in
>>> the Beam API how to run a ParDo on each partition in parallel. Do I need
>>> to multithread manually?
>>>
>>> I tried this:
>>>
>>> PCollectionList<UntypedOccurrence> partitioned =
>>>     verbatimRecords.apply(Partition.of(10, new RecordPartitioner()));
>>>
>>> // does not run in parallel on Spark...
>>> for (PCollection<UntypedOccurrence> untyped : partitioned.getAll()) {
>>>   PCollection<SolrInputDocument> inputDocs = untyped.apply(
>>>       "convert-to-solr-format", ParDo.of(new ParseToSOLRDoc()));
>>>
>>>   inputDocs.apply(SolrIO.write().to("solr-load")
>>>       .withConnectionConfiguration(conn));
>>> }
>>>
>>>
>>> [For background: I'm using a non-splittable custom Hadoop InputFormat,
>>> which means I end up with an RDD as a single partition and need to split
>>> it to run an expensive op in parallel]
>>>
>>> Thanks,
>>> Tim
>>>
