Hi Aljoscha! I am not using any async I/O. I do, however, use a trick similar to ContinuousFileReaderOperator <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java>, where I use another thread to write to the output asynchronously.
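For readers unfamiliar with the pattern Greg describes, here is a minimal, self-contained model of a helper thread that emits into the same output as the task thread. It is not Flink code. `ReusingOutput`, `Rec` and `Marker` are hypothetical stand-ins for an output that reuses one mutable slot per emit, and for records and latency markers. The assumption it illustrates is that when two threads write to such an output, every emit has to be serialized on one shared lock (in Flink 1.4 that is usually the task's checkpoint lock; treat this as an assumption to verify against your version). Without the lock, the cast in `emitRecord` could observe a marker written by the other thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class LockedOutputSketch {
    // Hypothetical stand-ins for a data record and a latency marker.
    interface Element {}
    static final class Rec implements Element {
        final long value;
        Rec(long value) { this.value = value; }
    }
    static final class Marker implements Element {}

    // Hypothetical output that reuses a single mutable slot for every emit.
    // If two threads called emit* concurrently without a lock, the cast in
    // emitRecord could see a Marker stored by the other thread.
    static final class ReusingOutput {
        private Element slot;
        long lastValue;
        long records;
        long markers;

        void emitRecord(Rec r) {
            slot = r;
            Rec seen = (Rec) slot; // stands in for a partitioner that expects a record
            lastValue = seen.value;
            records++;
        }

        void emitMarker(Marker m) {
            slot = m;
            markers++;
        }
    }

    /** Emits n records from a helper thread and n markers from the calling thread. */
    public static long[] run(int n) {
        final Object lock = new Object(); // one lock shared by every emitter
        final ReusingOutput out = new ReusingOutput();
        final BlockingQueue<Rec> queue = new ArrayBlockingQueue<>(64);

        Thread outputThread = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    Rec r = queue.take();      // wait for work outside the lock
                    synchronized (lock) {
                        out.emitRecord(r);     // emit only while holding the lock
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "output-thread");
        outputThread.start();

        try {
            for (int i = 0; i < n; i++) {
                queue.put(new Rec(i));
                synchronized (lock) {
                    out.emitMarker(new Marker()); // task-thread emits use the same lock
                }
            }
            outputThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        synchronized (lock) {
            return new long[] {out.records, out.markers};
        }
    }

    public static void main(String[] args) {
        long[] counts = run(10_000);
        System.out.println(counts[0] + " records, " + counts[1] + " markers");
    }
}
```

The helper thread waits on the queue outside the lock, so it never blocks the task thread while idle; only the emit itself is serialized.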
On Mon, Jul 23, 2018 at 2:30 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi Greg,
>
> Just making sure, but is there any asynchrony in your user functions? An
> Async I/O operator, maybe?
>
> Best,
> Aljoscha
>
> On 20. Jul 2018, at 21:53, Gregory Fee <g...@lyft.com> wrote:
>
> This is on Flink 1.4.2. I filed it as FLINK-9905. Thanks!
>
> On Fri, Jul 20, 2018 at 2:51 AM, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>
>> Hi Gregory,
>> I think it is a Flink bug. Could you file a JIRA issue for it? Also, which
>> version of Flink are you using?
>> Best,
>> Dawid
>>
>> On Fri, 20 Jul 2018 at 04:34, vino yang <yanghua1...@gmail.com> wrote:
>>
>>> Hi Gregory,
>>>
>>> This exception looks like a bug; you can create an issue in JIRA.
>>>
>>> Thanks, vino.
>>>
>>> 2018-07-20 10:28 GMT+08:00 Philip Doctor <philip.doc...@physiq.com>:
>>>
>>>> Oh, you were asking about the cast exception. I haven't seen that
>>>> before; sorry to be off topic.
>>>>
>>>> ------------------------------
>>>> *From:* Philip Doctor <philip.doc...@physiq.com>
>>>> *Sent:* Thursday, July 19, 2018 9:27:15 PM
>>>> *To:* Gregory Fee; user
>>>> *Subject:* Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker
>>>> cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>>>>
>>>> I'm just a Flink user, not an expert. I've seen that exception
>>>> before. I have never seen it be the actual error; I usually see it when
>>>> some other operator is throwing an uncaught exception and is busy dying. It
>>>> seems to me that the prior operator throws this error ("Can't forward to the
>>>> next operator"). Why? Because the next operator is already dead, but the job
>>>> is busy dying asynchronously, so you get a cloud of errors that sort of
>>>> surround the root cause. I'd read your logs a little further back.
>>>> ------------------------------
>>>> *From:* Gregory Fee <g...@lyft.com>
>>>> *Sent:* Thursday, July 19, 2018 9:10:37 PM
>>>> *To:* user
>>>> *Subject:* org.apache.flink.streaming.runtime.streamrecord.LatencyMarker
>>>> cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>>>>
>>>> Hello, I have a job running and I've gotten this error a few times. The
>>>> job recovers from a checkpoint and seems to continue forward fine. Then the
>>>> error happens again some time later, perhaps after an hour. This looks like a
>>>> Flink bug to me, but I could use an expert opinion. Thanks!
>>>>
>>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>>>>     at com.lyft.streamingplatform.BetterBuffer$OutputThread.run(BetterBuffer.java:316)
>>>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>>>>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>     at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:67)
>>>>     at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:48)
>>>>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>>>>     ... 5 more
>>>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>>>>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>>>>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>>>>     at DataStreamSourceConversion$14.processElement(Unknown Source)
>>>>     at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:67)
>>>>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>>>>     ... 14 more
>>>> Caused by: java.lang.RuntimeException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>>>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)
>>>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:84)
>>>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:42)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>>>>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>>>>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>>>>     at DataStreamCalcRule$37.processElement(Unknown Source)
>>>>     at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)
>>>>     at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
>>>>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>>>>     ... 25 more
>>>> Caused by: java.lang.ClassCastException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>>>>     at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61)
>>>>     at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
>>>>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
>>>>     at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:84)
>>>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:102)
>>>>     ... 37 more
>>>>
>>>> --
>>>> *Gregory Fee*
>>>> Engineer
>>>> 425.830.4734 <+14258304734>
>>>> Lyft <http://www.lyft.com/>
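Philip's advice to look further back also applies within a single trace: each "Caused by" wraps the exception below it, and the innermost entry (here the ClassCastException thrown from KeyGroupStreamPartitioner.selectChannels) is usually the one to act on. When such an exception is caught programmatically, for example in a test, a small helper can follow getCause() to the same place. This is a minimal sketch: `RootCauseSketch` is a hypothetical name, and IllegalStateException stands in for Flink's ExceptionInChainedOperatorException so the example runs without Flink on the classpath.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

public class RootCauseSketch {
    /** Follows getCause() to the innermost throwable, stopping early if the chain loops. */
    public static Throwable rootCause(Throwable t) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Throwable current = t;
        while (current.getCause() != null && seen.add(current)) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Rebuild a chain shaped like the trace above, using stand-in exception types.
        Throwable root = new ClassCastException(
                "LatencyMarker cannot be cast to StreamRecord");
        Throwable wrapped = new RuntimeException(root.getMessage(), root);
        Throwable outer = new IllegalStateException(
                "Could not forward element to next operator",
                new IllegalStateException("Could not forward element to next operator", wrapped));

        System.out.println(rootCause(outer));
        // prints: java.lang.ClassCastException: LatencyMarker cannot be cast to StreamRecord
    }
}
```

If Guava is already a dependency, `Throwables.getRootCause` serves the same purpose.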