I am trying to understand the Flink design pattern for consuming files from
S3 continuously as they appear. I have written the minimal program below to
do that, and it works as expected with respect to detecting newly uploaded
S3 files within the configured 5-second monitoring interval. It then just
prints the file contents to stdout as a simple task.

Where I am having difficulty is understanding the design pattern to
maintain state so that upon restart, the Flink app will NOT reprocess files
that it already processed.

I can see that Flink is retaining state in my configured state location at
file:///var/tmp/flink/checkpoints/, and when inspecting state files at paths
like
/var/tmp/flink/checkpoints/6d3c9a96c13be31760416212bd3fd33d/chk-5/_metadata,
I see that the S3 paths of the files already processed by Flink show up in
the _metadata file. So I know the state for each file is being captured.

Now I am trying to understand a few concepts:

   1. How much state will Flink retain? If the files in the bucket are
   retained for a long time (say, with a lifecycle delete policy of 30
   days), there could be a lot of files piling up in the bucket. It seems
   that Flink would have to retain the complete list of already-processed
   files to avoid reprocessing them, and that could be quite a lot of state.
   2. I understand from the docs that you can restart Flink using state
   from either a savepoint or a checkpoint. I was trying to restart my test
   application standalone in my dev environment using the following command,
   but on startup it still reprocesses the files recorded in the _metadata
   state captured from the previous run. Is "--fromSavepoint" the correct
   way to specify the savepoint/checkpoint path to be read at startup? (I
   also sketched a programmatic alternative right after the command below.)

/usr/bin/env \
ASP_USE_LOCAL_ENV=1 \
ASP_VERBOSE_LOGGING=true \
ASP_CHECKPOINT_INTERVAL_MILLISECONDS=5000 \
/usr/lib/jvm/java-11-openjdk-amd64/bin/java \
@./startup.argfile \
com.myapp.weatherstreamprocessor.WeatherStreamProcessor \
--fromSavepoint \
/var/tmp/flink/checkpoints/6d3c9a96c13be31760416212bd3fd33d/chk-5/_metadata
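
Since I am launching the main class directly with java rather than via
"flink run", I also wondered whether I could point the job at the retained
checkpoint programmatically instead. Below is a rough sketch of what I had
in mind; it assumes the "execution.savepoint.path" option is honored by a
local environment and that it accepts a retained checkpoint directory (the
chk-5 path), neither of which I have confirmed:

        Configuration conf = new Configuration();
        // Assumption: restore from the retained checkpoint by pointing
        // execution.savepoint.path at the chk-5 directory.
        conf.setString(
                "execution.savepoint.path",
                "file:///var/tmp/flink/checkpoints/6d3c9a96c13be31760416212bd3fd33d/chk-5");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);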

I am using the Flink Kubernetes operator to deploy my Flink applications to
EKS and already have one production Flink application that consumes from and
writes to Kinesis, so I have some initial Flink experience. I realize that,
when deployed in my EKS cluster, checkpointing is meant for recovery by the
job manager should the task managers need to be restarted, and that for
managed restarts (like code updates) I should be using an explicitly created
savepoint. But here I am just trying to prove out the behavior in my test
environment.

Could someone kindly direct me to the right approach to restart the job in
my test environment, read the checkpoint, and NOT have Flink reprocess the
files already seen by the previous running instance?

Thanks!




# ==========================================================================================
# My Test Application
# ==========================================================================================

package com.myapp.weatherstreamprocessor;

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WeatherStreamProcessor {
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        conf.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, true);
        conf.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, new MemorySize(1024L * 1024 * 1024)); // 1 GiB
        conf.set(JobManagerOptions.TOTAL_FLINK_MEMORY, new MemorySize(1024L * 1024 * 1024)); // 1 GiB
        conf.set(TaskManagerOptions.CPU_CORES, 4.0);

        final StreamExecutionEnvironment env;
        Config appCfg = new Config();

        if (appCfg.getUseLocalEnv()) {
            env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        } else {
            env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        }

        env.setParallelism(appCfg.getParallelism());
        if (appCfg.getCheckpointInterval() > 0) {
            env.enableCheckpointing(appCfg.getCheckpointInterval());
        }
        CheckpointConfig config = env.getCheckpointConfig();
        config.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        config.setCheckpointStorage("file:///var/tmp/flink/checkpoints/");

        SetupJob(env, appCfg);
        env.execute("Weather Stream Processor");
    }

    static void SetupJob(StreamExecutionEnvironment env, Config appCfg) {
        final FileSource<String> source =
                FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("s3://stats-staging/"))
                        .monitorContinuously(Duration.ofSeconds(5L))
                        .build();
        final DataStream<String> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
        stream.print();
    }
}
