Hi Aljoscha! I am not using any Async I/O. However, I do use a trick similar to
ContinuousFileReaderOperator
<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java>,
where another thread writes to the output asynchronously.
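
For readers following this thread, here is a minimal sketch of the pattern described above: a second thread emits elements to an output that the main thread also writes to, with both threads taking the same lock around each emit. It does not use the Flink API; `LockedEmitter`, `emit`, `snapshot`, and `run` are hypothetical names, and the list only stands in for the operator output. As far as I know, the reader thread in ContinuousFileReaderOperator emits while holding the checkpoint lock, which is the idea this sketch imitates. Whether missing synchronization is behind the exception discussed here is an assumption, not something the thread confirms.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: LockedEmitter and its members are hypothetical names, not Flink API. */
public class LockedEmitter {
    // Stands in for the lock that the main (task) thread also holds while it emits.
    private final Object lock = new Object();
    // Stands in for the downstream output; ArrayList is not thread-safe on its own.
    private final List<String> output = new ArrayList<>();

    /** Emits one element; every caller, on any thread, goes through the same lock. */
    public void emit(String element) {
        synchronized (lock) {
            output.add(element);
        }
    }

    /** Returns a copy of everything emitted so far, taken under the same lock. */
    public List<String> snapshot() {
        synchronized (lock) {
            return new ArrayList<>(output);
        }
    }

    /** Emits count records from a second thread while the calling thread emits count markers. */
    public static int run(int count) {
        LockedEmitter emitter = new LockedEmitter();
        Thread writer = new Thread(() -> {
            for (int i = 0; i < count; i++) {
                emitter.emit("record-" + i);
            }
        });
        writer.start();
        for (int i = 0; i < count; i++) {
            emitter.emit("marker-" + i);
        }
        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for writer", e);
        }
        return emitter.snapshot().size();
    }

    public static void main(String[] args) {
        System.out.println(run(1000));
    }
}
```

Calling `LockedEmitter.run(1000)` emits 1000 elements from each thread and returns the total number collected. Without the shared lock, the two threads could interleave inside a single emit, which is the kind of race this pattern is meant to rule out.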

On Mon, Jul 23, 2018 at 2:30 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi Greg,
>
> Just making sure: is there any asynchrony in your user functions? Any
> Async I/O operator, maybe?
>
> Best,
> Aljoscha
>
> On 20. Jul 2018, at 21:53, Gregory Fee <g...@lyft.com> wrote:
>
> This is on Flink 1.4.2. I filed it as FLINK-9905. Thanks!
>
> On Fri, Jul 20, 2018 at 2:51 AM, Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
>> Hi Gregory,
>> I think it is a Flink bug. Could you file a JIRA for it? Also, which
>> version of Flink are you using?
>> Best,
>> Dawid
>>
>> On Fri, 20 Jul 2018 at 04:34, vino yang <yanghua1...@gmail.com> wrote:
>>
>>> Hi Gregory,
>>>
>>> This exception seems to be a bug; you can create an issue in JIRA.
>>>
>>> Thanks, vino.
>>>
>>> 2018-07-20 10:28 GMT+08:00 Philip Doctor <philip.doc...@physiq.com>:
>>>
>>>> Oh, you were asking about the cast exception. I haven't seen that
>>>> before; sorry to be off topic.
>>>>
>>>>
>>>>
>>>> ------------------------------
>>>> *From:* Philip Doctor <philip.doc...@physiq.com>
>>>> *Sent:* Thursday, July 19, 2018 9:27:15 PM
>>>> *To:* Gregory Fee; user
>>>> *Subject:* Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker
>>>> cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>>>>
>>>>
>>>> I'm just a Flink user, not an expert. I've seen that exception
>>>> before, but I have never seen it be the actual error. I usually see it when
>>>> some other operator has thrown an uncaught exception and is busy dying. It
>>>> seems to me that the prior operator throws this error, "Can't forward to the
>>>> next operator", because the next operator is already dead. The job is
>>>> shutting down asynchronously, so you get a cloud of errors that sort of
>>>> surrounds the root cause. I'd read your logs a little further back.
>>>> ------------------------------
>>>> *From:* Gregory Fee <g...@lyft.com>
>>>> *Sent:* Thursday, July 19, 2018 9:10:37 PM
>>>> *To:* user
>>>> *Subject:* org.apache.flink.streaming.runtime.streamrecord.LatencyMarker
>>>> cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>>>>
>>>> Hello, I have a job running and I've gotten this error a few times. The
>>>> job recovers from a checkpoint and seems to continue forward fine. Then the
>>>> error will happen again sometime later, perhaps 1 hour. This looks like a
>>>> Flink bug to me but I could use an expert opinion. Thanks!
>>>>
>>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>>>> at com.lyft.streamingplatform.BetterBuffer$OutputThread.run(BetterBuffer.java:316)
>>>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>>>> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>> at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:67)
>>>> at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:48)
>>>> at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>>>> ... 5 more
>>>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>>>> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>> at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>>>> at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>>>> at DataStreamSourceConversion$14.processElement(Unknown Source)
>>>> at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:67)
>>>> at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>>>> ... 14 more
>>>> Caused by: java.lang.RuntimeException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>>>> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)
>>>> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:84)
>>>> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:42)
>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>>>> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>> at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>>>> at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>>>> at DataStreamCalcRule$37.processElement(Unknown Source)
>>>> at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)
>>>> at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
>>>> at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>>>> ... 25 more
>>>> Caused by: java.lang.ClassCastException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>>>> at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61)
>>>> at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
>>>> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
>>>> at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:84)
>>>> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:102)
>>>> ... 37 more
>>>>
>>>> --
>>>> *Gregory Fee*
>>>> Engineer
>>>> 425.830.4734 <+14258304734>
>>>> [image: Lyft] <http://www.lyft.com/>
>>>>
>>>
>>>


-- 
*Gregory Fee*
Engineer
425.830.4734 <+14258304734>
[image: Lyft] <http://www.lyft.com>
