Hi everyone,

I am trying to set up Flink with an HDFS state backend.  I configured the
state.backend and state.backend.fs.checkpointdir parameters in
flink-conf.yaml.  When I run the Flink job, the checkpoint directories are
created in HDFS, so Flink appears to connect and talk to HDFS just fine.
Unfortunately, no files are ever written into the HDFS directory.  I
verified that state is being saved to and restored from task manager
memory, and that works fine; it just never writes to HDFS.
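
For reference, the relevant lines in my flink-conf.yaml look like this (the
namenode host, port, and path here are placeholders for my real values):

  state.backend: filesystem
  state.backend.fs.checkpointdir: hdfs://namenode:9000/flink/checkpoints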

Am I missing a step?  Do I need to do anything to force a write to HDFS?
Does the state variable have to be a particular type to work with HDFS?
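
In case it matters, checkpointing is enabled in the job roughly like this
(the interval value is just an example, not my exact setting):

  import org.apache.flink.streaming.api.scala._

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // Take a checkpoint every 5 seconds (example interval).
  env.enableCheckpointing(5000)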

This is what my snapshot functions look like:

  override def restoreState(rState: scala.collection.mutable.HashMap[String, String]): Unit = {
    state = rState
  }

  override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): scala.collection.mutable.HashMap[String, String] = {
    state
  }
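
And for context, here is a condensed sketch of the surrounding operator
(the class name, input type, and flatMap body are simplified stand-ins for
my real code):

  import org.apache.flink.api.common.functions.RichFlatMapFunction
  import org.apache.flink.streaming.api.checkpoint.Checkpointed
  import org.apache.flink.util.Collector

  class MyStatefulMapper extends RichFlatMapFunction[String, String]
      with Checkpointed[scala.collection.mutable.HashMap[String, String]] {

    // The operator state that should be checkpointed to the backend.
    var state = scala.collection.mutable.HashMap[String, String]()

    override def flatMap(value: String, out: Collector[String]): Unit = {
      state(value) = value // placeholder update, stands in for my real logic
      out.collect(value)
    }

    override def snapshotState(checkpointId: Long, checkpointTimestamp: Long)
        : scala.collection.mutable.HashMap[String, String] = state

    override def restoreState(rState: scala.collection.mutable.HashMap[String, String]): Unit = {
      state = rState
    }
  }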


Thanks!
-Jason

P.S.  I am running Flink v1.0.1, Hadoop 2.7.1, and Scala 2.11
