Yes, KinesisIO supports restarting from checkpoints; it relies on the 
runner's checkpoint support [1].
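
As a rough illustration of that idea: on restart the runner hands the source its last checkpoint mark, and the reader falls back to the configured initial position (e.g. TRIM_HORIZON) only when no mark is available. The sketch below is a simplified model and not Beam's actual code; the class, enum and method names are hypothetical.

```java
import java.util.Optional;

// Simplified model of resume-from-checkpoint logic. NOT Beam's actual code;
// every name here is hypothetical and exists only for illustration.
public class StartPositionSketch {

    // Stand-in for the initial position a reader is configured with.
    enum InitialPosition { TRIM_HORIZON, LATEST }

    // Decide where a shard reader begins: a restored checkpoint wins,
    // otherwise the configured initial position is used.
    static String startingPoint(Optional<String> checkpointedSequenceNumber,
                                InitialPosition configured) {
        return checkpointedSequenceNumber
            .map(seq -> "AFTER_SEQUENCE_NUMBER:" + seq)
            .orElse(configured.name());
    }

    public static void main(String[] args) {
        // First run: the runner has no checkpoint to restore.
        System.out.println(startingPoint(Optional.empty(), InitialPosition.TRIM_HORIZON));
        // Restart: the runner restored a checkpoint (made-up sequence number).
        System.out.println(startingPoint(Optional.of("4959"), InitialPosition.TRIM_HORIZON));
    }
}
```

If the runner cannot restore its checkpoint, the reader only ever sees the "no mark" case, which would match the TRIM_HORIZON behaviour described below.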

Could you specify which version of Spark and Beam you use?

[1] https://stackoverflow.com/questions/62259364/how-apache-beam-manage-kinesis-checkpointing/62349838#62349838

> On 24 Jun 2020, at 14:13, Sunny, Mani Kolbe <[email protected]> wrote:
> 
> Hello,
>  
> We are developing a Beam pipeline which runs on SparkRunner in streaming 
> mode. This pipeline reads from Kinesis, does some translations and 
> filtering, and finally outputs to S3 using the AvroIO writer. We are using 
> fixed windows with triggers based on element count and processing time 
> intervals. Output paths are partitioned by window start timestamp. 
> allowedLateness=0sec
>  
> This is working fine, but I have noticed that whenever we restart streaming, 
> the application starts reading from Kinesis TRIM_HORIZON. That is, it is not 
> resuming from the last checkpoint position. Then I found that the checkpoint 
> directory is based on the --jobName and --checkpointDir properties. So I 
> tried running as below:
>  
> spark-submit --master yarn --deploy-mode cluster --conf spark.dynamicAllocation.enabled=false \
>     --driver-memory 1g --executor-memory 1g --num-executors 1 --executor-cores 1 \
>     --class com.dnb.optimus.prime.processor.PrimeStreamProcessor \
>     --conf spark.executor.extraClassPath=/etc/hbase/conf \
>     /tmp/stream-processor-0.0.0.8-spark.jar \
>     --runner=SparkRunner \
>     --jobName=PrimeStreamProcessor \
>     --checkpointDir=hdfs:///tmp/PrimeStreamProcessor checkpoint \
>     --useWindow=true \
>     --windowDuration=60s --windowLateness=0s --windowElementCount=1 \
>     --maxReadTime=-1 \
>     --streaming=true
>  
> I can see that it is able to fetch checkpoint data from the checkpointDir 
> path provided. But when the driver tries to broadcast this information to 
> the executors, it fails with the exception below.
> 20/06/12 15:35:28 ERROR yarn.Client: Application diagnostics message: User class threw exception: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
>         at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
>         at org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:44)
>         ....
>         at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:678)
> Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
>         at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
>         at sun.reflect.GeneratedMethodAccessor44.invoke(Unknown Source)
>  
>  
> Any idea? Is resuming from the checkpoint position on application restart 
> actually supported by KinesisIO?
>  
> Regards,
> Mani
