Beam's partition concerns itself only with the logical splitting of a
PCollection into a number of subsets, unlike Spark's repartition which
controls the physical distribution of the data. What you probably want
instead is Redistribute which redistributes the work (randomly) across
all available workers. If you care about which values are colocated on
which workers, you could key by the desired locality, do a GroupByKey,
and then drop the key (noting that things would also be grouped by
window in this case).

On Tue, Oct 3, 2017 at 8:52 AM, Eugene Kirpichov <[email protected]> wrote:
> Hmm, partition + flatten is semantically a no-op (it, at most, may or may
> not cause the intermediate dataset to be materialized, there are no
> guarantees) - why do you need the Partition in the first place?
>
> On Tue, Oct 3, 2017 at 6:18 AM Tim Robertson <[email protected]>
> wrote:
>>
>>
>> Answering my own question.
>>
>> After a partition you "flatten" it as so (in spark this then runs on the
>> partitions in parallel tasks) :
>>
>> PCollection<SolrInputDocument> inputDocs =
>> partitioned.apply(Flatten.pCollections()).apply(
>>   "Convert to SOLR", ParDo.of(new ParseToSOLRDoc()));
>>
>>
>>
>>
>> On Tue, Oct 3, 2017 at 2:48 PM, Tim Robertson <[email protected]>
>> wrote:
>>>
>>> Hi folks,
>>>
>>> I feel a little daft asking this, and suspect I am missing the obvious...
>>>
>>> Can someone please tell me how I can do a ParDo following a Partition?
>>>
>>> In spark I'd just repartition(...) and then a map() but I don't spot in
>>> the Beam API how to run a ParDo on each partition in parallel.  Do I need to
>>> multithread manually?
>>>
>>> I tried this:
>>>
>>> PCollectionList<UntypedOccurrence> partitioned =
>>> verbatimRecords.apply(Partition.of(10, new RecordPartitioner()));
>>>
>>> // does not run in parallel on spark...
>>>
>>> for (PCollection<UntypedOccurrence> untyped : partitioned.getAll()) {
>>>   PCollection<SolrInputDocument> inputDocs =
>>> partitioned.get(untyped).apply(
>>>     "convert-to-solr-format", ParDo.of(new ParseToSOLRDoc()));
>>>
>>>
>>> inputDocs.apply(SolrIO.write().to("solr-load").withConnectionConfiguration(conn));
>>> }
>>>
>>>
>>> [For background: I'm using a non splittable custom Hadoop InputFormat
>>> which means I end up with an RDD as a single partition, and need to split it
>>> to run expensive op in parallel]
>>>
>>> Thanks,
>>> Tim
>>>
>>>
>>>
>>>
>>>
>>
>

Reply via email to