Yes, KinesisIO supports restarting from checkpoints; this relies on the runner's checkpoint support [1].
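To make the expected behaviour concrete, here is a minimal, hypothetical Java sketch of "resume from the last checkpoint, otherwise start from the initial position". It is not the KinesisIO or SparkRunner API. The class name, the in-memory store, and the way the checkpoint key is derived from checkpointDir and jobName are all assumptions made for illustration. It shows why a restart that cannot find the previous checkpoint falls back to TRIM_HORIZON.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical in-memory model of checkpoint-based resume.
 * NOT the KinesisIO or SparkRunner API; all names are made up for illustration.
 */
public class CheckpointResumeSketch {

    // Stands in for durable checkpoint storage:
    // checkpoint key -> (shard id -> last sequence number read).
    private final Map<String, Map<String, String>> store = new HashMap<>();

    // Assumption: the checkpoint location depends on both checkpointDir and jobName,
    // so changing either one between runs means the old checkpoint is not found.
    public static String checkpointKey(String checkpointDir, String jobName) {
        return checkpointDir + "/" + jobName;
    }

    // Persist the last sequence number read from each shard.
    public void commit(String key, Map<String, String> lastReadByShard) {
        store.computeIfAbsent(key, k -> new HashMap<>()).putAll(lastReadByShard);
    }

    // Decide where each shard starts reading after a (re)start.
    public Map<String, String> startPositions(String key, List<String> shards) {
        Map<String, String> saved = store.getOrDefault(key, Map.of());
        Map<String, String> result = new HashMap<>();
        for (String shard : shards) {
            String seq = saved.get(shard);
            // No saved position for this shard: fall back to the initial position.
            result.put(shard, seq == null ? "TRIM_HORIZON" : "AFTER_SEQUENCE_NUMBER:" + seq);
        }
        return result;
    }

    public static void main(String[] args) {
        CheckpointResumeSketch sketch = new CheckpointResumeSketch();
        List<String> shards = List.of("shard-0", "shard-1");
        String key = checkpointKey("hdfs:///tmp/PrimeStreamProcessor", "PrimeStreamProcessor");

        // First run: nothing saved yet, so every shard starts at TRIM_HORIZON.
        System.out.println(sketch.startPositions(key, shards));

        // Simulate a checkpoint taken before the restart (only shard-0 has progressed).
        sketch.commit(key, Map.of("shard-0", "12345"));

        // Restart with the same key: shard-0 resumes, shard-1 has no saved position yet.
        System.out.println(sketch.startPositions(key, shards));

        // Restart with a different jobName: the previous checkpoint is not found.
        String otherKey = checkpointKey("hdfs:///tmp/PrimeStreamProcessor", "OtherJob");
        System.out.println(sketch.startPositions(otherKey, shards));
    }
}
```

In a real pipeline, KinesisIO produces the checkpoint marks and the runner is responsible for persisting and restoring them; user code does not manage them directly.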
Could you specify which versions of Spark and Beam you are using?

[1] https://stackoverflow.com/questions/62259364/how-apache-beam-manage-kinesis-checkpointing/62349838#62349838

> On 24 Jun 2020, at 14:13, Sunny, Mani Kolbe <[email protected]> wrote:
>
> Hello,
>
> We are developing a Beam pipeline that runs on SparkRunner in streaming
> mode. This pipeline reads from Kinesis, does some translations and filtering,
> and finally writes to S3 using the AvroIO writer. We are using fixed windows
> with triggers based on element count and processing-time intervals. The
> output path is partitioned by window start timestamp. allowedLateness=0sec.
>
> This is working fine, but I have noticed that whenever we restart streaming,
> the application starts reading from Kinesis TRIM_HORIZON. That is, it is not
> resuming from the last checkpoint position. Then I found that the checkpoint
> directory is based on the --jobName and --checkpointDir properties. So I tried
> running it as below:
>
> spark-submit --master yarn --deploy-mode cluster --conf spark.dynamicAllocation.enabled=false \
>   --driver-memory 1g --executor-memory 1g --num-executors 1 --executor-cores 1 \
>   --class com.dnb.optimus.prime.processor.PrimeStreamProcessor \
>   --conf spark.executor.extraClassPath=/etc/hbase/conf \
>   /tmp/stream-processor-0.0.0.8-spark.jar \
>   --runner=SparkRunner \
>   --jobName=PrimeStreamProcessor \
>   --checkpointDir=hdfs:///tmp/PrimeStreamProcessor checkpoint \
>   --useWindow=true \
>   --windowDuration=60s --windowLateness=0s --windowElementCount=1 \
>   --maxReadTime=-1 \
>   --streaming=true
>
> I can see that it is able to fetch checkpoint data from the provided
> checkpointDir path. But when the driver tries to broadcast this information
> to the executors, it fails with the exception below:
> 20/06/12 15:35:28 ERROR yarn.Client: Application diagnostics message: User class threw exception: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
>     at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
>     at org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:44)
>     ....
>     at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:678)
> Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
>     at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
>     at sun.reflect.GeneratedMethodAccessor44.invoke(Unknown Source)
>
> Any idea? Is resuming from the checkpoint position on application restart
> actually supported by KinesisIO?
>
> Regards,
> Mani
