Hi guys, I've got one more question for you. Maybe someone has already
implemented such a feature or found a good technique.

I wrote an integration test that runs a Flink job, which reads data from a
Kafka topic and flushes it to the file system using BucketingSink.
I implemented some custom logic that fires on notifyCheckpointComplete and
would like to test it, so I need to block the job somehow and wait until a
checkpoint has been performed.

The first idea, which I've implemented, is to set the checkpointing interval
to 1 second and extend the test source so that it waits a few seconds after
all test messages have been sent to the sink.
The code looks something like this:
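import java.io.IOException;
import java.util.Collection;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.io.CollectionInputFormat;

// Test-only input format: pauses once the collection is exhausted so that
// periodic checkpoints have a chance to fire before the source finishes.
public class SleepingCollectionInputFormat<T> extends CollectionInputFormat<T> {

    private static final long serialVersionUID = -5957191172818298164L;

    // How long to sleep, in milliseconds, after the last element was read.
    private final long duration;

    public SleepingCollectionInputFormat(Collection<T> dataSet, TypeSerializer<T> serializer, long duration) {
        super(dataSet, serializer);
        this.duration = duration;
    }

    @Override
    public boolean reachedEnd() throws IOException {
        boolean reached = super.reachedEnd();
        if (reached) {
            try {
                Thread.sleep(duration);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the task can shut down cleanly.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        return reached;
    }
}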
This is not good enough, because it gives no guarantee that the job is
actually held back until a checkpoint has been performed; it only makes that
likely.
To my mind, a much better approach would be to send some kind of notification
to the task manager, once all items from the source have been sent, that it's
time to perform a checkpoint.
Does anyone know how such a feature could be implemented?

Another option would be to use a CountDownLatch: count down when the
checkpoint notification arrives and await at the end of the source function,
but serialization makes it too complicated for me right now.
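
For what it's worth, here is a minimal sketch of how the latch idea might get
around the serialization problem. It assumes the test runs in a local,
single-JVM cluster, so that static state is shared between the test, the
source, and the sink. CheckpointLatchHandle, create, and roundTrip are
hypothetical names made up for this illustration, not Flink API. Only a String
key is serialized with the function; the latch itself stays in a static
registry.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: a serializable handle to a latch kept in a static, JVM-wide registry. */
final class CheckpointLatchHandle implements Serializable {

    private static final long serialVersionUID = 1L;

    // Static fields are not serialized; only the key below travels with the function.
    private static final ConcurrentHashMap<String, CountDownLatch> LATCHES = new ConcurrentHashMap<>();

    private final String key;

    private CheckpointLatchHandle(String key) {
        this.key = key;
    }

    /** Called from the test before the job is submitted. */
    static CheckpointLatchHandle create(String key, int checkpointsToWaitFor) {
        LATCHES.put(key, new CountDownLatch(checkpointsToWaitFor));
        return new CheckpointLatchHandle(key);
    }

    /** Intended to be called from notifyCheckpointComplete(long checkpointId). */
    void countDown() {
        latch().countDown();
    }

    /** Intended to be called at the end of the source, after the last element was emitted. */
    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return latch().await(timeout, unit);
    }

    private CountDownLatch latch() {
        CountDownLatch latch = LATCHES.get(key);
        if (latch == null) {
            throw new IllegalStateException("No latch registered for key " + key);
        }
        return latch;
    }

    /** Round-trips a handle through Java serialization, similar to what happens to user functions. */
    static CheckpointLatchHandle roundTrip(CheckpointLatchHandle handle)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(handle);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (CheckpointLatchHandle) in.readObject();
        }
    }
}
```

The test would create one handle and pass it to both the source and the sink.
The sink calls countDown() from notifyCheckpointComplete, and the source calls
await(...) with a timeout after emitting its last element. The source should
probably not hold the checkpoint lock while it waits, otherwise the checkpoint
it is waiting for may never be taken. This will not work once the source and
sink run in different JVMs.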

I'd be very grateful for any replies, answers, and recommendations.

Thanks!