Answering my own question. After a Partition you "flatten" it like so (on Spark this then runs on the partitions as parallel tasks):
PCollection<SolrInputDocument> inputDocs = partitioned
    .apply(Flatten.pCollections())
    .apply("Convert to SOLR", ParDo.of(new ParseToSOLRDoc()));

On Tue, Oct 3, 2017 at 2:48 PM, Tim Robertson <[email protected]> wrote:

> Hi folks,
>
> I feel a little daft asking this, and suspect I am missing the obvious...
>
> Can someone please tell me how I can do a ParDo following a Partition?
>
> In Spark I'd just repartition(...) and then map(), but I don't see in the
> Beam API how to run a ParDo on each partition in parallel. Do I need to
> multithread manually?
>
> I tried this:
>
> PCollectionList<UntypedOccurrence> partitioned =
>     verbatimRecords.apply(Partition.of(10, new RecordPartitioner()));
>
> // does not run in parallel on spark...
> for (PCollection<UntypedOccurrence> untyped : partitioned.getAll()) {
>   PCollection<SolrInputDocument> inputDocs = untyped.apply(
>       "convert-to-solr-format", ParDo.of(new ParseToSOLRDoc()));
>
>   inputDocs.apply(SolrIO.write().to("solr-load")
>       .withConnectionConfiguration(conn));
> }
>
> [For background: I'm using a non-splittable custom Hadoop InputFormat,
> which means I end up with an RDD as a single partition, and need to split
> it to run an expensive op in parallel]
>
> Thanks,
> Tim
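For anyone finding this thread later, here is a minimal, self-contained sketch of
the Partition -> Flatten -> ParDo pattern described above. The class name, the
Create.of(...) test input, the hash-based PartitionFn, and the toUpperCase DoFn
are placeholders of mine (the thread's ParseToSOLRDoc and RecordPartitioner
aren't shown), so treat it as an illustration rather than the exact pipeline:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Partition;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

public class PartitionFlattenExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Placeholder input; in the thread this comes from a non-splittable
    // Hadoop InputFormat that yields a single partition.
    PCollection<String> records = p.apply(Create.of("a", "b", "c", "d"));

    // Split into 10 partitions. The PartitionFn here just hashes the element;
    // the thread's RecordPartitioner would go in its place.
    PCollectionList<String> partitioned = records.apply(
        Partition.of(10, new Partition.PartitionFn<String>() {
          @Override
          public int partitionFor(String elem, int numPartitions) {
            return Math.abs(elem.hashCode()) % numPartitions;
          }
        }));

    // Flatten back into one PCollection; the runner then executes the
    // downstream ParDo over the partitions as parallel tasks.
    PCollection<String> processed = partitioned
        .apply(Flatten.pCollections())
        .apply("expensive-op", ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            c.output(c.element().toUpperCase()); // stand-in for the expensive op
          }
        }));

    p.run().waitUntilFinish();
  }
}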
