Hi Juho,
I tried multi times follow the simple code you privoded, but still can't
reproduce the bug you met. There's one more question I'd like to confirm with
you, is the stateRetentionMillis a fixed(final) field or it might be changed on
some condition?
Best, Sihua
On 05/19/2018 08:19,sihua zhou wrote:
Sorry for the incorrect information, that's not the case.
Best, Sihua
On 05/19/2018 07:58,sihua zhou wrote:
Hi Juho & Stefan,
just a quick summarize, good news is that I think I found the bug, bad news is
that we are currently at the RC4...
The bug is here.
try (RocksIteratorWrapper iterator = getRocksIterator(restoreDb,
columnFamilyHandle)) {
int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>>
((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
}
iterator.seek(startKeyGroupPrefixBytes);
while (iterator.isValid()) {
int keyGroup = 0;
for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
}
if (stateBackend.keyGroupRange.contains(keyGroup)) {
stateBackend.db.put(targetColumnFamilyHandle,
iterator.key(), iterator.value());
}
iterator.next();
}
}
for every state handle to get the target data, we
_seek(state.keyGroupRange.getStartKeyGroup())_, so the _iterator_ could be
INVALID imediately if the state handle's start key group is bigger that
_state.keyGroupRange.getStartKeyGroup()_. Then, data lost
@Stefan, this still need your double check, plz correct me if I'm wrong.
Best, Sihua
On 05/18/2018 17:29,sihua zhou wrote:
Hi Juho,
thanks for trying this out. I'm running out of myself now... Let's do bref
summarize.
- have we reached a consensus that the map state losts when restoring?(because
you can restore successfully without rescaling)
- the timer state is correctly restore, because for timer, when restoring from
checkpoint it has no different with storing from savepoint.(@Stefan plz correct
me if I'm wrong)
And @Juho, have you try to rescale the job with a different parallelism(not
always with 16)?
Best, Sihua
On 05/18/2018 17:14,Juho Autio wrote:
Tested with
http://people.apache.org/~trohrmann/flink-1.5.0-rc4/flink-1.5.0-bin-hadoop27-scala_2.11.tgz.
It hits the same problem.
Btw, why is this error logged on INFO level?
2018-05-18 09:03:52,595 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph-
PlatformIDsProcessFunction -> AppIdFilter([MyApp]) ->
DiscardBeforeDateFunction(null) -> DiscardedEventsFunction ->
(LateDataLabelFunction, FooEventMapper -> ThreadPoolFooGateway (capacity=500)
-> Sink: ResponseKafkaSink) (8/16) (c2c102bcf4e1c7f523a2d00b81ed1a23) switched
from RUNNING to FAILED.
java.lang.RuntimeException: Exception occurred while processing valve output
watermark:
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
at
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:552)
at java.lang.Long.parseLong(Long.java:631)
at
com.mycompany.ds.flink.http.Foo.EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
at
org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)
at
org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onEventTime(LegacyKeyedProcessOperator.java:75)
at
org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
at
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
... 7 more
2018-05-18 09:03:52,596 INFO