If I don’t set withInitialPositionInStream(), it throws an NPE.
Read kinesisReader = KinesisIO.read()
    //.withInitialPositionInStream(options.getStreamFromHorizon() ? InitialPositionInStream.TRIM_HORIZON : InitialPositionInStream.LATEST)
    .withStreamName(options.getInputStreamName());
The code above results in an NPE, as shown below. This is with Spark 2.4.0 and
Beam 2.22.0.
org.apache.beam.sdk.util.UserCodeException: java.lang.NullPointerException
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
    at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$SplitFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
    at org.apache.beam.runners.spark.translation.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:65)
    at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:137)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:141)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:136)
    at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
    at org.apache.beam.runners.spark.translation.MultiDoFnFunction.call(MultiDoFnFunction.java:125)
    at org.apache.beam.runners.spark.translation.MultiDoFnFunction.call(MultiDoFnFunction.java:63)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at org.apache.beam.sdk.io.kinesis.StartingPointShardsFinder.validateShards(StartingPointShardsFinder.java:185)
    at org.apache.beam.sdk.io.kinesis.StartingPointShardsFinder.findShardsAtStartingPoint(StartingPointShardsFinder.java:115)
    at org.apache.beam.sdk.io.kinesis.DynamicCheckpointGenerator.generate(DynamicCheckpointGenerator.java:59)
    at org.apache.beam.sdk.io.kinesis.KinesisSource.split(KinesisSource.java:93)
    at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$SplitFn.process(BoundedReadFromUnboundedSource.java:165)
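One way to avoid hitting this deep inside split() is to resolve the starting position explicitly and fail fast while the pipeline is being built. The following is only a minimal sketch of that idea: StartingPointGuard and its nested StartPosition enum are hypothetical stand-ins written for illustration, not Beam or AWS classes, and the boolean flag is assumed to play the role of options.getStreamFromHorizon() above.

```java
/** Hypothetical helper (not part of Beam): picks and validates a starting position. */
final class StartingPointGuard {

  /** Hypothetical stand-in for the Kinesis initial-position setting. */
  enum StartPosition { TRIM_HORIZON, LATEST }

  /** Maps the pipeline flag to a starting position; never returns null. */
  static StartPosition resolve(boolean streamFromHorizon) {
    return streamFromHorizon ? StartPosition.TRIM_HORIZON : StartPosition.LATEST;
  }

  /** Rejects a missing position up front with a readable message. */
  static StartPosition requireConfigured(StartPosition position) {
    if (position == null) {
      throw new IllegalStateException(
          "Initial position in stream must be set before building the Kinesis read");
    }
    return position;
  }

  public static void main(String[] args) {
    // A configured position passes straight through.
    System.out.println(requireConfigured(resolve(true)));
    try {
      // A missing position is rejected here rather than later on an executor.
      requireConfigured(null);
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

In the real pipeline, the resolved value would be what gets passed to withInitialPositionInStream(); the check simply surfaces a missing setting before the job is submitted.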
From: Mani Kolbe <[email protected]>
Sent: Thursday, July 9, 2020 10:56 PM
To: [email protected]
Subject: Re: KinesisIO checkpointing
CAUTION: This email originated from outside of D&B. Please do not click links
or open attachments unless you recognize the sender and know the content is
safe.
Is it required to set the jobName and checkpointDir options for checkpointing
to work?
On Thu, 9 Jul, 2020, 9:25 PM Luke Cwik <[email protected]> wrote:
The BoundedReadFromUnboundedSource does checkpoint the underlying
UnboundedSource. Is that checkpoint logic not working?
Do you have KinesisIO configured to always read from a specific point?
On Thu, Jul 9, 2020 at 9:56 AM Sunny, Mani Kolbe <[email protected]> wrote:
We did the same: we started using maxReadTime and scheduled the application to
run every 5 minutes. It works fine end to end without any errors. The problem
is that it always starts reading from the beginning of the Kinesis stream each
time it is stopped and restarted.
When I investigated, I found that setting maxReadTime makes the read run in
BoundedReadFromUnboundedSource mode. That essentially converts the source into
a bounded one. This means checkpointing and watermarks are no longer supported.
The reader just reads for the configured amount of time and exits.
Is there any recommended way to resume reading from the position where it
finished, either using maxReadTime or in unbounded source mode?
Could someone point me to sample pipeline code that uses Kinesis as a source?
Regards,
Mani
From: Lars Almgren Schwartz <[email protected]>
Sent: Thursday, June 25, 2020 7:53 AM
To: [email protected]
Subject: Re: KinesisIO checkpointing
We had the exact same problem but have not spent any time trying to solve it.
We just skipped checkpointing for now.
When we saw this problem we ran Spark 2.4.5 (local mode) and Kinesis 2.18 and
2.19.
On Wed, Jun 24, 2020 at 6:22 PM Sunny, Mani Kolbe <[email protected]> wrote:
We are on Spark 2.4 and Beam 2.22.0.
From: Alexey Romanenko <[email protected]>
Sent: Wednesday, June 24, 2020 5:15 PM
To: [email protected]
Subject: Re: KinesisIO checkpointing
Yes, KinesisIO supports restarting from checkpoints, and this relies on the
runner's checkpoint support [1].
Could you specify which version of Spark and Beam you use?
[1] https://stackoverflow.com/questions/62259364/how-apache-beam-manage-kinesis-checkpointing/62349838#62349838
On 24 Jun 2020, at 14:13, Sunny, Mani Kolbe <[email protected]> wrote:
Hello,
We are developing a Beam pipeline that runs on the SparkRunner in streaming
mode. The pipeline reads from Kinesis, does some translation and filtering, and
finally writes to S3 using the AvroIO writer. We are using fixed windows with
triggers based on element count and processing-time intervals. The output path
is partitioned by window start timestamp, with allowedLateness=0sec.
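To illustrate the partitioning described above, here is a minimal sketch of how a fixed window's start can be derived from an element timestamp and turned into an output prefix. It assumes windows aligned to the epoch with no offset. The class name WindowPaths, the bucket name, and the yyyy/MM/dd/HH/mm layout are hypothetical and chosen only for illustration; they are not taken from the actual pipeline.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: maps a timestamp to the start of its fixed window and to a path. */
final class WindowPaths {

  // Assumed path layout, formatted in UTC.
  private static final DateTimeFormatter LAYOUT =
      DateTimeFormatter.ofPattern("yyyy/MM/dd/HH/mm").withZone(ZoneOffset.UTC);

  /** Start of the epoch-aligned fixed window of the given size that contains ts. */
  static Instant windowStart(Instant ts, Duration size) {
    long tsMillis = ts.toEpochMilli();
    long sizeMillis = size.toMillis();
    // floorMod keeps the result correct for timestamps before the epoch as well.
    return Instant.ofEpochMilli(tsMillis - Math.floorMod(tsMillis, sizeMillis));
  }

  /** Output prefix partitioned by window start timestamp. */
  static String outputPrefix(String base, Instant ts, Duration size) {
    return base + "/" + LAYOUT.format(windowStart(ts, size));
  }

  public static void main(String[] args) {
    Instant ts = Instant.parse("2020-06-24T14:13:42Z");
    // "s3://example-bucket/output" is a placeholder, not a real location.
    System.out.println(outputPrefix("s3://example-bucket/output", ts, Duration.ofSeconds(60)));
  }
}
```

With a 60-second window, every element whose timestamp falls within the same minute maps to the same prefix, which is what makes the output partitioned by window start.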
This is working fine, but I have noticed that whenever we restart the streaming
application, it starts reading from the Kinesis TRIM_HORIZON again. That is, it
does not resume from the last checkpoint position. I then found that the
checkpoint directory is based on the --jobName and --checkpointDir properties,
so I tried running as below:
spark-submit --master yarn --deploy-mode cluster --conf spark.dynamicAllocation.enabled=false \
--driver-memory 1g --executor-memory 1g --num-executors 1 --executor-cores 1 \
--class com.dnb.optimus.prime.processor.PrimeStreamProcessor \
--conf spark.executor.extraClassPath=/etc/hbase/conf \
/tmp/stream-processor-0.0.0.8-spark.jar \
--runner=SparkRunner \
--jobName=PrimeStreamProcessor \
--checkpointDir=hdfs:///tmp/PrimeStreamProcessor checkpoint \
--useWindow=true \
--windowDuration=60s --windowLateness=0s --windowElementCount=1 \
--maxReadTime=-1 \
--streaming=true
I can see that it is able to fetch the checkpoint data from the checkpointDir
path provided. But when the driver tries to broadcast this information to the
executors, it fails with the exception below.
20/06/12 15:35:28 ERROR yarn.Client: Application diagnostics message: User class threw exception: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
    at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
    at org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:44)
    ....
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:678)
Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
    at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
    at sun.reflect.GeneratedMethodAccessor44.invoke(Unknown Source)
Any ideas? Is resuming from the checkpoint position on application restart
actually supported by KinesisIO?
Regards,
Mani