We are on Spark 2.4 and Beam 2.22.0.

From: Alexey Romanenko <[email protected]>
Sent: Wednesday, June 24, 2020 5:15 PM
To: [email protected]
Subject: Re: KinesisIO checkpointing

Yes, KinesisIO supports restarting from checkpoints; this relies on the 
runner's checkpoint support [1].

Could you specify which versions of Spark and Beam you are using?

[1] https://stackoverflow.com/questions/62259364/how-apache-beam-manage-kinesis-checkpointing/62349838#62349838


On 24 Jun 2020, at 14:13, Sunny, Mani Kolbe <[email protected]> wrote:

Hello,

We are developing a Beam pipeline that runs on the SparkRunner in streaming 
mode. The pipeline reads from Kinesis, does some translations and filtering, 
and finally writes to S3 using the AvroIO writer. We are using fixed windows 
with triggers based on element count and processing-time intervals. The output 
path is partitioned by window start timestamp, and allowedLateness is 0 sec.
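
For readers following the thread, here is a minimal sketch of how an output 
prefix can be derived from a fixed window's start timestamp. It uses only the 
JDK and is not the code from the pipeline above: the class name WindowPaths, 
the bucket name, and the yyyy/MM/dd/HH/mm layout are all hypothetical, and 
windows are assumed to be aligned to the epoch with no offset.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class WindowPaths {
    // Hypothetical partition layout; the original pipeline's layout is not shown in the thread.
    private static final DateTimeFormatter PARTITION =
        DateTimeFormatter.ofPattern("yyyy/MM/dd/HH/mm").withZone(ZoneOffset.UTC);

    /** Start of the fixed window that contains the timestamp (windows aligned to the epoch). */
    static Instant windowStart(Instant timestamp, Duration size) {
        long sizeMillis = size.toMillis();
        long ts = timestamp.toEpochMilli();
        // floorMod keeps the result correct for timestamps before the epoch as well.
        return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMillis));
    }

    /** Output prefix partitioned by the window start, e.g. base/2020/06/24/14/13. */
    static String outputPrefix(String base, Instant timestamp, Duration size) {
        return base + "/" + PARTITION.format(windowStart(timestamp, size));
    }

    public static void main(String[] args) {
        Instant eventTime = Instant.parse("2020-06-24T14:13:42Z");
        // Assumed 60-second window, matching --windowDuration=60s in the command below.
        System.out.println(
            outputPrefix("s3://example-bucket/output", eventTime, Duration.ofSeconds(60)));
    }
}
```

With a 60-second window, every element stamped between 14:13:00 and 14:13:59 
UTC maps to the same prefix, so each fired pane of that window lands under one 
partition.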

This works fine, but I have noticed that whenever we restart the streaming 
application, it starts reading from the Kinesis TRIM_HORIZON. That is, it does 
not resume from the last checkpoint position. I then found that the checkpoint 
directory is based on the --jobName and --checkpointDir properties, so I tried 
running it as below:

spark-submit --master yarn --deploy-mode cluster \
    --conf spark.dynamicAllocation.enabled=false \
    --driver-memory 1g --executor-memory 1g --num-executors 1 --executor-cores 1 \
    --class com.dnb.optimus.prime.processor.PrimeStreamProcessor \
    --conf spark.executor.extraClassPath=/etc/hbase/conf \
    /tmp/stream-processor-0.0.0.8-spark.jar \
    --runner=SparkRunner \
    --jobName=PrimeStreamProcessor \
    --checkpointDir=hdfs:///tmp/PrimeStreamProcessor checkpoint \
    --useWindow=true \
    --windowDuration=60s --windowLateness=0s --windowElementCount=1 \
    --maxReadTime=-1 \
    --streaming=true

I can see that it is able to fetch the checkpoint data from the checkpointDir 
path provided, but when the driver tries to broadcast this information to the 
executors, it fails with the exception below.
20/06/12 15:35:28 ERROR yarn.Client: Application diagnostics message: User class threw exception: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
        at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
        at org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:44)
        ....
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:678)
Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
        at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
        at sun.reflect.GeneratedMethodAccessor44.invoke(Unknown Source)


Any ideas? Is resuming from the checkpoint position on application restart 
actually supported by KinesisIO?

Regards,
Mani
