大家好,

我在学习Flink checkpoint时,做了一个示例没有得到期望结果,麻烦帮忙看看是哪里设置有问题。谢谢
1. 启动checkpoint
2. 设置statebackend为FsStateBackend
3. 从socketTextStream读取数据,统计单词个数
    (“hello”, 5), (“world”, 1)
4. 通过触发异常,来模拟终止程序
5. 重新启动程序,那么启动之后的统计数据的初始值应该是上一次checkpoint 成功存储的值
    (“hello”, 5), (“world”, 1) , 那么再次输入hello, 应该输出(“hello”, 6)
    而在实际输出结果为(“hello”, 1)

环境和版本信息
1. MacOS - Oracle JDK 1.8
2. 版本信息
<properties>
   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
   <flink.version>1.11.6</flink.version>
   <flink.lang>scala</flink.lang>
   <target.java.version>1.8</target.java.version>
   <scala.binary.version>2.12</scala.binary.version>
   <maven.compiler.source>${target.java.version}</maven.compiler.source>
   <maven.compiler.target>${target.java.version}</maven.compiler.target>
   <log4j.version>2.12.1</log4j.version>
</properties>

代码
object RestartStrategyFsStateBackend {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.enableCheckpointing(1000L) 
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 2))

    val backendPath = 
"file:///Users/arthur/Documents/Workspace/java/quickstart" +
      "/flink-spring/src/main/resources/backend.out/restartstrategyv3"
    env.setStateBackend(new FsStateBackend(backendPath))

    // socket数据源
    env.socketTextStream("localhost", 7077)
      .map(value => {
        if (value == "restart") {
          throw new RuntimeException("restart is triggered, oooops~~~~~")
        }
        (value, 1)
      }
      )
      .keyBy(_._1)
      .sum(1)
      .print("RestartStrategy")

    env.execute("RestartStrategy")
  }
}

BR.
Arthur





 

回复