Hey there,
According to the docs, when using a FileBasedSource or a splittable
DoFn, the runner is free to initiate splits that can then be run in
parallel. As far as I can tell, the splitting does happen on my Apache
Flink cluster, but the parallel execution does not: a single TaskManager
ends up processing all splits of an input text file. Is this known
behaviour, and how can I fix it?
I have a pipeline that looks like this (Python SDK):
(pipeline
 | 'Read Input File' >> textio.ReadFromText(input_glob, min_bundle_size=1)
 | 'Reshuffle Lines' >> beam.Reshuffle()
 | 'Map Records' >> beam.ParDo(map_func))
The input file is a large, uncompressed plaintext file from a shared
drive containing millions of newline-separated data records. I am
running this job with a parallelism of 100, but it is bottlenecked by a
single worker running ReadFromText(). The reshuffling in between was
added to force Beam/Flink to parallelize the processing, but this has no
effect on the preceding stage. Only the following map operation is being
run in parallel. The stage itself is marked as having a parallelism of
100, but 99 workers finish immediately.
I had the same issue earlier with another input source, in which I match
a bunch of WARC file globs and then iterate over them in a splittable
DoFn. I solved the missing parallelism by adding an explicit reshuffle
in between matching input globs and actually reading the individual files:
class WarcInput(beam.PTransform):
    def __init__(self, file_pattern):
        self._file_pattern = file_pattern

    def expand(self, pcoll):
        return (pcoll
                | MatchFiles(self._file_pattern)
                | beam.Reshuffle()
                | beam.ParDo(WarcReader()))
This way I can at least achieve parallelism on file level. This doesn't
work with a single splittable input file, of course, for which I would
have to reshuffle somewhere inside of ReadFromText(). Do I really have
to write a custom PTransform that generates initial splits, shuffles
them, and then reads from those splits? I consider this somewhat
essential functionality.
Any hints appreciated.
Janek