Hi all,

I have an Apache Beam pipeline that fails to restart on the Flink Runner from a 
checkpoint. The restore fails with "Invalid namespace string: '//'", which is 
caused by an EOFException in the InstantCoder. This is Beam 2.20 on Flink 1.8. 
See the stack trace below.

This seems to be the same as the issue raised (against Dataflow) on Stack 
Overflow here: 
https://stackoverflow.com/questions/52008247/pipeline-fails-with-invalid-namespace-string
Both show an issue with the InstantCoder.

I'm not sure whether it could be related to the bug 
https://issues.apache.org/jira/browse/BEAM-2831, as suggested in the Stack 
Overflow question.

Any suggestions?

Thanks,

Steve



java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowDoFnOperator_bb5f78e61d64604361a7c2d88da76203_(2/2) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
    ... 5 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:130)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:489)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 7 more
Caused by: java.lang.RuntimeException: Invalid namespace string: '//'
    at org.apache.beam.runners.core.StateNamespaces.fromString(StateNamespaces.java:270)
    at org.apache.beam.runners.core.TimerInternals$TimerDataCoder.decode(TimerInternals.java:246)
    at org.apache.beam.runners.core.TimerInternals$TimerDataCoder.decode(TimerInternals.java:221)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
    at org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:151)
    at org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:43)
    at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:74)
    at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:290)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:251)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:153)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:127)
    ... 11 more
Caused by: org.apache.beam.sdk.coders.CoderException: java.io.EOFException
    at org.apache.beam.sdk.coders.InstantCoder.decode(InstantCoder.java:70)
    at org.apache.beam.sdk.coders.InstantCoder.decode(InstantCoder.java:34)
    at org.apache.beam.sdk.transforms.windowing.IntervalWindow$IntervalWindowCoder.decode(IntervalWindow.java:160)
    at org.apache.beam.sdk.transforms.windowing.IntervalWindow$IntervalWindowCoder.decode(IntervalWindow.java:140)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromBase64(CoderUtils.java:157)
    at org.apache.beam.runners.core.StateNamespaces.fromString(StateNamespaces.java:262)
    ... 22 more
Caused by: java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:197)
    at java.io.DataInputStream.readLong(DataInputStream.java:416)
    at org.apache.beam.sdk.coders.InstantCoder.decode(InstantCoder.java:66)
    ... 29 more
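
For what it's worth, the innermost frames (decodeFromBase64, then 
DataInputStream.readLong) can be imitated with plain JDK classes. This is only 
a rough sketch and not Beam's actual code: the class EmptyNamespaceEof and the 
method readMillis are made-up names, and it assumes that the window portion of 
the namespace string '//' is the empty text between the two slashes. Under that 
assumption there are no bytes to read, so reading the 8-byte timestamp fails 
with an EOFException, which matches the bottom of the trace.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.Base64;

// Hypothetical stand-in for the failing decode path; not Beam code.
final class EmptyNamespaceEof {

    // Base64-decode the window text, then read one 8-byte long from it,
    // the way the readLong frame in the trace does.
    static long readMillis(String base64Window) throws IOException {
        byte[] bytes = Base64.getDecoder().decode(base64Window);
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            return in.readLong(); // needs 8 bytes, throws EOFException otherwise
        }
    }

    public static void main(String[] args) throws IOException {
        String namespace = "//"; // the string reported in the exception
        // Assumption: the encoded window is whatever sits between the slashes.
        String windowPart = namespace.substring(1, namespace.length() - 1);
        try {
            readMillis(windowPart);
            System.out.println("decoded");
        } catch (EOFException e) {
            System.out.println("EOFException");
        }
    }
}
```

If this reading is right, the question becomes why a timer with an empty 
window namespace ended up in the checkpoint in the first place, rather than 
anything being wrong in the InstantCoder itself.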


Stephen Hesketh | Client Analytics Technology