Tested with RC4:
http://people.apache.org/~trohrmann/flink-1.5.0-rc4/flink-1.5.0-bin-hadoop27-scala_2.11.tgz

It hits the same problem.

Btw, why is this error logged at INFO level?

2018-05-18 09:03:52,595 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - PlatformIDsProcessFunction -> AppIdFilter([MyApp]) -> DiscardBeforeDateFunction(null) -> DiscardedEventsFunction -> (LateDataLabelFunction, FooEventMapper -> ThreadPoolFooGateway (capacity=500) -> Sink: ResponseKafkaSink) (8/16) (c2c102bcf4e1c7f523a2d00b81ed1a23) switched from RUNNING to FAILED.
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: null
    at java.lang.Long.parseLong(Long.java:552)
    at java.lang.Long.parseLong(Long.java:631)
    at com.mycompany.ds.flink.http.Foo.EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
    at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)
    at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onEventTime(LegacyKeyedProcessOperator.java:75)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
    ... 7 more
2018-05-18 09:03:52,596 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job FooHttpStream (a54a9210341fd26d210122986f2be844) switched from state RUNNING to FAILING.
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: null
    at java.lang.Long.parseLong(Long.java:552)
    at java.lang.Long.parseLong(Long.java:631)
    at com.mycompany.ds.flink.http.Foo.EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
    at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)
    at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onEventTime(LegacyKeyedProcessOperator.java:75)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
    ... 7 more
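
For reference, the "Caused by" part of the trace is what Long.parseLong throws when it is handed a null String, so the value read inside onTimer was presumably missing at that point. Below is a minimal Java sketch of a null-safe parse that reports a missing value instead of failing the task. The class and method names (TimerStateParsing, tryParseLong) are hypothetical and are not taken from the job's code; how the value is actually read from state is not shown in this thread and is left out.

```java
import java.util.OptionalLong;

// Hypothetical helper, for illustration only; not part of Flink or of the job in this thread.
final class TimerStateParsing {
    private TimerStateParsing() {}

    /**
     * Parses a decimal long. Returns an empty OptionalLong instead of throwing
     * when the input is null or is not a valid long.
     */
    static OptionalLong tryParseLong(String raw) {
        if (raw == null) {
            // Long.parseLong(null) would throw NumberFormatException here.
            return OptionalLong.empty();
        }
        try {
            return OptionalLong.of(Long.parseLong(raw.trim()));
        } catch (NumberFormatException e) {
            return OptionalLong.empty();
        }
    }

    public static void main(String[] args) {
        String missing = null;
        try {
            Long.parseLong(missing);
        } catch (NumberFormatException e) {
            // Same exception type as in the "Caused by" line of the trace.
            System.out.println("parseLong(null) threw " + e.getClass().getName());
        }
        System.out.println("null present? " + tryParseLong(missing).isPresent());
        System.out.println("parsed: " + tryParseLong("1526634232595").getAsLong());
    }
}
```

A guard like this only hides the symptom. If the value is null because keyed state went missing after the rescaled restore, the missing state is still the actual problem, so it would make sense to log the empty case rather than silently skip it.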

On Fri, May 18, 2018 at 11:06 AM, Juho Autio <juho.au...@rovio.com> wrote:

> Thanks Sihua, I'll give that RC a try.
>
> On Fri, May 18, 2018 at 10:58 AM, sihua zhou <summerle...@163.com> wrote:
>
>> Hi Juho,
>> would you like to try out the latest RC
>> (http://people.apache.org/~trohrmann/flink-1.5.0-rc4/) and rescale the job
>> from the "problematic" checkpoint? The latest RC includes a fix for the
>> potential silent data loss. If that is the cause, you will see a different
>> exception when you try to recover your job.
>>
>> Best, Sihua
>>
>>
>>
>> On 05/18/2018 15:02, Juho Autio <juho.au...@rovio.com> wrote:
>>
>> I see. I appreciate keeping this option available even if it's "beta".
>> The current situation could be documented better, though.
>>
>> As long as rescaling from checkpoint is not officially supported, I would
>> put it behind a flag similar to --allowNonRestoredState. The flag could be
>> called --allowRescalingRestoredCheckpointState, for example. This would
>> make sure that users are aware that what they're using is experimental and
>> might have unexpected effects.
>>
>> As for the bug I faced, indeed I was able to reproduce it consistently.
>> And I have provided TRACE-level logs personally to Stefan. If there is no
>> Jira ticket for this yet, would you like me to create one?
>>
>> On Thu, May 17, 2018 at 1:00 PM, Stefan Richter <
>> s.rich...@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> >
>>> > This raises a couple of questions:
>>> > - Is it a bug though, that the state restoring goes wrong like it does
>>> for my job? Based on my experience it seems like rescaling sometimes works,
>>> but then you can have these random errors.
>>>
>>> If there is a problem, I would still consider it a bug because it should
>>> work correctly.
>>>
>>> > - If it's not supported properly, why not refuse to restore a
>>> checkpoint if it would require rescaling?
>>>
>>> It should work properly, but I would prefer to keep this at the level
>>> of a "hidden feature" until it has had some more exposure and some
>>> questions about the future differences between savepoints and
>>> checkpoints are resolved.
>>>
>>> > - We have sometimes had Flink jobs where the state has become so heavy
>>> that cancelling with a savepoint times out & fails. Incremental checkpoints
>>> are still working because they don't time out as long as the state is
>>> growing linearly. In that case if we want to scale up (for example to
>>> enable successful savepoint creation ;) ), the only thing we can do is to
>>> restore from the latest checkpoint. But then we have no way to scale up by
>>> increasing the cluster size, because we can't create a savepoint with a
>>> smaller cluster but on the other hand can't restore a checkpoint to a
>>> bigger cluster, if rescaling from a checkpoint is not supposed to be relied
>>> on. So in this case we're stuck and forced to start from an empty state?
>>>
>>> IMO there is a very good chance that this will simply become a normal
>>> feature in the near future.
>>>
>>> Best,
>>> Stefan
>>>
>>>
>>
>
