Hi Marek,
That's a great question. The answer depends on whether you are using
portability or the "classic" Runner:
Portability
===
In portability, the SDF protocol gives the Runner the option to split a
running bundle such that the current bundle's remaining work is minimized
and the deferred remainder of the work is maximized. That would be ideal
for checkpointing as soon as possible [1].
Unfortunately, this is not yet implemented in the Flink Runner. That's
why you are seeing the entire split finishing before the checkpoint.
Implementing this would mean issuing the split call upon checkpointing,
making sure to checkpoint the remaining work, and resuming it after the
checkpoint has finished.
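To make the idea concrete, here is a minimal plain-Java sketch of what such a checkpoint-time split does to an offset-range restriction. The class and method names are purely illustrative (loosely modeled on Beam's OffsetRange/trySplit, but NOT Beam's actual API): the runner asks to keep fraction 0 of the remaining work, so the primary ends right after the last claimed offset and everything else is deferred until after the checkpoint.

```java
// Hypothetical, simplified sketch; names are illustrative, not Beam's API.
public class OffsetRangeSplitDemo {

    /** A half-open range [from, to) of element offsets. */
    static final class Range {
        final long from, to;
        Range(long from, long to) { this.from = from; this.to = to; }
    }

    /** Result of a split: the primary keeps only the work already claimed,
     *  the residual defers the rest for after the checkpoint. */
    static final class SplitResult {
        final Range primary, residual;
        SplitResult(Range p, Range r) { primary = p; residual = r; }
    }

    /**
     * Split keeping fraction 0 of the *remaining* work in the primary:
     * everything past the last claimed offset is deferred. This is what a
     * runner would request at checkpoint time so the current bundle can
     * finish as quickly as possible.
     */
    static SplitResult checkpointSplit(Range restriction, long lastClaimed) {
        long splitPos = lastClaimed + 1; // primary must keep what it claimed
        return new SplitResult(
            new Range(restriction.from, splitPos),
            new Range(splitPos, restriction.to));
    }

    public static void main(String[] args) {
        Range r = new Range(0, 3_000_000);      // e.g. a 3M-element file
        SplitResult s = checkpointSplit(r, 41_999);
        System.out.println(s.primary.from + "," + s.primary.to);   // 0,42000
        System.out.println(s.residual.from + "," + s.residual.to); // 42000,3000000
    }
}
```

After the checkpoint completes, the runner would schedule the residual range as a new bundle, so a 3M-element file no longer blocks the barrier.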
Perhaps others could also chime in if there is anything else missing?
Classic
===
AFAIK there is no way to split further while a split is being processed.
The best option would be to create a custom UnboundedSource which creates
more, smaller splits; the default is to use the parallelism as the number
of splits. Depending on your source, this may or may not be trivial.
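As a rough illustration of the "more, smaller splits" idea, the sketch below shows only the partitioning arithmetic a custom source's split method might use; it is a plain-Java assumption-laden example, not Beam's actual UnboundedSource API. The point is that each split covers a bounded chunk of bytes, so the work between checkpoint barriers stays small.

```java
// Illustrative sketch of the partitioning behind a custom source's split
// logic; not Beam's actual UnboundedSource API.
import java.util.ArrayList;
import java.util.List;

public class SmallerSplitsDemo {

    /** Cut the byte range [0, totalBytes) into chunks of at most chunkBytes.
     *  A custom source could return one sub-source per chunk instead of one
     *  split per parallelism slot, keeping each split small. */
    static List<long[]> chunk(long totalBytes, long chunkBytes) {
        List<long[]> splits = new ArrayList<>();
        for (long start = 0; start < totalBytes; start += chunkBytes) {
            splits.add(new long[] { start, Math.min(start + chunkBytes, totalBytes) });
        }
        return splits;
    }

    public static void main(String[] args) {
        // A 200 MB file cut into 30 MB chunks yields 7 splits.
        List<long[]> splits = chunk(200L * 1024 * 1024, 30L * 1024 * 1024);
        System.out.println(splits.size()); // 7
    }
}
```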
Cheers,
Max
[1]
https://github.com/apache/beam/blob/6266296ac037afc775735d4f08d25ffcc1a8e897/model/fn-execution/src/main/proto/beam_fn_api.proto#L421
On 07.02.20 12:18, marek-simu...@seznam.cz wrote:
Hi,
I am using FileIO to continuously watch a folder for new files to
process. The problem is that when Flink starts reading a 200MB file
(around 3M elements) and also starts checkpointing, the checkpoint never
finishes until the WHOLE file is processed.
Minimal example :
https://github.com/seznam/beam/blob/simunek/failingCheckpoint/examples/java/src/main/java/org/apache/beam/examples/CheckpointFailingExample.java
My theory of what could be wrong, from my understanding:
The CheckpointMark in this case starts from Create.ofProvider and is then
propagated to the downstream operators, where it is queued behind all
splits, which means all splits have to be read before the operator can
successfully checkpoint. The problem is even bigger when there are more
files: then we need to wait for all files to be processed before we can
successfully checkpoint.
1. Are my assumptions correct?
2. Is there some way to improve the behavior of SplittableDoFn (or of
the subsequent reading from BoundedSource) on Flink so that the
checkpoint barrier propagates better?
For now my fix is reading smaller files (30MB) one by one, but it's not
very future proof.
Versions:
Beam 2.17
Flink 1.9
Please correct my poor understanding of checkpointing with Beam and
Flink; it would be wonderful if you have some advice on what to improve
or where to look.