Hello,
We are developing a Beam pipeline that runs on the SparkRunner in streaming mode.
The pipeline reads from Kinesis, does some translations and filtering, and finally
writes to S3 using the AvroIO writer. We are using fixed windows with triggers
based on element count and processing-time intervals. The output path is
partitioned by window start timestamp, and allowedLateness is 0 seconds.
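To illustrate the partitioning, here is a minimal sketch in plain Java (no Beam dependency) of mapping an element timestamp to the start of its fixed window and then to an output prefix. The class name, the bucket, and the yyyy/MM/dd/HH/mm layout are hypothetical and not taken from our actual job. It assumes windows are aligned to the epoch, which as far as I know is what FixedWindows does when no offset is set.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class WindowPathSketch {
    // Hypothetical layout: <base>/yyyy/MM/dd/HH/mm/ keyed by window start (UTC).
    private static final DateTimeFormatter PARTITION =
        DateTimeFormatter.ofPattern("yyyy/MM/dd/HH/mm").withZone(ZoneOffset.UTC);

    // Start of the epoch-aligned fixed window that contains the timestamp.
    public static Instant windowStart(Instant timestamp, Duration size) {
        long sizeMs = size.toMillis();
        long ts = timestamp.toEpochMilli();
        return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMs));
    }

    // Output prefix for the window that the timestamp falls into.
    public static String outputPath(String base, Instant timestamp, Duration size) {
        return base + "/" + PARTITION.format(windowStart(timestamp, size)) + "/";
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2020-06-12T15:35:28Z");
        // With 60s windows this element belongs to the window starting at 15:35:00.
        System.out.println(outputPath("s3://example-bucket/out", ts, Duration.ofSeconds(60)));
    }
}
```

With 60-second windows, every element stamped between 15:35:00 and 15:35:59 lands under the same prefix, so a replay from TRIM_HORIZON after a restart writes again into partitions that already exist.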
This works fine, but I have noticed that whenever we restart the streaming
application, it starts reading from the Kinesis TRIM_HORIZON. That is, it does not
resume from the last checkpoint position. I then found that the checkpoint
directory is based on the --jobName and --checkpointDir properties, so I tried
running it as below:
spark-submit --master yarn --deploy-mode cluster \
--conf spark.dynamicAllocation.enabled=false \
--driver-memory 1g --executor-memory 1g --num-executors 1 --executor-cores 1 \
--class com.dnb.optimus.prime.processor.PrimeStreamProcessor \
--conf spark.executor.extraClassPath=/etc/hbase/conf \
/tmp/stream-processor-0.0.0.8-spark.jar \
--runner=SparkRunner \
--jobName=PrimeStreamProcessor \
--checkpointDir=hdfs:///tmp/PrimeStreamProcessor checkpoint \
--useWindow=true \
--windowDuration=60s --windowLateness=0s --windowElementCount=1 \
--maxReadTime=-1 \
--streaming=true
I can see that it is able to fetch the checkpoint data from the checkpointDir path
provided. But when the driver tries to broadcast this information to the executors,
it fails with the exception below.
20/06/12 15:35:28 ERROR yarn.Client: Application diagnostics message: User class threw exception: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
    at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
    at org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:44)
    ....
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:678)
Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
    at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
    at sun.reflect.GeneratedMethodAccessor44.invoke(Unknown Source)
Any ideas? Is resuming from the checkpoint position on application restart
actually supported by KinesisIO?
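To make the question concrete, the restart behaviour I am expecting is roughly the following. This is only a plain-Java sketch of the decision, not KinesisIO code: ResumeSketch, SavedPosition, and the returned string format are hypothetical names I made up for illustration. TRIM_HORIZON and AFTER_SEQUENCE_NUMBER are the Kinesis shard iterator types.

```java
import java.util.Optional;

public class ResumeSketch {
    // Hypothetical stand-in for a saved reader position; not a KinesisIO class.
    public static final class SavedPosition {
        final String shardId;
        final String sequenceNumber;

        public SavedPosition(String shardId, String sequenceNumber) {
            this.shardId = shardId;
            this.sequenceNumber = sequenceNumber;
        }
    }

    // Where reading should begin for one shard: right after the checkpointed
    // record if a matching checkpoint exists, otherwise the oldest record.
    public static String startingPoint(String shardId, Optional<SavedPosition> checkpoint) {
        return checkpoint
            .filter(p -> p.shardId.equals(shardId))
            .map(p -> "AFTER_SEQUENCE_NUMBER:" + p.sequenceNumber)
            .orElse("TRIM_HORIZON");
    }

    public static void main(String[] args) {
        // First run, nothing checkpointed yet.
        System.out.println(startingPoint("shard-0", Optional.empty()));
        // Restart with a checkpoint for the same shard.
        System.out.println(startingPoint("shard-0",
            Optional.of(new SavedPosition("shard-0", "12345"))));
    }
}
```

What we see today corresponds to the first branch on every restart, even though checkpoint data is present in checkpointDir.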
Regards,
Mani