Hmm, Partition + Flatten is semantically a no-op (at most it may or may not cause the intermediate dataset to be materialized; there are no guarantees) - so why do you need the Partition in the first place?
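A minimal sketch of what dropping the Partition could look like, given the pipeline quoted below. Reshuffle.viaRandomKey() is my assumption here as one way to redistribute a single-partition input before the expensive step; ParseToSOLRDoc, verbatimRecords, and conn are names taken from the thread, not a complete program:

```java
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;
import org.apache.solr.common.SolrInputDocument;

// Fragment only - assumes verbatimRecords, ParseToSOLRDoc and conn
// exist as in the quoted thread below.

// No Partition needed: reshuffle to spread the single upstream
// partition across workers, then run the expensive ParDo directly.
PCollection<SolrInputDocument> inputDocs = verbatimRecords
    .apply(Reshuffle.viaRandomKey())
    .apply("Convert to SOLR", ParDo.of(new ParseToSOLRDoc()));

inputDocs.apply(SolrIO.write()
    .to("solr-load")
    .withConnectionConfiguration(conn));
```

Reshuffle materializes and redistributes the elements, which is the usual Beam answer when a non-splittable source leaves everything in one bundle; the ParDo after it then parallelizes on the runner's own terms.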
On Tue, Oct 3, 2017 at 6:18 AM Tim Robertson <[email protected]> wrote:
>
> Answering my own question.
>
> After a partition you "flatten" it like so (in Spark this then runs on
> the partitions in parallel tasks):
>
>   PCollection<SolrInputDocument> inputDocs = partitioned
>       .apply(Flatten.pCollections())
>       .apply("Convert to SOLR", ParDo.of(new ParseToSOLRDoc()));
>
> On Tue, Oct 3, 2017 at 2:48 PM, Tim Robertson <[email protected]> wrote:
>
>> Hi folks,
>>
>> I feel a little daft asking this, and suspect I am missing the obvious...
>>
>> Can someone please tell me how I can do a ParDo following a Partition?
>>
>> In Spark I'd just repartition(...) and then map(), but I don't see in
>> the Beam API how to run a ParDo on each partition in parallel. Do I
>> need to multithread manually?
>>
>> I tried this:
>>
>>   PCollectionList<UntypedOccurrence> partitioned =
>>       verbatimRecords.apply(Partition.of(10, new RecordPartitioner()));
>>
>>   // does not run in parallel on Spark...
>>   for (PCollection<UntypedOccurrence> untyped : partitioned.getAll()) {
>>     PCollection<SolrInputDocument> inputDocs = untyped.apply(
>>         "convert-to-solr-format", ParDo.of(new ParseToSOLRDoc()));
>>
>>     inputDocs.apply(SolrIO.write()
>>         .to("solr-load")
>>         .withConnectionConfiguration(conn));
>>   }
>>
>> [For background: I'm using a non-splittable custom Hadoop InputFormat,
>> which means I end up with an RDD with a single partition, and need to
>> split it to run an expensive op in parallel.]
>>
>> Thanks,
>> Tim
