Thanks everyone. Very informative answers and Redistribute / Reshuffle is what 
I missed. 

Would it help to add a table to the docs that summarises the Beam 
transforms and their equivalents in Spark, Apex, Flink etc.? I'd be happy 
to contribute.

Answering "why a repartition?": two use cases I've had in Hadoop and Spark are 
1) InputFormats on Hadoop that aren't splittable (e.g. Zip files) and 2) 
partitioning for data locality for a fast ingest into e.g. HBase (see 
http://www.opencore.com/blog/2016/10/efficient-bulk-load-of-hbase-using-spark/).
In this case I'm reading zip files residing on HDFS, which is inherently a 
single-threaded operation, but I then need to run expensive operations on the 
resulting dataset in parallel.
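
For anyone finding this thread later, here is a minimal plain-Java sketch of 
the idea behind redistributing by a random key: records that arrive from one 
single-threaded read are tagged with a random shard key so the expensive step 
can fan out. This is only a conceptual illustration. It is not Beam code, it 
is not how any runner implements Reshuffle, and the names RandomKeyShards, 
shard and expensiveConvert are hypothetical ones made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Conceptual illustration only: not Beam code and not a runner implementation.
final class RandomKeyShards {

  // Tag each record with a random key in [0, numShards) and group by that key.
  static <T> Map<Integer, List<T>> shard(List<T> records, int numShards, Random random) {
    Map<Integer, List<T>> shards = new TreeMap<>();
    for (T record : records) {
      int key = random.nextInt(numShards);
      shards.computeIfAbsent(key, k -> new ArrayList<>()).add(record);
    }
    return shards;
  }

  // Hypothetical stand-in for the expensive per-record conversion.
  static String expensiveConvert(String record) {
    return record.toUpperCase();
  }

  public static void main(String[] args) {
    // Pretend these came from a single-threaded read of a non-splittable file.
    List<String> records = Arrays.asList("a", "b", "c", "d", "e", "f");
    Map<Integer, List<String>> shards = shard(records, 3, new Random());

    // Each shard is independent, so shards can be processed in parallel
    // (here on the common fork-join pool via a parallel stream).
    List<String> converted = shards.values().parallelStream()
        .flatMap(s -> s.stream().map(RandomKeyShards::expensiveConvert))
        .collect(Collectors.toList());

    System.out.println(
        converted.size() + " records converted across " + shards.size() + " shards");
  }
}
```

In a real pipeline the runner does the distribution for you; the sketch only 
shows why a random key spreads the work regardless of how the input was read.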

Thanks,
Tim

> On 3 Oct 2017, at 18:16, Thomas Groh <[email protected]> wrote:
> 
> Just to make sure I'm understanding the question:
> 
> You have some input which cannot be split, and you want to read it all in, 
> redistribute all the records across your available workers, and perform some 
> processing on those records in parallel, yes?
> 
> I think there's some confusion here about the Beam concept of `Partition` and 
> the Spark concept. In Beam, the Partition transform relates to the logical 
> concept of a PCollection, which is unrelated to the physical materialization 
> of that PCollection. In Spark, from my high-level reading, it looks like a 
> partition relates to some physical distribution of the data. Beam doesn't 
> provide direct access to controlling the partitioning of the data - 
> generally, runners should try to distribute the data evenly among all 
> available workers; processing will be performed in parallel across those 
> workers.
> 
> If my understanding of the question is accurate, you can apply 
> `Reshuffle.viaRandomKey()` to your input (`verbatimRecords`). This should 
> redistribute the data arbitrarily across all of your workers/partitions. 
> Downstream processing should then be performed in parallel across your data 
> without the need for any additional explicit configuration.
> 
> 
>> On Tue, Oct 3, 2017 at 8:52 AM, Eugene Kirpichov <[email protected]> 
>> wrote:
>> Hmm, partition + flatten is semantically a no-op (it, at most, may or may 
>> not cause the intermediate dataset to be materialized, there are no 
>> guarantees) - why do you need the Partition in the first place?
>> 
>>> On Tue, Oct 3, 2017 at 6:18 AM Tim Robertson <[email protected]> 
>>> wrote:
>>> 
>>> Answering my own question.
>>> 
>>> After a partition you "flatten" it like so (in Spark this then runs on the 
>>> partitions in parallel tasks):
>>> PCollection<SolrInputDocument> inputDocs = 
>>> partitioned.apply(Flatten.pCollections()).apply(
>>>   "Convert to SOLR", ParDo.of(new ParseToSOLRDoc()));
>>> 
>>> 
>>> 
>>>> On Tue, Oct 3, 2017 at 2:48 PM, Tim Robertson <[email protected]> 
>>>> wrote:
>>>> Hi folks,
>>>> 
>>>> I feel a little daft asking this, and suspect I am missing the obvious...
>>>> 
>>>> Can someone please tell me how I can do a ParDo following a Partition?
>>>> 
>>>> In Spark I'd just repartition(...) and then map(), but I don't see in 
>>>> the Beam API how to run a ParDo on each partition in parallel. Do I need 
>>>> to multithread manually?
>>>> 
>>>> I tried this:
>>>> PCollectionList<UntypedOccurrence> partitioned = 
>>>> verbatimRecords.apply(Partition.of(10, new RecordPartitioner()));
>>>> // does not run in parallel on spark...
>>>> for (PCollection<UntypedOccurrence> untyped : partitioned.getAll()) {
>>>>   // each element of getAll() is already a PCollection, so apply to it directly
>>>>   PCollection<SolrInputDocument> inputDocs = untyped.apply(
>>>>     "convert-to-solr-format", ParDo.of(new ParseToSOLRDoc()));
>>>> 
>>>>   inputDocs.apply(
>>>>     SolrIO.write().to("solr-load").withConnectionConfiguration(conn));
>>>> }
>>>> 
>>>> [For background: I'm using a non-splittable custom Hadoop InputFormat, 
>>>> which means I end up with an RDD with a single partition, and need to 
>>>> split it to run an expensive op in parallel]
>>>> 
>>>> Thanks,
>>>> Tim
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>> 
> 
