Thanks everyone. Very informative answers, and Redistribute / Reshuffle is what I missed.
Might it help to add a table in the docs which summarises the Beam transforms and their equivalents in Spark, Apex, Flink etc.? I'd be happy to contribute.

Answering "why a repartition?": two use cases I've had in Hadoop and Spark are 1) InputFormats on Hadoop that aren't splittable (e.g. Zip files) and 2) partitioning for data locality for a fast ingest into e.g. HBase (see http://www.opencore.com/blog/2016/10/efficient-bulk-load-of-hbase-using-spark/).

In this case I'm reading zip files residing on HDFS, which is inherently a single-threaded op, but then need to run expensive operations on the resulting dataset in parallel. Roughly what I'll try now is sketched below.
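(Untested sketch; verbatimRecords is the PCollection coming out of the non-splittable read, and ParseToSOLRDoc / conn are from my existing pipeline.)

    // Reshuffle (org.apache.beam.sdk.transforms.Reshuffle) redistributes the
    // records from the single-partition read across workers, so the expensive
    // conversion below runs in parallel.
    PCollection<SolrInputDocument> inputDocs = verbatimRecords
        .apply("Redistribute", Reshuffle.<UntypedOccurrence>viaRandomKey())
        .apply("Convert to SOLR", ParDo.of(new ParseToSOLRDoc()));

    inputDocs.apply(SolrIO.write()
        .to("solr-load")
        .withConnectionConfiguration(conn));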
Thanks,
Tim

Sent from my iPhone

> On 3 Oct 2017, at 18:16, Thomas Groh <[email protected]> wrote:
>
> Just to make sure I'm understanding the question:
>
> You have some input which cannot be split, and you want to read it all in,
> redistribute all the records across your available workers, and perform some
> processing on those records in parallel, yes?
>
> I think there's some confusion here about the Beam concept of `Partition` and
> the Spark concept. In Beam, the Partition transform relates to the logical
> concept of a PCollection, which is unrelated to the physical materialization
> of that PCollection. In Spark, from my high-level reading, it looks like a
> partition relates to some physical distribution of the data. Beam doesn't
> provide direct access to controlling the partitioning of the data -
> generally, runners should try to distribute the data evenly among all
> available workers; processing will be performed in parallel across those
> workers.
>
> If my understanding of the question is accurate, you can apply
> `Reshuffle.viaRandomKey()` to your input (`verbatimRecords`). This should
> redistribute the data arbitrarily across all of your workers/partitions.
> Downstream processing should then be performed in parallel across your data
> without the need for any additional explicit configuration.
>
>> On Tue, Oct 3, 2017 at 8:52 AM, Eugene Kirpichov <[email protected]> wrote:
>>
>> Hmm, partition + flatten is semantically a no-op (it, at most, may or may
>> not cause the intermediate dataset to be materialized; there are no
>> guarantees) - why do you need the Partition in the first place?
>>
>>> On Tue, Oct 3, 2017 at 6:18 AM Tim Robertson <[email protected]> wrote:
>>>
>>> Answering my own question.
>>>
>>> After a partition you "flatten" it like so (in Spark this then runs on the
>>> partitions in parallel tasks):
>>>
>>>   PCollection<SolrInputDocument> inputDocs =
>>>       partitioned.apply(Flatten.pCollections())
>>>           .apply("Convert to SOLR", ParDo.of(new ParseToSOLRDoc()));
>>>
>>>> On Tue, Oct 3, 2017 at 2:48 PM, Tim Robertson <[email protected]> wrote:
>>>>
>>>> Hi folks,
>>>>
>>>> I feel a little daft asking this, and suspect I am missing the obvious...
>>>>
>>>> Can someone please tell me how I can do a ParDo following a Partition?
>>>>
>>>> In Spark I'd just repartition(...) and then a map(), but I don't spot in
>>>> the Beam API how to run a ParDo on each partition in parallel. Do I need
>>>> to multithread manually?
>>>>
>>>> I tried this:
>>>>
>>>>   PCollectionList<UntypedOccurrence> partitioned =
>>>>       verbatimRecords.apply(Partition.of(10, new RecordPartitioner()));
>>>>
>>>>   // does not run in parallel on Spark...
>>>>   for (PCollection<UntypedOccurrence> untyped : partitioned.getAll()) {
>>>>     PCollection<SolrInputDocument> inputDocs = untyped.apply(
>>>>         "convert-to-solr-format", ParDo.of(new ParseToSOLRDoc()));
>>>>
>>>>     inputDocs.apply(SolrIO.write().to("solr-load").withConnectionConfiguration(conn));
>>>>   }
>>>>
>>>> [For background: I'm using a non-splittable custom Hadoop InputFormat,
>>>> which means I end up with an RDD as a single partition, and need to split
>>>> it to run an expensive op in parallel.]
>>>>
>>>> Thanks,
>>>> Tim
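PS for anyone finding this in the archives: if you genuinely need separate branches per partition (e.g. each partition written to its own Solr collection), Partition is still the tool for that; per Thomas' point it doesn't change the physical distribution by itself, but the branches are independent and a runner may process them in parallel. A rough, untested sketch (the per-index collection names are made up):

    PCollectionList<UntypedOccurrence> partitioned =
        verbatimRecords.apply(Partition.of(10, new RecordPartitioner()));

    for (int i = 0; i < partitioned.size(); i++) {
      partitioned.get(i)
          // unique step names per branch keep the pipeline graph valid
          .apply("convert-to-solr-format-" + i, ParDo.of(new ParseToSOLRDoc()))
          .apply("write-solr-" + i,
              SolrIO.write().to("solr-load-" + i).withConnectionConfiguration(conn));
    }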
