We had the exact same problem but have not spent any time trying to solve it; we just skipped checkpointing for now. When we saw this problem, we were running Spark 2.4.5 (local mode) and Kinesis 2.18 and 2.19.
On Wed, Jun 24, 2020 at 6:22 PM Sunny, Mani Kolbe <[email protected]> wrote:
> We are on Spark 2.4 and Beam 2.22.0.
>
> From: Alexey Romanenko <[email protected]>
> Sent: Wednesday, June 24, 2020 5:15 PM
> To: [email protected]
> Subject: Re: KinesisIO checkpointing
>
> Yes, KinesisIO supports restart from checkpoints, and it's based on the
> runner's checkpoint support [1].
>
> Could you specify which versions of Spark and Beam you use?
>
> [1] https://stackoverflow.com/questions/62259364/how-apache-beam-manage-kinesis-checkpointing/62349838#62349838
>
> On 24 Jun 2020, at 14:13, Sunny, Mani Kolbe <[email protected]> wrote:
>
> Hello,
>
> We are developing a Beam pipeline that runs on the SparkRunner in streaming
> mode. The pipeline reads from Kinesis, does some translation and filtering,
> and finally writes to S3 using the AvroIO writer. We are using fixed windows
> with triggers based on element count and processing-time intervals. The
> output path is partitioned by window start timestamp, with
> allowedLateness=0sec.
>
> This works fine, but I have noticed that whenever we restart the streaming
> application, it starts reading from the Kinesis TRIM_HORIZON. That is, it
> does not resume from the last checkpoint position. Then I found that the
> checkpoint directory is based on the --jobName and --checkpointDir properties.
> So I tried running it as below:
>
> spark-submit --master yarn --deploy-mode cluster --conf spark.dynamicAllocation.enabled=false \
>   --driver-memory 1g --executor-memory 1g --num-executors 1 --executor-cores 1 \
>   --class com.dnb.optimus.prime.processor.PrimeStreamProcessor \
>   --conf spark.executor.extraClassPath=/etc/hbase/conf \
>   /tmp/stream-processor-0.0.0.8-spark.jar \
>   --runner=SparkRunner \
>   --jobName=PrimeStreamProcessor \
>   --checkpointDir=hdfs:///tmp/PrimeStreamProcessor checkpoint \
>   --useWindow=true \
>   --windowDuration=60s --windowLateness=0s --windowElementCount=1 \
>   --maxReadTime=-1 \
>   --streaming=true
>
> I can see that it is able to fetch checkpoint data from the checkpointDir
> path provided. But when the driver tries to broadcast this information to
> the executors, it fails with the exception below:
>
> 20/06/12 15:35:28 ERROR yarn.Client: Application diagnostics message: User class threw exception: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
>     at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
>     at org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:44)
>     ....
>     at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:678)
> Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
>     at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
>     at sun.reflect.GeneratedMethodAccessor44.invoke(Unknown Source)
>
> Any idea? Is resuming from the checkpoint position on application restart
> actually supported by KinesisIO?
>
> Regards,
> Mani
