When I restart my streaming program, this bug appears and it kills my
program.
I am using Spark 1.4.1.
15/11/22 03:20:00 WARN CheckpointWriter: Error in attempt 1 of writing
checkpoint to hdfs://streaming/user/dm/order_predict/streaming_
v2/10/checkpoint/checkpoint-144813360
at following intervals...
>
>- 09:45
>- 10:00
>- 10:15
>- 10:30
>- and so on
>
> When my job is restarted and recovers from the checkpoint, it does the
> re-partitioning step twice for each 15-minute job until the window of 2
> hours is complete. The
restarted and recovers from the checkpoint, it does the
re-partitioning step twice for each 15-minute job until the window of 2
hours is complete. Then the re-partitioning takes place only once.
For example, when the job recovers at 16:15 it does re-partitioning for
the 16:15 Kafka stream and the
information it last saved in the checkpoint (saved into
HDFS) and orders new executors (since the previous driver died, so did the
executors) to continue consuming data. Since your executors are using the
receiver approach (as opposed to the direct Kafka stream) with WAL, what will
happen is that when
if I have already checkpointed the previous RDDs, or am I
missing something?
Also, do I need to checkpoint kafkaStreamRdd and advDataObjectsRdd when I
am already checkpointing advDashboardAggKeyVsMetricRdd?
Please let me know if any other information is required. I am using Spark
1.4.0
; (x._1, DateTime.now,
formatterFunc(x._2))).saveToCassandra(keyspace, hourlyStatsTable)
HourlyResult.map(x => (x, "hourly")).print()
}
}
On Wed, Nov 4, 2015 at 12:27 PM, vimal dinakaran
wrote:
> I have a simple spark streaming application which reads the data
Hmm,
that seems to be just a workaround.
Using this method, it's very hard to recover from failure, since we don't
know which batches have been done.
I really want to maintain the whole running stats in memory to achieve full
fault tolerance.
I just wonder if the performance of data checkpoint
Key function to maintain the
> running state (which requires checkpoint), and my bottleneck is now in data
> checkpoint.
>
> My pseudo code is like below:
>
> JavaStreamingContext jssc = new JavaStreamingContext(
> sparkConf,Durations.seconds(2));
>
Thanks Aniket,
I want to store the state in external storage, but that should come in a
later step, I think.
Basically, I have to use the updateStateByKey function to maintain the running
state (which requires checkpointing), and my bottleneck is now in data
checkpointing.
My pseudo code is like below
Can you try storing the state (word count) in an external key value store?
On Sat, Nov 7, 2015, 8:40 AM Thúy Hằng Lê wrote:
> Hi all,
>
> Anyone could help me on this. It's a bit urgent for me on this.
> I'm very confused and curious about Spark data checkpoint perfo
Hi all,
Could anyone help me with this? It's a bit urgent for me.
I'm very confused and curious about Spark data checkpoint performance. Is
there any detailed description of the checkpoint implementation I can look into?
Spark Streaming takes only sub-second to process 20K messages/sec, however
All,
I have a streaming application that monitors an HDFS folder and computes some
metrics based on this data; the data in this folder is updated by
another application that uploads it.
The streaming application's batch interval is 1 minute, the batch processing
time is about 30 seconds, and its
Thanks all, it would be great to have this feature soon.
Do you know what the release plan for 1.6 is?
In addition to this, I still have the checkpoint performance problem.
My code is simply like this:
JavaStreamingContext jssc = new
JavaStreamingContext(sparkConf,Durations.seconds(2));
Spark Streaming data checkpoint performance
"trackStateByKey" is about to be added in 1.6 to resolve the performance issue
of "updateStateByKey". You can take a look at
https://issues.apache.org/jira/browse/SPARK-2629 and
https://github.com/apache/spark/pull/9256
I have a simple Spark Streaming application which reads data from
RabbitMQ
and does some aggregation on window intervals of 1 min and 1 hour, with a
batch interval of 30s.
I have a three-node setup. To enable checkpointing,
I have mounted the same directory using sshfs on all worker nodes
"trackStateByKey" is about to be added in 1.6 to resolve the performance
issue of "updateStateByKey". You can take a look at
https://issues.apache.org/jira/browse/SPARK-2629 and
https://github.com/apache/spark/pull/9256
erval as you suggested;
however, is there any other Spark configuration for tuning data
checkpoint performance?
And just curious: technically, why does updateStateByKey need to be called for
every key (regardless of whether there are new occurrences or not)? Does Spark
have any plan to fix this?
I have 4M keys ne
failed recovery from checkpoint
Hi guys,
I’ve encountered some problems with a crashed Spark Streaming job, when
restoring from checkpoint.
I'm running Spark 1.5.1 on YARN (Hadoop 2.6) in cluster mode, reading from
Kafka with the direct consumer and a few updateStateByKey stateful
transfo
You are correct: the default checkpointing interval is 10 seconds or your batch
interval, whichever is bigger. You can change it by calling .checkpoint(x) on
your resulting DStream.
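For example, a minimal sketch (the 'words' stream, the update function, and the
interval here are illustrative, not taken from the original post):

import org.apache.spark.streaming.Seconds

// running count per word; 'words' is assumed to be a DStream[String]
def updateFunc(newValues: Seq[Int], state: Option[Long]): Option[Long] =
  Some(state.getOrElse(0L) + newValues.sum)

val wordCounts = words.map(w => (w, 1)).updateStateByKey(updateFunc _)
// override the default data checkpoint interval for the state DStream
wordCounts.checkpoint(Seconds(60))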
For the rest, you are probably keeping an “all time” word count that grows
unbounded if you never remove words from
Hi Spark gurus,
I am evaluating Spark Streaming.
In my application I need to maintain cumulative statistics (e.g. the total
running word count), so I need to call the updateStateByKey function on
every micro-batch.
After setting those things up, I observed the following behaviors:
* The Processing Time
I have a Spark Streaming job that runs great the first time around (Elastic
MapReduce 4.1.0), but when recovering from a checkpoint in S3, the job runs
but Spark itself seems to be jacked-up in lots of little ways:
- Executors, which are normally stable for days, are terminated within a
ould a
> restarted driver continue? I run a simple experiment as follows:
>
> 1. In the first driver run, Spark driver processes 1 million records
> starting from InitialPositionInStream.TRIM_HORIZON in 5 second batch
> intervals with 10 seconds set as the Kinesis receiver checkpoint interval.
InitialPositionInStream.TRIM_HORIZON in 5 second batch
intervals with 10 seconds set as the Kinesis receiver checkpoint interval.
(This interval has been purposely set low to see the impact of where a
restarted driver would pick up. )
2. We stop pushing events to Kinesis stream until the driver keeps
So as long as the jar is kept on S3 and available across different runs, the
S3 checkpoint works.
It looks like the reconstruction of the SparkContext from checkpoint data is
trying to look for
the jar file of previous failed runs. It cannot find the jar files, as our
jar files are on local
machines and were cleaned up after each failed run.
Hi, I am trying to set the Spark Streaming checkpoint to S3; here is what I did,
basically:
val checkpoint = "s3://myBucket/checkpoint"
val ssc = StreamingContext.getOrCreate(checkpointDir,
() =>
getStreamingContext
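For reference, a fuller sketch of the getOrCreate pattern (the factory body is
illustrative; your getStreamingContext would define the actual DStreams):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def getStreamingContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("checkpoint-to-s3") // illustrative app name
  val newSsc = new StreamingContext(conf, Seconds(30))
  newSsc.checkpoint(checkpoint) // the same S3 path as above
  // ... define input streams and transformations here ...
  newSsc
}

// If checkpoint data exists, the context (including the DStream graph) is
// rebuilt from it and the factory is NOT called; otherwise the factory runs.
val ssc = StreamingContext.getOrCreate(checkpoint, () => getStreamingContext())
ssc.start()
ssc.awaitTermination()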
at 2:18 PM, Tathagata Das wrote:
>
>> Can you provide the before stop and after restart log4j logs for this?
>>
>> On Fri, Oct 9, 2015 at 2:13 PM, Spark Newbie
>> wrote:
>>
>>> Hi Spark Users,
>>>
>>> I'm seeing checkpoint restore
logs? I
can send it if that will help dig into the root cause.
On Fri, Oct 9, 2015 at 2:18 PM, Tathagata Das wrote:
> Can you provide the before stop and after restart log4j logs for this?
>
> On Fri, Oct 9, 2015 at 2:13 PM, Spark Newbie
> wrote:
>
>> Hi Spark Users,
>>
Can you provide the before stop and after restart log4j logs for this?
On Fri, Oct 9, 2015 at 2:13 PM, Spark Newbie
wrote:
> Hi Spark Users,
>
> I'm seeing checkpoint restore failures causing the application startup to
> fail with the below exception. When I do "ls" o
Hi Spark Users,
I'm seeing checkpoint restore failures causing the application startup to
fail with the below exception. When I do "ls" on the S3 path, the key is
sometimes listed and sometimes not. There are no part files
(checkpointed files) in the specified S3 path. T
val parquetDataFrame =
  sqlContext.read.parquet(parquetFilename.getAbsolutePath)
parquetDataFrame.foreachPartition { rowIterator =>
  rowIterator.foreach { row =>
    // ... do work
  }
}
My use case is quite simple: I would like to save a checkpoint during
processing, and if the driver pr
>>
>>> My Stream job is throwing the below exception at every interval. It is first
>>> deleting the checkpoint file and then it's trying to checkpoint; is
>>> this normal behaviour? I'm using Spark 1.3.0. Do you know what may cause
>>> this issue?
>>
it is now because I plan to write my own recovery at some
> point.
>
> Petr
>
> On Wed, Sep 23, 2015 at 4:26 PM, Cody Koeninger
> wrote:
>
>> TD can correct me on this, but I believe checkpointing is done after a
>> set of jobs is submitted, not after they are completed
ull?
>
> On Thu, Sep 24, 2015 at 5:02 PM, Uthayan Suthakar <
> uthayan.sutha...@gmail.com> wrote:
>
>> Hello all,
>>
>> My Stream job is throwing the below exception at every interval. It is first
>> deleting the checkpoint file and then it's trying to checkp
Are you by any chance setting DStream.remember() with null?
On Thu, Sep 24, 2015 at 5:02 PM, Uthayan Suthakar <
uthayan.sutha...@gmail.com> wrote:
> Hello all,
>
> My Stream job is throwing below exception at every interval. It is first
> deleting the checkpoint file and t
Hello all,
My Stream job is throwing the below exception at every interval. It is first
deleting the checkpoint file and then it's trying to checkpoint; is
this normal behaviour? I'm using Spark 1.3.0. Do you know what may cause
this issue?
15/09/24 16:35:55 INFO scheduler.Task
TD can correct me on this, but I believe checkpointing is done after a set
of jobs is submitted, not after they are completed. If you fail while
processing the jobs, starting over from that checkpoint should put you in
the correct state.
In any case, are you actually observing a loss of messages
to save files:
stream.foreachRDD { (rdd, time) =>
  ...
  rdd.toDF().write.save(...use time for the directory name...)
}
It is not idempotent at the moment but let's put this aside for now.
The strange thing is that when I Ctrl+C the job, I can see a checkpoint file
with the timestamp for the la
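For reference, a hedged sketch of one way to make the foreachRDD save above
idempotent: derive the output directory deterministically from the batch time
and overwrite it, so a batch replayed after recovery rewrites the same
directory instead of duplicating data (the base path is illustrative, and the
toDF() conversion is assumed to be the same as in the snippet above):

stream.foreachRDD { (rdd, time) =>
  val dir = s"hdfs:///output/batch-${time.milliseconds}" // deterministic per batch
  rdd.toDF().write.mode("overwrite").save(dir)
}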
> checkpointing writes the RDD to disk, so this checkpointing happens on all
> workers. Whenever Spark has to read back the RDD, the checkpoint directory
> should be reachable by all the workers and should be a common place where
> workers can write to and read from. This asks for commonly access
@Das,
No, I am getting it in cluster mode.
I think I understood why I am getting this error; please correct me if I am
wrong.
The reason is:
checkpointing writes the RDD to disk, so this checkpointing happens on all
workers. Whenever Spark has to read back the RDD, the checkpoint directory
should be
Are you getting this error in local mode?
On Tue, Sep 22, 2015 at 7:34 AM, srungarapu vamsi
wrote:
> Yes, I tried ssc.checkpoint("checkpoint"); it works for me as long as I
> don't use reduceByKeyAndWindow.
>
> When I start using "reduceByKeyAndWindow
Yes, I tried ssc.checkpoint("checkpoint"); it works for me as long as I
don't use reduceByKeyAndWindow.
When I start using "reduceByKeyAndWindow" it complains with the error
"Exception in thread "main" org.apache.spark.SparkException: Invalid
checkpoint
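For reference, a minimal sketch of the setup being described ('pairs', 'ssc',
and the durations are illustrative): the inverse-function form of
reduceByKeyAndWindow keeps state, so the StreamingContext needs a valid
checkpoint directory before start().

import org.apache.spark.streaming.Seconds

ssc.checkpoint("checkpoint") // HDFS-compatible and reachable by all workers in cluster mode
val windowedCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b, // values entering the window
  (a: Int, b: Int) => a - b, // values leaving the window
  Seconds(60),               // window length
  Seconds(20))               // slide interval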
Have you tried simply ssc.checkpoint("checkpoint")? This should create it in
the local folder; it has always worked for me when developing in local mode.
For the others (/tmp/...) make sure you have rights to write there.
-adrian
From: srungarapu vamsi
Date: Tuesday, September 22, 2015 at
that I need not set up HDFS,
but the checkpoint directory should be HDFS-compatible.
I am a beginner in this area. I am running my Spark Streaming application
on Ubuntu 14.04, Spark 1.3.1.
If I indeed need not set up HDFS and ext4 is HDFS-compatible, then what does
my checkpoint directory look like?
i
We have tried on another cluster installation with the same effect.
Petr
On Mon, Sep 21, 2015 at 10:45 AM, Petr Novak wrote:
> It might be connected with my problems with gracefulShutdown in Spark
>> 1.5.0 2.11
>> https://mail.google.com/mail/#search/petr/14fb6bd5166f9395
>>
>> Maybe Ctrl+C cor
>
> It might be connected with my problems with gracefulShutdown in Spark
> 1.5.0 2.11
> https://mail.google.com/mail/#search/petr/14fb6bd5166f9395
>
> Maybe Ctrl+C corrupts checkpoints and breaks gracefulShutdown?
>
The provided link is obviously wrong. I haven't found it in the Spark mailing
list archi
Hi Michal,
yes, it is there, logged twice; it can be seen in the attached log in one of
the previous posts with more details:
15/09/17 23:06:37 INFO StreamingContext: Invoking
stop(stopGracefully=false) from shutdown hook
15/09/17 23:06:37 INFO StreamingContext: Invoking
stop(stopGracefully=false) from shut
Hi Petr, after Ctrl+C can you see the following message in the logs?
Invoking stop(stopGracefully=false)
Details:
https://github.com/apache/spark/pull/6307
On 18 September 2015 at 10:28, Petr Novak wrote:
> It might be connected with my problems with gracefulShutdown in Spark
> 1.5.0 2.11
> h
Hi Swetha,
The problem with the stack overflow is that when recovering from checkpoint data,
Java uses a recursive way to deserialize the call stack; if you have a
large call stack, this recursion can easily lead to a stack overflow.
This is caused by the Java deserialization mechanism; you need to
Which version of Java are you using?
And which release of Spark, please?
Thanks
On Fri, Sep 18, 2015 at 9:15 AM, swetha wrote:
> Hi,
>
> When I try to recover my Spark Streaming job from a checkpoint directory, I
> get a StackOverFlow Error as shown below. Any idea as to why this is
Hi,
When I try to recover my Spark Streaming job from a checkpoint directory, I
get a StackOverFlow Error as shown below. Any idea as to why this is
happening?
15/09/18 09:02:20 ERROR streaming.StreamingContext: Error starting the
context, marking it as stopped
java.lang.StackOverflowError
It might be connected with my problems with gracefulShutdown in Spark 1.5.0
2.11
https://mail.google.com/mail/#search/petr/14fb6bd5166f9395
Maybe Ctrl+C corrupts checkpoints and breaks gracefulShutdown?
Petr
On Fri, Sep 18, 2015 at 4:10 PM, Petr Novak wrote:
> ...to ensure it is not something
...to ensure it is not something wrong on my cluster.
On Fri, Sep 18, 2015 at 4:09 PM, Petr Novak wrote:
> I have tried it on Spark 1.3.0 2.10 and it works. The same code doesn't on
> Spark 1.5.0 2.11. It would be nice if anybody could try on another
> installation to ensure it is something wron
I have tried it on Spark 1.3.0 2.10 and it works. The same code doesn't on
Spark 1.5.0 2.11. It would be nice if anybody could try on another
installation to ensure it is something wrong on my cluster.
Many thanks,
Petr
On Fri, Sep 18, 2015 at 4:07 PM, Petr Novak wrote:
> This one is generated,
This one is generated, I suppose, after Ctrl+C
15/09/18 14:38:25 INFO Worker: Asked to kill executor
app-20150918143823-0001/0
15/09/18 14:38:25 INFO Worker: Asked to kill executor
app-20150918143823-0001/0
15/09/18 14:38:25 DEBUG
AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handle
Also, giving more log4j messages around the error area would be useful.
On Thu, Sep 17, 2015 at 1:02 PM, Cody Koeninger wrote:
> Is there a particular reason you're calling checkpoint on the stream in
> addition to the streaming context?
>
> On Thu, Sep 17, 2015 at 2:36 PM,
Is there a particular reason you're calling checkpoint on the stream in
addition to the streaming context?
On Thu, Sep 17, 2015 at 2:36 PM, Petr Novak wrote:
> Hi all,
> it throws FileBasedWriteAheadLogReader: Error reading next item, EOF
> reached
> java.io.EO
; -> "smallest")
val topics = Set("topic-p03-r01")
val stream = KafkaUtils.createDirectStream[String, String,
  StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)
stream
  .checkpoint(Seconds(60))
  .foreachRDD { (rdd, time) =>
    rdd.t
ted after 4 am” NOT “events
> received
> after 4 am”
>2. Graceful shutdown and saving data to DB, followed by checkpoint
>cleanup / new checkpoint dir
> 1. On restart, you need to use the updateStateByKey that takes an
> initialRdd with the values preloaded fro
that point forward
* Note that "up to a point" is specific to your state management logic;
it might mean "user sessions started after 4 am" NOT "events received after 4 am"
2. Graceful shutdown and saving data to DB, followed by checkpoint cleanup /
new checkpoint dir
* On restart, you
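A hedged sketch of the initialRdd approach mentioned above (the
loadStateFromDb helper, the key/value types, and updateFunc are illustrative):

import org.apache.spark.HashPartitioner

val initialState = loadStateFromDb(ssc.sparkContext) // hypothetical: RDD[(String, Long)] rebuilt from your DB
val sessions = events.updateStateByKey(
  updateFunc, // your (Seq[V], Option[Long]) => Option[Long] logic
  new HashPartitioner(ssc.sparkContext.defaultParallelism),
  initialState)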
In my understanding, I have only three options here to keep the DStream state
between redeploys (yes, I'm using updateStateByKey):
1. Use checkpoint.
2. Use my own database.
3. Use both.
But none of these options is great:
1. Use checkpoint: I cannot load it after a code change. Or I need to
Any kind of change to the JVM classes will make it fail. By checkpointing
the data, do you mean using checkpoint with updateStateByKey? Here's a similar
discussion that happened earlier which will clear your doubts, I guess:
http://mail-archives.us.apache.org/mod_mbox/spark-user/201507.mbox/%3CCA
And here is another question. If I load the DStream from the database every
time I start the job, will the data be loaded when the job fails and
auto-restarts? If so, both the checkpoint data and the database data are loaded;
won't this be a problem?
On Wednesday, September 16, 2015 at 8:40 PM, Bin Wang wrote:
>
Will StreamingContext.getOrCreate do this work? What kind of code change will
make it unable to load?
On Wednesday, September 16, 2015 at 20:20, Akhil Das wrote:
> You can't really recover from checkpoint if you alter the code. A better
> approach would be to use some sort of external storage (like a db or
> zoo
You can't really recover from checkpoint if you alter the code. A better
approach would be to use some sort of external storage (like a db or
zookeeper etc) to keep the state (the indexes etc) and then when you deploy
new code they can be easily recovered.
Thanks
Best Regards
On Wed, S
I'd like to know if there is a way to recover a DStream from a checkpoint.
Because I store state in the DStream, I'd like the state to be recovered when
I restart the application and deploy new code.
change those. Upon
> further reading, even though StreamingContext.getOrCreate makes an entirely
> new spark conf, Checkpoint will only reload certain properties.
>
> I'm not sure if it'd be safe to include memory / cores among those
> properties that get re-loaded, TD would
Yeah, looks like you're right about being unable to change those. Upon
further reading, even though StreamingContext.getOrCreate makes an entirely
new spark conf, Checkpoint will only reload certain properties.
I'm not sure if it'd be safe to include memory / cores among those
Hi Cody,
Thanks for your answer.
I had already tried to change the spark-submit parameters, but I double-checked
before replying to your answer. Whether changing the properties file or the
spark-submit arguments directly, none of them works when the application runs
from the checkpoint. It seems that
Yeah, it makes sense that parameters that are read only during your
getOrCreate function wouldn't be re-read, since that function isn't called
if a checkpoint is loaded.
I would have thought changing the number of executors and other things used by
spark-submit would work on checkpoi
Hi guys,
I tried to use the configuration file, but it didn't work as I expected. As
part of the Spark Streaming flow, my methods run only when the application
is started the first time. Once I restart the app, it reads from the
checkpoint and all the DStream operations come from the cach
Probably, the problem here is that the recovered StreamingContext is trying
to refer to the pre-failure static RDD, which does not exist after the failure.
The solution: when the driver process restarts from checkpoint, you need to
recreate the static RDD again explicitly, and make that the recreated
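One hedged way to follow this advice (illustrative names, not the poster's
actual code) is to hold the static RDD in a lazily initialized singleton
referenced from inside transform, so it is rebuilt against the new
SparkContext after a restart instead of pointing at the pre-failure RDD:

object StaticRddHolder {
  private var staticRdd: org.apache.spark.rdd.RDD[(String, String)] = null
  def get(sc: org.apache.spark.SparkContext) = synchronized {
    if (staticRdd == null) {
      staticRdd = sc.textFile("hdfs:///reference/data") // illustrative source
        .map(line => (line, line))
        .cache()
    }
    staticRdd
  }
}

// 'stream' is assumed to be a DStream of key/value pairs
val joined = stream.transform { rdd =>
  rdd.join(StaticRddHolder.get(rdd.sparkContext)) // static RDD is recreated lazily after recovery
}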
m a new StreamContext.
> But if the StreamingContext is restored from checkpoint, there will be an
> exception as follows and the graph cannot be set up.
> Do you know how to solve this problem? Thanks very much!
>
> 5/09/07 14:07:18 INFO spark.SparkContext: Starting job: saveA
orks well when it starts from a new StreamingContext.
But if the StreamingContext is restored from checkpoint, there will be an
exception as follows and the graph cannot be set up.
Do you know how to solve this problem? Thanks very much!
5/09/07 14:07:18 INFO spark.SparkContext: Starting job: saveAsTex
Good tip. I will try that.
Thank you.
On Wed, Sep 2, 2015 at 6:54 PM, Cody Koeninger wrote:
> Yeah, in general if you're changing the jar you can't recover the
> checkpoint.
>
> If you're just changing parameters, why not externalize those in a
> configuration f
Yeah, in general if you're changing the jar you can't recover the
checkpoint.
If you're just changing parameters, why not externalize those in a
configuration file so your jar doesn't change? I tend to stick even my
app-specific parameters in an external spark config so
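A small sketch of that idea (the file location and keys are illustrative): keep
the tunables in a properties file read at startup, so the jar itself stays
unchanged between submissions.

import java.io.FileInputStream
import java.util.Properties

val props = new Properties()
props.load(new FileInputStream("/etc/myapp/streaming.properties")) // illustrative location
val windowMinutes = props.getProperty("window.minutes", "60").toInt
val outputPath = props.getProperty("output.path", "hdfs:///pageviews/out")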
Hi,
Is there a way to submit an app code change while keeping the checkpoint data,
or do I need to erase the checkpoint folder every time I re-submit the Spark
app with a new jar?
I have an app that counts pageviews streaming from Kafka and delivers a file
every hour covering the past 24 hours. I'm
n I want to be able to
> retrieve the latest Kafka offsets that were processed by the pipeline, and
> create my kafka direct streams from those offsets. Because the checkpoint
> directory isn't guaranteed to be compatible between job deployments, I
> don't want to re-use the
When deploying a Spark Streaming application I want to be able to retrieve
the latest Kafka offsets that were processed by the pipeline, and create
my Kafka direct streams from those offsets. Because the checkpoint
directory isn't guaranteed to be compatible between job deployments, I
don
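A hedged sketch of that approach (loadOffsets is a hypothetical helper backed
by your own offset store, and the types are illustrative): read the last
processed offsets and hand them to the direct stream instead of relying on the
checkpoint directory.

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val fromOffsets: Map[TopicAndPartition, Long] = loadOffsets() // hypothetical helper
val messageHandler =
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

val stream = KafkaUtils.createDirectStream[
  String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets, messageHandler)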
assertion failed: The
> checkpoint directory has not been set. Please use
> StreamingContext.checkpoint() or SparkContext.checkpoint() to set the
> checkpoint directory.
> at scala.Predef$.assert(Predef.scala:179)
> at
> org.apache.spark.streaming.dstream.DStream.
I am running in yarn-client mode and trying to execute the network word count
example. When I connect through nc, I see the following in the Spark app logs:
Exception in thread "main" java.lang.AssertionError: assertion failed: The
checkpoint directory has not been set.
(notFilter).checkpoint(interval)
toNotUpdate.foreachRDD(rdd =>
  pending = rdd
)
Thanks
On 3 August 2015 at 13:09, Tathagata Das wrote:
> Can you tell us more about streaming app? DStream operation that you are
> using?
>
> On Sun, Aug 2, 2015 at 9:14 PM, Anand Nalya wrote:
>
Can you tell us more about streaming app? DStream operation that you are
using?
On Sun, Aug 2, 2015 at 9:14 PM, Anand Nalya wrote:
> Hi,
>
> I'm writing a Streaming application in Spark 1.3. After running for some
> time, I'm getting following execption. I'm sure, that no other process is
> modi
Hi,
I'm writing a Streaming application in Spark 1.3. After running for some
time, I'm getting the following exception. I'm sure that no other process is
modifying the HDFS file. Any idea what might be the cause of this?
15/08/02 21:24:13 ERROR scheduler.DAGSchedulerEventProcessLoop:
DAGSchedulerEv
t means it's
probably running for the first time. Otherwise it is continuing, so read the
offset and continue from there.
On Wed, Jul 29, 2015 at 9:38 PM, Shushant Arora
wrote:
> 1.How to do it in java?
> 2.Can broadcast objects also be created in same way after checkpointing.
> 3.Is it saf
1. How do I do it in Java?
2. Can broadcast objects also be created in the same way after checkpointing?
3. Is it safe if I disable checkpointing and write offsets at the end of each batch
to HDFS in my code, and somehow specify in my job to use these offsets for
creating the Kafka stream the first time? How can I
Rather than using the accumulator directly, what you can do is something like
this to lazily create an accumulator and use it (it will get lazily recreated
if the driver restarts from checkpoint):
dstream.transform { rdd =>
  val accum = SingletonObject.getOrCreateAccumulator() // single object
met
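A hedged completion of that sketch (the singleton, the accumulator name, and
the isValid predicate are illustrative; here the helper takes the SparkContext
so the accumulator is registered against whatever context is live after a
restart):

object SingletonObject {
  @volatile private var acc: org.apache.spark.Accumulator[Long] = null
  def getOrCreateAccumulator(sc: org.apache.spark.SparkContext) = synchronized {
    if (acc == null) acc = sc.accumulator(0L, "droppedRecords")
    acc
  }
}

dstream.transform { rdd =>
  val accum = SingletonObject.getOrCreateAccumulator(rdd.sparkContext)
  rdd.filter { record =>
    val keep = isValid(record) // hypothetical predicate
    if (!keep) accum += 1L
    keep
  }
}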
Hi,
I am using Spark Streaming 1.3 with checkpointing.
But the job is failing to recover from the checkpoint on restart.
For the broadcast variable it says:
1. WARN TaskSetManager: Lost task 15.0 in stage 7.0 (TID 1269, hostIP):
java.lang.ClassCastException: [B cannot be cast to
That seems abnormally large for a checkpoint. How many topicpartitions do
you have, what kind of batch size do you have, do you have any windowing
operations, and how long had the job been down for?
generatedRDDs holds a map of time => RDD, and that is used as the basis for
what to store i
Hello,
I'm using 4 GB for the driver memory. The checkpoint is between 1 GB and 10
GB, depending on whether I'm reprocessing all the data from the beginning or just
getting the latest offsets from the real-time processing. Is there any best
practice to be aware of for driver memory relating to check
That stacktrace looks like an out of heap space on the driver while writing
checkpoint, not on the worker nodes. How much memory are you giving the
driver? How big are your stored checkpoints?
On Tue, Jul 28, 2015 at 9:30 AM, Nicolas Phung
wrote:
> Hi,
>
> Af
nvoke(DelegatingMethodAccessorImpl.java:43)
I don't know why, but after that it's eating all the CPU on one of the nodes
until the entire job stops. It tries to resume from the checkpoint several
times but fails with this error too. I think I have enough spare memory
with 4 nodes with 24 GB pe
Hi all.
I am writing a Twitter connector using Spark Streaming. I have written the
following code to maintain the checkpoint.
val
ssc=StreamingContext.getOrCreate("hdfs://192.168.23.109:9000/home/cloud9/twitterCheckpoint",()=>
{ managingContext() })
def managingContext():Str
Hi,
I have the following code that uses checkpoint to checkpoint the heavy ops, which
works well in that the last heavyOpRDD.foreach(println) does not recompute from
the beginning.
But when I re-run this program, the RDD computing chain is recomputed from
the beginning; I thought that it will
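For reference, a minimal sketch of RDD checkpointing as described (names and
paths are illustrative). Note that the checkpointed data is reused within the
same run; a fresh run builds a new lineage and does not automatically pick up
the old checkpoint files unless you read them back yourself, which matches the
behaviour described above.

sc.setCheckpointDir("hdfs:///tmp/rdd-checkpoints") // illustrative path

val heavyOpRDD = inputRDD.map(expensiveTransform) // hypothetical heavy computation
heavyOpRDD.checkpoint() // mark for checkpointing; lineage is truncated once materialized
heavyOpRDD.count() // the first action triggers the write to the checkpoint dir
heavyOpRDD.foreach(println) // later actions in this run read the checkpointed data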
>>>>>>> only 1 MB of messages in my Kafka topic. If you have any other information
>>>>>>> or
>>>>>>> suggestions, please tell me.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Nicolas PHUNG
>>>>
>>>>>>> Not exactly the same issue, but possibly related:
>>>>>>>
>>>>>>> https://issues.apache.org/jira/browse/KAFKA-1196
>>>>>>>
>>>>>>> On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger >>>>
the stack trace...
>>>>>>>
>>>>>>> at java.nio.Buffer.limit(Buffer.java:275)
>>>>>>>
>>>>>>> That exception gets thrown if the limit is negative or greater than the
>>>>>>> buffer's capacity
>>>
er than the
>>>>>> buffer's capacity
>>>>>>
>>>>>>
>>>>>> at kafka.message.Message.sliceDelimited(Message.scala:236)
>>>>>>
>>>>>> If size had been negative, it would have just returned nul
>> buffer's capacity
>>>>>
>>>>>
>>>>> I haven't seen that before... maybe a corrupted message of some kind?
>>>>>
>>>>> If that problem is reproducible, try providing an explicit argument
>>>>>
.
>>>>
>>>>
>>>> On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung <
>>>> nicolas.ph...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> When I'm reprocessing the data from kafka (about 4