Hmm, Partition + Flatten is semantically a no-op (at most it may or may not
cause the intermediate dataset to be materialized; there are no guarantees) -
why do you need the Partition in the first place?
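
For example, these two should produce the same elements (a quick sketch
reusing the types and names from your snippets; step names made distinct so
both branches could live in one pipeline):

// Partition into 10 PCollections and immediately merge them back into one.
PCollection<SolrInputDocument> viaPartition = verbatimRecords
    .apply(Partition.of(10, new RecordPartitioner()))
    .apply(Flatten.pCollections())
    .apply("Convert to SOLR (partitioned)", ParDo.of(new ParseToSOLRDoc()));

// Semantically equivalent: apply the ParDo directly to the input.
PCollection<SolrInputDocument> direct = verbatimRecords
    .apply("Convert to SOLR (direct)", ParDo.of(new ParseToSOLRDoc()));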

On Tue, Oct 3, 2017 at 6:18 AM Tim Robertson <[email protected]>
wrote:

>
> Answering my own question.
>
> After a partition you "flatten" it like so (in Spark this then runs on the
> partitions in parallel tasks):
>
> PCollection<SolrInputDocument> inputDocs = partitioned
>     .apply(Flatten.pCollections())
>     .apply("Convert to SOLR", ParDo.of(new ParseToSOLRDoc()));
>
>
>
>
> On Tue, Oct 3, 2017 at 2:48 PM, Tim Robertson <[email protected]>
> wrote:
>
>> Hi folks,
>>
>> I feel a little daft asking this, and suspect I am missing the obvious...
>>
>> Can someone please tell me how I can do a ParDo following a Partition?
>>
>> In Spark I'd just repartition(...) and then map(), but I don't see in the
>> Beam API how to run a ParDo on each partition in parallel. Do I need to
>> multithread manually?
>>
>> I tried this:
>>
>> PCollectionList<UntypedOccurrence> partitioned =
>>     verbatimRecords.apply(Partition.of(10, new RecordPartitioner()));
>>
>> // does not run in parallel on Spark...
>> for (PCollection<UntypedOccurrence> untyped : partitioned.getAll()) {
>>   PCollection<SolrInputDocument> inputDocs = untyped.apply(
>>       "convert-to-solr-format", ParDo.of(new ParseToSOLRDoc()));
>>
>>   inputDocs.apply(SolrIO.write()
>>       .to("solr-load")
>>       .withConnectionConfiguration(conn));
>> }
>>
>>
>> [For background: I'm using a non-splittable custom Hadoop InputFormat,
>> which means I end up with an RDD as a single partition and need to split
>> it to run an expensive op in parallel]
>>
>> Thanks,
>> Tim
>>
>
