Re: Flink 1.4.2 -> 1.5.0 upgrade Queryable State debugging
Yeah, I just went to reproduce this on a fresh environment: I blew away all the ZooKeeper data and the error went away. I'm running HA JobManagers (1 active, 2 standby) and 3 TMs. I'm not sure how to fully account for this behavior yet, but it looks like I can make this run from a totally fresh environment, so until I start practicing how to savepoint + restore a 1.4.2 -> 1.5.0 job, I look to be good for the moment. Sorry to have bothered you all. Thanks.

From: Dawid Wysakowicz
Sent: Friday, July 20, 2018 3:09:46 AM
To: Philip Doctor
Cc: user; Kostas Kloudas
Subject: Re: Flink 1.4.2 -> 1.5.0 upgrade Queryable State debugging

Hi Philip, Could you attach the full stack trace? Are you querying the same job/cluster in both tests? I am also looping in Kostas, who might know more about changes in Queryable State between 1.4.2 and 1.5.0. Best, Dawid

On Thu, 19 Jul 2018 at 22:33, Philip Doctor <philip.doc...@physiq.com> wrote:

Dear Flink Users, I'm trying to upgrade to Flink 1.5.0; so far everything works except for the Queryable State client. Now here's where it gets weird. I have the client sitting behind a web API so the rest of our non-Java ecosystem can consume it. I've got two tests: one calls my route directly as a Java method call, the other calls the deployed server via HTTP (the difference being intended to exercise whether the server is properly started, etc.). The local call succeeds; the remote call fails, but the failure isn't where I'd expect it (around the server layer), it's complaining about contacting the oracle for the state location:

Caused by: org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could not contact the state location oracle to retrieve the state location.
    at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228)

The client call is pretty straightforward:

return client.getKvState(
    flinkJobID,
    stateDescriptor.queryableStateName,
    key,
    keyTypeInformation,
    stateDescriptor
)

I've confirmed via logs that I have the exact same key, Flink job ID, and queryable state name. So I'm going bonkers trying to figure out what difference might exist. I'm wondering if I'm packing my jar wrong and there are some resources I need to look out for (back on Flink 1.3.x I had to handle the reference.conf file for Akka when Queryable State depended on it; is there something like that?). Is there more logging somewhere on the server side that might give me a hint, like "Tried to query state for X but couldn't find the BananaLever"? I'm pretty stuck right now and ready to try any random ideas to move forward. The only change I've made aside from the jar version bump from 1.4.2 to 1.5.0 is handling queryableStateName (which is now nullable); no other code changes were required, and to confirm, this all works just fine with 1.4.2. Thank you.
Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
Oh, you were asking about the cast exception; I haven't seen that before, sorry to be off topic.

From: Philip Doctor
Sent: Thursday, July 19, 2018 9:27:15 PM
To: Gregory Fee; user
Subject: Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

I'm just a Flink user, not an expert. I've seen that exception before. I have never seen it be the actual error; I usually see it when some other operator is throwing an uncaught exception and is busy dying. It seems to me that the prior operator throws this "Could not forward element to next operator" error because the next operator is already dead, but the job is dying asynchronously, so you get a cloud of errors that sort of surround the root cause. I'd read your logs a little further back.

From: Gregory Fee
Sent: Thursday, July 19, 2018 9:10:37 PM
To: user
Subject: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

Hello, I have a job running and I've gotten this error a few times. The job recovers from a checkpoint and seems to continue forward fine. Then the error will happen again sometime later, perhaps 1 hour. This looks like a Flink bug to me but I could use an expert opinion. Thanks!

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
    at com.lyft.streamingplatform.BetterBuffer$OutputThread.run(BetterBuffer.java:316)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:67)
    at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:48)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
    ... 5 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
    at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
    at DataStreamSourceConversion$14.processElement(Unknown Source)
    at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:67)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
    ... 14 more
Caused by: java.lang.RuntimeException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
I'm just a Flink user, not an expert. I've seen that exception before. I have never seen it be the actual error; I usually see it when some other operator is throwing an uncaught exception and is busy dying. It seems to me that the prior operator throws this "Could not forward element to next operator" error because the next operator is already dead, but the job is dying asynchronously, so you get a cloud of errors that sort of surround the root cause. I'd read your logs a little further back.

From: Gregory Fee
Sent: Thursday, July 19, 2018 9:10:37 PM
To: user
Subject: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

Hello, I have a job running and I've gotten this error a few times. The job recovers from a checkpoint and seems to continue forward fine. Then the error will happen again sometime later, perhaps 1 hour. This looks like a Flink bug to me but I could use an expert opinion. Thanks!

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
    at com.lyft.streamingplatform.BetterBuffer$OutputThread.run(BetterBuffer.java:316)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:67)
    at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:48)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
    ... 5 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
    at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
    at DataStreamSourceConversion$14.processElement(Unknown Source)
    at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:67)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
    ... 14 more
Caused by: java.lang.RuntimeException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:84)
Flink 1.4.2 -> 1.5.0 upgrade Queryable State debugging
Dear Flink Users, I'm trying to upgrade to Flink 1.5.0; so far everything works except for the Queryable State client. Now here's where it gets weird. I have the client sitting behind a web API so the rest of our non-Java ecosystem can consume it. I've got two tests: one calls my route directly as a Java method call, the other calls the deployed server via HTTP (the difference being intended to exercise whether the server is properly started, etc.). The local call succeeds; the remote call fails, but the failure isn't where I'd expect it (around the server layer), it's complaining about contacting the oracle for the state location:

Caused by: org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could not contact the state location oracle to retrieve the state location.
    at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228)

The client call is pretty straightforward:

return client.getKvState(
    flinkJobID,
    stateDescriptor.queryableStateName,
    key,
    keyTypeInformation,
    stateDescriptor
)

I've confirmed via logs that I have the exact same key, Flink job ID, and queryable state name. So I'm going bonkers trying to figure out what difference might exist. I'm wondering if I'm packing my jar wrong and there are some resources I need to look out for (back on Flink 1.3.x I had to handle the reference.conf file for Akka when Queryable State depended on it; is there something like that?). Is there more logging somewhere on the server side that might give me a hint, like "Tried to query state for X but couldn't find the BananaLever"? I'm pretty stuck right now and ready to try any random ideas to move forward. The only change I've made aside from the jar version bump from 1.4.2 to 1.5.0 is handling queryableStateName (which is now nullable); no other code changes were required, and to confirm, this all works just fine with 1.4.2. Thank you.
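For reference, a minimal sketch of the 1.5.0-style client call described above, in Java. The host, proxy port, job ID, state name, and key/value types are placeholders, not taken from the job in question; the default queryable state proxy port 9069 and a String key with a Long ValueState are assumed here.

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.queryablestate.client.QueryableStateClient;

import java.util.concurrent.CompletableFuture;

public class QueryableStateCheck {
    public static void main(String[] args) throws Exception {
        // In 1.5 the client talks to the queryable state proxy running on a TaskManager.
        QueryableStateClient client = new QueryableStateClient("taskmanager-host", 9069);

        ValueStateDescriptor<Long> stateDescriptor =
                new ValueStateDescriptor<>("my-state", Types.LONG);

        CompletableFuture<ValueState<Long>> future = client.getKvState(
                JobID.fromHexString("00000000000000000000000000000000"), // placeholder job id
                "my-queryable-state-name",
                "some-key",
                Types.STRING,
                stateDescriptor);

        // Blocks until the proxy resolves the state location and returns the value.
        System.out.println(future.get().value());
    }
}

If a standalone check like this also fails with the UnknownLocationException, the problem sits between the client and the cluster rather than in the web API layer.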
Re: SinkFunction invoke method signature
> That is, whether the context interface is defined as Context<T> or Context<IN> makes no difference, since it is an interface, which is always static.

I don't think this is the case. Context<T> is an inner interface; T has a meaning in that scope, IN does not, so there's a very real difference. When you go to consume it, you have to consume <*> in order to meet the requirements of the interface. In my example, I want to write:

override fun invoke(value: ByteArray, context: SinkFunction.Context) {

but I can't; I have to write

override fun invoke(value: ByteArray, context: SinkFunction.Context<*>) {

in order to avoid a compile error and actually override the interface. This means that, as a consumer, I have no type information about Context and need to just unsafely downcast if I want to use it. This feels, at a minimum, like a confusing API to consume. Can you provide some guidance on how I would consume this other than unsafely downcasting the contents of Context<*>?

physIQ

From: Chesnay Schepler
Sent: Saturday, July 14, 2018 3:54:33 AM
To: Ashwin Sinha; Philip Doctor
Cc: user@flink.apache.org
Subject: Re: SinkFunction invoke method signature

The variables T and IN aren't related to each other. That is, whether the context interface is defined as Context<T> or Context<IN> makes no difference, since it is an interface, which is always static. At runtime, the context given to the function should be of type Context<IN>, but I don't know why the invoke method (and StreamSink for that matter) use raw parameters.

On 13.07.2018 19:35, Ashwin Sinha wrote:
+1 We encountered the exact same issue today.

On Fri, Jul 13, 2018 at 10:51 PM Philip Doctor <philip.doc...@physiq.com> wrote:

Dear Flink Users, I noticed my sink's `invoke` method was deprecated, so I went to update it. I'm a little surprised by the new method signature, especially on Context (copy+pasted below for ease of discussion). Shouldn't Context be Context<IN>, not Context<T>, based on the docs? I'm having a hard time understanding what's getting sent to me here in Context. Does anyone have any insight into why these might be different?

/**
 * Interface for implementing user defined sink functionality.
 *
 * @param <IN> Input type parameter.
 */
@Public
public interface SinkFunction<IN> extends Function, Serializable {

    /**
     * Context that {@link SinkFunction SinkFunctions } can use for getting additional data about
     * an input record.
     *
     * The context is only valid for the duration of a
     * {@link SinkFunction#invoke(Object, Context)} call. Do not store the context and use
     * afterwards!
     *
     * @param <T> The type of elements accepted by the sink.
     */
    @Public // Interface might be extended in the future with additional methods.
    interface Context<T> {

org.apache.flink/flink-streaming-java_2.11/1.5.0/784a58515da2194a2e74666e8d53d50fac8c03/flink-streaming-java_2.11-1.5.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/SinkFunction.java

--
Ashwin Sinha | Data Engineer
ashwin.si...@go-mmt.com | 9452075361
SinkFunction invoke method signature
Dear Flink Users, I noticed my sink's `invoke` method was deprecated, so I went to update it. I'm a little surprised by the new method signature, especially on Context (copy+pasted below for ease of discussion). Shouldn't Context be Context<IN>, not Context<T>, based on the docs? I'm having a hard time understanding what's getting sent to me here in Context. Does anyone have any insight into why these might be different?

/**
 * Interface for implementing user defined sink functionality.
 *
 * @param <IN> Input type parameter.
 */
@Public
public interface SinkFunction<IN> extends Function, Serializable {

    /**
     * Context that {@link SinkFunction SinkFunctions } can use for getting additional data about
     * an input record.
     *
     * The context is only valid for the duration of a
     * {@link SinkFunction#invoke(Object, Context)} call. Do not store the context and use
     * afterwards!
     *
     * @param <T> The type of elements accepted by the sink.
     */
    @Public // Interface might be extended in the future with additional methods.
    interface Context<T> {

org.apache.flink/flink-streaming-java_2.11/1.5.0/784a58515da2194a2e74666e8d53d50fac8c03/flink-streaming-java_2.11-1.5.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
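For what it's worth, a minimal Java sketch against that 1.5.0 signature (the sink name and body here are made up); in Java the raw Context parameter compiles as-is, whereas Kotlin forces the SinkFunction.Context<*> star projection discussed elsewhere in this thread.

import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class ByteArrayPrintSink implements SinkFunction<byte[]> {

    @Override
    public void invoke(byte[] value, Context context) throws Exception {
        // Context here is the raw nested SinkFunction.Context type, matching the
        // raw parameter on the new invoke(IN, Context) signature.
        Long timestamp = context.timestamp(); // may be null if the record carries no timestamp
        System.out.println("got " + value.length + " bytes, timestamp=" + timestamp);
    }
}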
Re: Flink Kafka reads too many bytes .... Very rarely
Hi Stephan, Sorry for the slow response. I added some logging inside of my DeserializationSchema's `deserialize(byte[] message)` method, and I see the extra bytes appearing in that method. If there's another place I should add logging, please let me know and I'm happy to do so. Additionally (and this is weird), I write all my messages to the DB, so I was looking for which messages didn't make it (i.e. of input messages 1 -> 10,000, which aren't in the DB). It turns out all 10k are in the DB. I'm not sure if that indicates the message is read and then retried, or what. I would have guessed that somehow extra data got written to my topic, but the Kafka tool tells me otherwise. So from my application's perspective it just looks like I get extra garbage data every now and then. This is actually a big relief; I toss out the garbage data and keep rolling. I hope this helps, thank you.

From: Stephan Ewen <se...@apache.org>
Date: Thursday, March 1, 2018 at 9:26 AM
To: "user@flink.apache.org" <user@flink.apache.org>
Cc: Philip Doctor <philip.doc...@physiq.com>
Subject: Re: Flink Kafka reads too many bytes Very rarely

Can you specify exactly where you have that excess of data? Flink uses basically Kafka's standard consumer and passes byte[] unmodified to the DeserializationSchema. Can you help us check whether the "too many bytes" happens already before or after the DeserializationSchema?

- If the "too many bytes" already arrive at the DeserializationSchema, then we should dig into the way that Kafka's consumer is configured.
- If the "too many bytes" appears after the DeserializationSchema, then we should look into the DeserializationSchema, for example whether it is stateful, accidentally shared across threads, etc.

On Thu, Mar 1, 2018 at 11:08 AM, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Phil, I've created a JIRA ticket for the problem that you described and linked it to this thread: FLINK-8820. Thank you, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-8820

2018-02-28 5:13 GMT+01:00 Philip Doctor <philip.doc...@physiq.com>:

* The fact that I seem to get all of my data is currently leading me to discard and ignore this error

Please ignore this statement; I typed this email as I was testing a theory and meant to delete this line. This is still a very real issue for me. I was looking to try a workaround tomorrow: I saw that the Kafka 0.11 consumer supports transactions for exactly-once processing, so I was going to read about that and see if I could somehow fail a read that I couldn't deserialize and try again, and whether that might make a difference (can I just retry this?). I'm not sure how that'll go. If you've got an idea for a workaround, I'd be all ears too.

From: Philip Doctor <philip.doc...@physiq.com>
Date: Tuesday, February 27, 2018 at 10:02 PM
To: "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>, Fabian Hueske <fhue...@gmail.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Flink Kafka reads too many bytes Very rarely

Honestly this has been a very frustrating issue to dig in to. The fact that I seem to get all of my data is currently leading me to discard and ignore this error; it's rare, and Flink still seems to work, but something is very hard to debug here, and despite some confusing observations, most of my evidence suggests that this originates in the Flink Kafka consumer.
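In case it helps anyone following along, this is roughly the shape of the length check and logging described above, sketched against the DeserializationSchema interface. The class name is made up, the 520-byte expectation comes from later in this thread, and the imports assume the org.apache.flink.api.common.serialization package used in 1.4+.

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LengthCheckingSchema implements DeserializationSchema<byte[]> {

    private static final Logger LOG = LoggerFactory.getLogger(LengthCheckingSchema.class);
    private static final int EXPECTED_LENGTH = 520; // every producer record should be exactly 520 bytes

    @Override
    public byte[] deserialize(byte[] message) {
        if (message == null || message.length != EXPECTED_LENGTH) {
            // Log and drop the garbage read instead of failing the job.
            LOG.warn("Unexpected record length {} (expected {})",
                    message == null ? -1 : message.length, EXPECTED_LENGTH);
            return null; // downstream operators must tolerate or filter out nulls
        }
        return message;
    }

    @Override
    public boolean isEndOfStream(byte[] nextElement) {
        return false;
    }

    @Override
    public TypeInformation<byte[]> getProducedType() {
        return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
    }
}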
Re: Flink Kafka reads too many bytes .... Very rarely
* The fact that I seem to get all of my data is currently leading me to discard and ignore this error

Please ignore this statement; I typed this email as I was testing a theory and meant to delete this line. This is still a very real issue for me. I was looking to try a workaround tomorrow: I saw that the Kafka 0.11 consumer supports transactions for exactly-once processing, so I was going to read about that and see if I could somehow fail a read that I couldn't deserialize and try again, and whether that might make a difference (can I just retry this?). I'm not sure how that'll go. If you've got an idea for a workaround, I'd be all ears too.

From: Philip Doctor <philip.doc...@physiq.com>
Date: Tuesday, February 27, 2018 at 10:02 PM
To: "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>, Fabian Hueske <fhue...@gmail.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Flink Kafka reads too many bytes Very rarely

Honestly this has been a very frustrating issue to dig in to. The fact that I seem to get all of my data is currently leading me to discard and ignore this error; it's rare, and Flink still seems to work, but something is very hard to debug here, and despite some confusing observations, most of my evidence suggests that this originates in the Flink Kafka consumer.
Re: Flink Kafka reads too many bytes .... Very rarely
Hi Gordon and Fabian, I just re-ran my test case against Flink 1.3.2 and could not reproduce this error, so it does appear to be new to Flink 1.4.0 if my test is good. The difference between my local env and prod is mostly the scale: production has a multi-broker Kafka cluster with durable backups, etc., and Flink has active-standby-standby JobManagers, multiple TaskManagers, full checkpoints to HDFS, etc.; my local execution is a single Kafka instance and a single in-memory StreamExecutionEnvironment. But my application code is identical. I tried comparing all of my Kafka settings (max message size, etc.); they seem in line aside from being single instance. I'm not trying to rule out my environment as a factor, but I have tried very hard to examine it. This issue has proved very frustrating to reproduce, otherwise I would have happily sent a test case or even made a pass at debugging it myself.

> what exactly your deserialization schema is doing?

It's some Google FlatBuffers data, so it's a byte array that gets read into a FlatBuffers schema that reads at certain offsets to pull out values (e.g. ID, timestamp, array). I think it's out of scope, since I can see that the byte count is wrong in problem cases, which is before I get to stick it into a FlatBuffers deserializer. At the same time, something does seem important about the payload. I'm loading ~2 million data points across ~30 datasets into Flink, and this is the only one that exhibits this problem. I spent over a day digging into what might be different about this dataset and can't manage to find it. This makes me incredibly suspicious; I wouldn't have emailed you all had I not measured the bytes in Kafka vs. after the Flink read. Honestly this has been a very frustrating issue to dig in to. The fact that I seem to get all of my data is currently leading me to discard and ignore this error; it's rare, and Flink still seems to work, but something is very hard to debug here, and despite some confusing observations, most of my evidence suggests that this originates in the Flink Kafka consumer. If I can help more, please let me know. Thank you for your replies. -Phil

From: "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>
Date: Tuesday, February 27, 2018 at 3:12 AM
To: Fabian Hueske <fhue...@gmail.com>, Philip Doctor <philip.doc...@physiq.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Flink Kafka reads too many bytes Very rarely

Hi Philip, Yes, I also have the question that Fabian mentioned. Did you start observing this only after upgrading to 1.4.0? Could you let me know what exactly your deserialization schema is doing? I don't have any clues at the moment, but maybe there are hints there. Also, you mentioned that the issue could not be reproduced on a local setup, only in "near-production" environments. What main differences are there between the two? Cheers, Gordon

On 27 February 2018 at 5:05:33 PM, Fabian Hueske (fhue...@gmail.com) wrote:

Hi, Thanks for reporting the error and sharing the results of your detailed analysis! I don't have enough insight into the Kafka consumer, but noticed that you said you've been running your application for over a year and only noticed the faulty behavior recently. Flink 1.4.0 was released in mid December last year. Did you observe the bug before you migrated to 1.4.0? I'll add Gordon to the thread, who is the expert on Flink's Kafka consumer. Best, Fabian

2018-02-27 0:02 GMT+01:00 Philip Doctor <philip.doc...@physiq.com>:

Hello, I'm using Flink 1.4.0 with FlinkKafkaConsumer010 and have been for almost a year. Recently, I started getting messages of the wrong length in Flink, causing my deserializer to fail. Let me share what I've learned:

1. All of my messages are 520 bytes exactly when my producer places them in Kafka
2. About 1% of these messages have this deserialization issue in Flink
3. When it happens, I read 10104 bytes in Flink
4. When I write the bytes my producer creates to a file on disk (rather than Kafka), my code reads 520 bytes and consumes them without issue off of disk
5. When I use Kafka Tool (http://www.kafkatool.com/index.html) to dump the contents of my topic to disk and read each message one at a time off of disk, my code reads 520 bytes per message and consumes them without issue
6. When I write a simple Kafka consumer (not using Flink) and read one message at a time, it's 520 bytes and my code runs without issue

#5 and #6 are what lead me to believe that this issue is squarely a problem with Flink. However, it gets more complicated: I took the messages I wrote out with both my simple consumer and the Kafka tool, loaded them into a local Kafka server, attached a local Flink cluster, and I cannot reproduce the error, yet I can reproduce it 100% of the time in something closer to my production environment.
Flink Kafka reads too many bytes .... Very rarely
Hello, I'm using Flink 1.4.0 with FlinkKafkaConsumer010 and have been for almost a year. Recently, I started getting messages of the wrong length in Flink, causing my deserializer to fail. Let me share what I've learned:

1. All of my messages are 520 bytes exactly when my producer places them in Kafka
2. About 1% of these messages have this deserialization issue in Flink
3. When it happens, I read 10104 bytes in Flink
4. When I write the bytes my producer creates to a file on disk (rather than Kafka), my code reads 520 bytes and consumes them without issue off of disk
5. When I use Kafka Tool (http://www.kafkatool.com/index.html) to dump the contents of my topic to disk and read each message one at a time off of disk, my code reads 520 bytes per message and consumes them without issue
6. When I write a simple Kafka consumer (not using Flink) and read one message at a time, it's 520 bytes and my code runs without issue

#5 and #6 are what lead me to believe that this issue is squarely a problem with Flink. However, it gets more complicated: I took the messages I wrote out with both my simple consumer and the Kafka tool, loaded them into a local Kafka server, attached a local Flink cluster, and I cannot reproduce the error, yet I can reproduce it 100% of the time in something closer to my production environment. I realize this latter point sounds suspicious, but I have not found anything in the Kafka docs indicating that I might have a configuration issue here, yet my simple local setup that would allow me to iterate on this and debug has failed me. I'm really quite at a loss here. I believe there's a Flink Kafka consumer bug; it happens exceedingly rarely, as I went a year without seeing it, and I can reproduce it in an expensive environment but not in a "cheap" environment. Thank you for your time. I can provide my sample data set in case that helps; I dumped it on my Google Drive: https://drive.google.com/file/d/1h8jpAFdkSolMrT8n47JJdS6x21nd_b7n/view?usp=sharing That's the full data set; about 1% of it ends up failing, and it's really hard to figure out which messages fail since I can't read any of the messages I receive and I get data out of order.
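For anyone who wants to repeat the check in point 6, a bare-bones (non-Flink) consumer along these lines is enough to measure the raw record sizes; the broker address, topic, and group id below are placeholders:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

public class RecordSizeCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "record-size-check");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    // Every message is expected to be exactly 520 bytes.
                    if (record.value().length != 520) {
                        System.out.println("partition " + record.partition()
                                + " offset " + record.offset()
                                + " -> " + record.value().length + " bytes");
                    }
                }
            }
        }
    }
}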
Re: Calling an operator serially
I guess my logs suggest this is simply what a KeyedStream does by default; I was just trying to find a doc that said so rather than relying on my logs.

From: Philip Doctor <philip.doc...@physiq.com>
Date: Tuesday, December 12, 2017 at 5:50 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Calling an operator serially

I've got a KeyedStream, and I only want a max parallelism of 1 per key in the keyed stream (i.e. if the key is OrganizationFoo, then only one input at a time from OrganizationFoo is processed by this operator). I feel like this is obvious somehow, but I'm struggling to find the docs for it. Can anyone point me the right way here?
Calling an operator serially
I've got a KeyedStream, and I only want a max parallelism of 1 per key in the keyed stream (i.e. if the key is OrganizationFoo, then only one input at a time from OrganizationFoo is processed by this operator). I feel like this is obvious somehow, but I'm struggling to find the docs for it. Can anyone point me the right way here?
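As far as I understand the default behaviour (sketch below with placeholder data and types): after keyBy, every record with a given key is routed to the same parallel subtask, and a subtask runs its operators on one element at a time, so two records sharing a key are never inside the downstream operator concurrently.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PerKeySerialExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(
                Tuple2.of("OrganizationFoo", 1),
                Tuple2.of("OrganizationBar", 2),
                Tuple2.of("OrganizationFoo", 3))
            // All elements with the same key hash to the same parallel subtask.
            .keyBy(0)
            // A subtask processes its input one element at a time, so the two
            // OrganizationFoo elements are never processed here concurrently.
            .map(new MapFunction<Tuple2<String, Integer>, String>() {
                @Override
                public String map(Tuple2<String, Integer> value) {
                    return value.f0 + " -> " + value.f1;
                }
            })
            .print();

        env.execute("per-key serial processing");
    }
}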
Testing GlobalWindows
I have a GlobalWindow with a custom trigger (I leave windows open for a variable length of time depending on how much data I have vs. the expected amount, so I'm manipulating triggerContext.registerProcessingTimeTimer()). When I emit data into my data stream, the Flink execution environment appears to halt after the test data is exhausted but before my GlobalWindow is triggered. I tried changing my trigger to wait zero seconds on window full, but that just appears to have made my test racy: sometimes the global window triggers and calls apply (so the test passes), and sometimes the environment appears to halt first. Is there a way for me to leave the execution environment running for a few seconds after all of my data is emitted? Or is there a good way for me to test this? So far my only solution has been to use env.fromCollection() in Flink and then pass a custom iterator class where iterator.next() itself hangs with Thread.sleep(10_000) before delivering the last value (which becomes untested garbage). That gives the window a chance to trigger and I always get the correct results (huzzah), but it's super hacky. Any advice here is greatly appreciated. Thanks, Phil
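A slightly less hacky variant of the same trick is a test source that emits the elements and then just stays alive for a grace period before returning, so processing-time timers get a chance to fire before the sources finish and the pipeline shuts down. A sketch (the class and parameter names are made up, and the elements have to be serializable):

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.ArrayList;
import java.util.List;

public class LingeringSource<T> implements SourceFunction<T> {

    private final ArrayList<T> elements;
    private final long lingerMillis;
    private volatile boolean running = true;

    public LingeringSource(List<T> elements, long lingerMillis) {
        this.elements = new ArrayList<>(elements);
        this.lingerMillis = lingerMillis;
    }

    @Override
    public void run(SourceContext<T> ctx) throws Exception {
        for (T element : elements) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(element);
            }
        }
        // Keep the source alive so downstream processing-time timers can fire
        // before the job winds down.
        long deadline = System.currentTimeMillis() + lingerMillis;
        while (running && System.currentTimeMillis() < deadline) {
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

Used as env.addSource(new LingeringSource<>(testData, 10_000)) in place of env.fromCollection(testData); depending on the element type you may also need a .returns(...) hint so Flink can work out the output type of the generic source.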
Sink -> Source
I have a few Flink jobs. Several of them share the same code. I was wondering if I could make those shared steps their own job and then specify that the sink for one process is the source for another process, stitching my jobs together. Is this possible? I didn't see it in the docs. It feels like I could possibly hack something together with writeToSocket() on my data stream and then create a source that reads from a socket, but I was hoping there was a more fully baked solution to this. Thanks for your time.
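To make the writeToSocket idea concrete, a rough sketch of the handoff. The host names, ports, and stand-in transformations are placeholders, the imports assume a 1.4+ package layout, and the serialization has to append a delimiter the reading side can split records on. There is no fault tolerance in a raw socket, which is the usual argument for putting a Kafka topic between the two jobs instead, using the same stitching pattern.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.nio.charset.StandardCharsets;

public class SocketHandoffSketch {

    // Job A: runs the shared steps, then hands records off over a socket.
    public static void sharedJob() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Stand-in for the real source plus the shared transformation steps.
        DataStream<String> shared = env.socketTextStream("upstream-host", 9000);
        // Newline-delimited so the consuming job can split records back apart.
        shared.writeToSocket("handoff-host", 9001,
                element -> (element + "\n").getBytes(StandardCharsets.UTF_8));
        env.execute("shared steps");
    }

    // Job B: a downstream job picks up where job A left off.
    public static void downstreamJob() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.socketTextStream("handoff-host", 9001)
                .print(); // stand-in for the job-specific logic
        env.execute("downstream job");
    }
}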
Possible Data Corruption?
Dear Flink Users, I have a Flink (v1.2.1) process I left running for the last five days. It aggregates a bit of state and exposes it via Queryable State. It ran correctly for the first 3 days. There were no code changes or data changes, but suddenly Queryable State got weird. The process logs the current value of the queryable state, and from the logs I discerned that the state was correctly being aggregated. However, the Queryable State that was returned could not be deserialized. Rather than the list of longs I expect, I get 2 bytes (0x57 0x02). It seemed quite clear that the state in the TaskManager was not the state I was getting out of Queryable State. I next reasoned that my data was being checkpointed and possibly I could restore, so I restarted the process to recover from a checkpoint. At this point the process fails with the following error:

java.lang.IllegalStateException: Could not initialize keyed state backend.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:653)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:640)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:246)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:666)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IndexOutOfBoundsException: Index: 28, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:653)
    at java.util.ArrayList.get(ArrayList.java:429)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:231)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateTableForKeyGroup(HeapKeyedStateBackend.java:370)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:340)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:243)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:788)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:284)
    ... 6 more

This looks to me like Flink has serialized out state incorrectly. I was running Flink 1.2.1; I upgraded to Flink 1.3 after this happened so I could manually set the Kafka partition offset, backed it up 5 days to replay all the data, and now everything is working fine again. However, I'm more than a little worried. Was there a serialization bug fixed in 1.3? I don't believe there's anything in my code that could be causing such an issue, but is there something in my jobs that could make something like this happen? Is this a known bug? The fact that it not only results in bad data in the query but also appears to take down my disaster recovery plan makes me a bit nervous here. Thanks for your time, Phil
Restoring Queryable State
Hello, My job differs slightly from the example Queryable State jobs. I have a keyed stream, and I emit managed ValueState at certain points at runtime, but the names aren't entirely known beforehand. I have checkpointing enabled, and when I restore from a checkpoint, everything *almost* works. In my code I took a known ValueState name, hard-coded a new ValueStateDescriptor, got the state from the RuntimeContext, and saw that my value was correctly restored. However, that value is not queryable unless I create a new ValueStateDescriptor that is queryable. So if I knew all of the ValueState names ahead of time, I could simply build a ValueStateDescriptor for each emitted value state name, set each to queryable, and presumably life would be good. But returning to the problem, I don't know all the value state names. Is there a way that, on restore, the keyed managed ValueStates I am restoring can also be queryable, as they were prior to the crash? Thanks.
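For the names that are known, a small sketch of the pattern described above (the function, state, and type choices here are placeholders): the descriptor is marked queryable every time the operator starts, since a restored ValueState only shows up in Queryable State again once it is accessed through a descriptor that asks for that.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class QueryableAggregator extends RichFlatMapFunction<Long, Long> {

    private transient ValueState<Long> total;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("running-total", Long.class);
        // Mark the descriptor as queryable on every (re)start; per the behaviour
        // described above, a restored ValueState is only reachable through
        // Queryable State if the descriptor used to access it does this again.
        descriptor.setQueryable("running-total");
        total = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Long value, Collector<Long> out) throws Exception {
        Long current = total.value();
        long updated = (current == null ? 0L : current) + value;
        total.update(updated);
        out.collect(updated);
    }
}

For the dynamically named states, the same setQueryable call would have to happen for each name, which is exactly the part that isn't known up front here.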
Queryable state in a keyed stream not querying properly
Dear Flink Users, I'm getting started with Flink and I've bumped into a small problem. I have a keyed stream like this:

val stream = env.addSource(consumer)
  .flatMap(new ValidationMap()).name("ValidationMap")
  .keyBy(x => (x.getObj.foo(), x.getObj.bar(), x.getObj.baz()))
  .flatMap(new Calculator(this.config.size, this.config.queryableStateName)).name(jobname)

Within my stream I have a ValueState that I use to maintain a list. I then use the QueryableStateClient:

client.getKvState(flinkJobID, stateName, serializedKey.hashCode(), serializedKey);

where the "serializedKey" matches the .keyBy on the keyed stream. When I query the state things go wrong. I've determined that the JobManager appears to send my query to one of the three TaskManagers I have running, so about 1/3 of the time I get the proper result and the other 2/3 of the time I get org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace: KvState does not hold any state for key/namespace. I feel like I must have somehow misconfigured my job, how can I instruct the job manager to properly query the TaskManager that has my data? Is there a specific setting or configuration I'm missing? Thank you for your time. -Phil
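One thing that might be worth double-checking in the call above: the 1.2/1.3-era examples pass the hashCode of the original key object as the third argument (it is what selects the key group, and therefore which TaskManager is asked), not the hashCode of the serialized byte[], which in Java is identity-based. A sketch of that shape, reusing client, flinkJobID, and stateName from the snippet above, with a Java Tuple3 standing in for the real key type (the serializer has to match whatever the keyBy actually uses, e.g. a Scala tuple serializer if the key is a Scala tuple):

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;

Tuple3<String, String, String> key = Tuple3.of("foo", "bar", "baz"); // placeholder key

byte[] serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace(
        key,
        TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {})
                .createSerializer(new ExecutionConfig()),
        VoidNamespace.INSTANCE,
        VoidNamespaceSerializer.INSTANCE);

// key.hashCode(), not serializedKeyAndNamespace.hashCode(), routes the query
// to the TaskManager that owns this key's key group.
scala.concurrent.Future<byte[]> result =
        client.getKvState(flinkJobID, stateName, key.hashCode(), serializedKeyAndNamespace);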