Hi Juho & Stefan,
just a quick summary: the good news is that I think I found the bug, the bad news is 
that we are already at RC4...
The bug is here:


try (RocksIteratorWrapper iterator = getRocksIterator(restoreDb, columnFamilyHandle)) {

   // build the seek target: the serialized prefix of the target backend's first key group
   int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
   byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
   for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
      startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
   }

   iterator.seek(startKeyGroupPrefixBytes);

   while (iterator.isValid()) {

      // reassemble the key group from the key's prefix bytes
      int keyGroup = 0;
      for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
         keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
      }

      // copy only the entries whose key group falls into the target backend's range
      if (stateBackend.keyGroupRange.contains(keyGroup)) {
         stateBackend.db.put(targetColumnFamilyHandle, iterator.key(), iterator.value());
      }

      iterator.next();
   }
}


For every state handle, to get the target data we _seek(state.keyGroupRange.getStartKeyGroup())_, 
so the _iterator_ could be INVALID immediately if the state handle's start key group is bigger 
than _state.keyGroupRange.getStartKeyGroup()_. Then data is lost...
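
To make the prefix arithmetic concrete, here is a minimal, self-contained sketch of the same key-group encode/decode logic as in the snippet above (class and method names are mine, not from the Flink code base; the decode masks each byte so that key groups above 127 round-trip correctly):

public class KeyGroupPrefixSketch {

    /** Serialize a key-group id into a big-endian prefix of the given length, like the restore code does. */
    static byte[] keyGroupToPrefixBytes(int keyGroup, int prefixBytes) {
        byte[] prefix = new byte[prefixBytes];
        for (int j = 0; j < prefixBytes; ++j) {
            prefix[j] = (byte) (keyGroup >>> ((prefixBytes - j - 1) * Byte.SIZE));
        }
        return prefix;
    }

    /** Read the key-group id back from the first prefixBytes bytes of a RocksDB key. */
    static int prefixBytesToKeyGroup(byte[] key, int prefixBytes) {
        int keyGroup = 0;
        for (int j = 0; j < prefixBytes; ++j) {
            // mask so the byte is treated as unsigned when reassembling the key group
            keyGroup = (keyGroup << Byte.SIZE) + (key[j] & 0xFF);
        }
        return keyGroup;
    }

    public static void main(String[] args) {
        int prefixBytes = 2; // sketch assumes 2 prefix bytes (used when there are many key groups)
        byte[] prefix = keyGroupToPrefixBytes(300, prefixBytes);
        System.out.println(prefixBytesToKeyGroup(prefix, prefixBytes)); // prints 300
    }
}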


@Stefan, this still needs your double check, please correct me if I'm wrong.


Best, Sihua


On 05/18/2018 17:29, sihua zhou <summerle...@163.com> wrote:
Hi Juho,
thanks for trying this out. I'm running out of ideas myself now... Let's do a brief 
summary.


- Have we reached a consensus that the map state is lost when restoring with rescaling? 
(Because you can restore successfully without rescaling.)
- The timer state is restored correctly, because for timers, restoring from a checkpoint 
is no different from restoring from a savepoint. (@Stefan please correct me if I'm wrong.)


And @Juho, have you tried to rescale the job with a different parallelism (not 
always 16)?


Best, Sihua








On 05/18/2018 17:14, Juho Autio <juho.au...@rovio.com> wrote:
Tested with 
http://people.apache.org/~trohrmann/flink-1.5.0-rc4/flink-1.5.0-bin-hadoop27-scala_2.11.tgz.


It hits the same problem.


Btw, why is this error logged on INFO level?



2018-05-18 09:03:52,595 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - PlatformIDsProcessFunction -> AppIdFilter([MyApp]) -> DiscardBeforeDateFunction(null) -> DiscardedEventsFunction -> (LateDataLabelFunction, FooEventMapper -> ThreadPoolFooGateway (capacity=500) -> Sink: ResponseKafkaSink) (8/16) (c2c102bcf4e1c7f523a2d00b81ed1a23) switched from RUNNING to FAILED.
java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:552)
at java.lang.Long.parseLong(Long.java:631)
at com.mycompany.ds.flink.http.Foo.EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onEventTime(LegacyKeyedProcessOperator.java:75)
at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
... 7 more
2018-05-18 09:03:52,596 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job FooHttpStream (a54a9210341fd26d210122986f2be844) switched from state RUNNING to FAILING.
java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:552)
at java.lang.Long.parseLong(Long.java:631)
at com.mycompany.ds.flink.http.Foo.EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onEventTime(LegacyKeyedProcessOperator.java:75)
at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
... 7 more
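
As a side note on the trace above: "Caused by: java.lang.NumberFormatException: null" is exactly what Long.parseLong produces when it is handed a null String. One plausible reading, in line with the suspicion that map state is lost on restore, is that EnrichEventProcessFunction.onTimer reads a value from state that is no longer there; that connection is an assumption, only the parseLong behaviour in this small sketch is certain:

public class ParseLongNullSketch {
    public static void main(String[] args) {
        // Simulates a value that was expected in state but is missing after restore,
        // e.g. mapState.get(key) returning null for a lost entry (hypothetical scenario).
        String valueFromState = null;
        try {
            Long.parseLong(valueFromState);
        } catch (NumberFormatException e) {
            System.out.println(e); // prints: java.lang.NumberFormatException: null
        }
    }
}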


On Fri, May 18, 2018 at 11:06 AM, Juho Autio <juho.au...@rovio.com> wrote:

Thanks Sihua, I'll give that RC a try.


On Fri, May 18, 2018 at 10:58 AM, sihua zhou <summerle...@163.com> wrote:

Hi Juho,
would you like to try out the latest 
RC (http://people.apache.org/~trohrmann/flink-1.5.0-rc4/) to rescale the job 
from the "problematic" checkpoint? The latest RC includes a fix for a 
potential silent data loss. If that is the reason, you will see a different 
exception when you try to recover your job.


Best, Sihua






On 05/18/2018 15:02, Juho Autio <juho.au...@rovio.com> wrote:
I see. I appreciate keeping this option available even if it's "beta". The 
current situation could be documented better, though.


As long as rescaling from checkpoint is not officially supported, I would put 
it behind a flag similar to --allowNonRestoredState. The flag could be called 
--allowRescalingRestoredCheckpointState, for example. This would make sure that 
users are aware that what they're using is experimental and might have 
unexpected effects.


As for the bug I faced, indeed I was able to reproduce it consistently. And I 
have provided TRACE-level logs personally to Stefan. If there is no Jira ticket 
for this yet, would you like me to create one?


On Thu, May 17, 2018 at 1:00 PM, Stefan Richter <s.rich...@data-artisans.com> 
wrote:
Hi,

>
> This raises a couple of questions:
> - Is it a bug though, that the state restoring goes wrong like it does for my 
> job? Based on my experience it seems like rescaling sometimes works, but then 
> you can have these random errors.

If there is a problem, I would still consider it a bug because it should work 
correctly.

> - If it's not supported properly, why not refuse to restore a checkpoint if 
> it would require rescaling?

It should work properly, but I would prefer to keep this at the level of a 
"hidden feature" until it has gotten some more exposure and some questions about 
the future differences between savepoints and checkpoints are resolved.

> - We have sometimes had Flink jobs where the state has become so heavy that 
> cancelling with a savepoint times out & fails. Incremental checkpoints are 
> still working because they don't time out as long as the state is growing 
> linearly. In that case if we want to scale up (for example to enable 
> successful savepoint creation ;) ), the only thing we can do is to restore 
> from the latest checkpoint. But then we have no way to scale up by increasing 
> the cluster size, because we can't create a savepoint with a smaller cluster 
> but on the other hand can't restore a checkpoint to a bigger cluster, if 
> rescaling from a checkpoint is not supposed to be relied on. So in this case 
> we're stuck and forced to start from an empty state?

IMO there is a very good chance that this will simply become a normal feature 
in the near future.

Best,
Stefan






