Re: Flink 1.4.2 -> 1.5.0 upgrade Queryable State debugging

2018-07-20 Thread Philip Doctor
Yeah, I just went to reproduce this on a fresh environment: I blew away all the 
Zookeeper data and the error went away.  I'm running HA JobManagers (1 active, 2 
standby) and 3 TMs.  I'm not sure how to fully account for this behavior yet; 
it looks like I can make this run from a totally fresh environment, so until I 
start practicing how to savepoint + restore a 1.4.2 -> 1.5.0 job, I look to be 
good for the moment.  Sorry to have bothered you all.


Thanks.


From: Dawid Wysakowicz 
Sent: Friday, July 20, 2018 3:09:46 AM
To: Philip Doctor
Cc: user; Kostas Kloudas
Subject: Re: Flink 1.4.2 -> 1.5.0 upgrade Queryable State debugging

Hi Philip,
Could you attach the full stack trace? Are you querying the same job/cluster in 
both tests?
I am also looping in Kostas, who might know more about changes in Queryable 
state between 1.4.2 and 1.5.0.
Best,
Dawid

On Thu, 19 Jul 2018 at 22:33, Philip Doctor <philip.doc...@physiq.com> wrote:
Dear Flink Users,
I'm trying to upgrade to flink 1.5.0, so far everything works except for the 
Queryable state client.  Now here's where it gets weird.  I have the client 
sitting behind a web API so the rest of our non-java ecosystem can consume it.  
I've got 2 tests: one calls my route directly as a java method call, the other 
calls the deployed server via HTTP (the difference between the tests is meant 
to exercise whether the server is properly started, etc.).

The local call succeeds and the remote call fails, but the failure isn't what I'd 
expect (something around the server layer); it's complaining about contacting the 
oracle for state location:


Caused by: org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could not contact the state location oracle to retrieve the state location.
	at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228)


The client call is pretty straightforward:

return client.getKvState(
    flinkJobID,
    stateDescriptor.queryableStateName,
    key,
    keyTypeInformation,
    stateDescriptor
)

I've confirmed via logs that I have the exact same key, flink job ID, and 
queryable state name.

So I'm going bonkers over what difference might exist.  I'm wondering if I'm 
packing my jar wrong and there are some resources I need to look out for (back 
on Flink 1.3.x I had to handle the reference.conf file for Akka when Queryable 
State depended on it; is there something like that here?).  Is there more 
logging somewhere on the server side that might give me a hint, like "Tried to 
query state for X but couldn't find the BananaLever"?  I'm pretty stuck right 
now and ready to try any random ideas to move forward.


The only change I've made aside from the jar version bump from 1.4.2 to 1.5.0 
is handling queryableStateName (which is now nullable); no other code changes 
were required, and to confirm, this all works just fine with 1.4.2.


Thank you.


Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

2018-07-19 Thread Philip Doctor
Oh you were asking about the cast exception, I haven't seen that before, sorry 
to be off topic.





From: Philip Doctor 
Sent: Thursday, July 19, 2018 9:27:15 PM
To: Gregory Fee; user
Subject: Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker 
cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord



I'm just a flink user, not an expert, but I've seen that exception before.  I have 
never seen it be the actual error; I usually see it when some other operator is 
throwing an uncaught exception and busy dying.  It seems to me that the prior 
operator throws this error ("Can't forward to the next operator") because the 
next operator is already dead, but the job is busy dying asynchronously, so 
you get a cloud of errors that sort of surround the root cause.  I'd read your 
logs a little further back.


From: Gregory Fee 
Sent: Thursday, July 19, 2018 9:10:37 PM
To: user
Subject: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot 
be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

Hello, I have a job running and I've gotten this error a few times. The job 
recovers from a checkpoint and seems to continue forward fine. Then the error 
will happen again sometime later, perhaps 1 hour. This looks like a Flink bug 
to me but I could use an expert opinion. Thanks!


org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
	at com.lyft.streamingplatform.BetterBuffer$OutputThread.run(BetterBuffer.java:316)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
	at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:67)
	at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:48)
	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
	... 5 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
	at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
	at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
	at DataStreamSourceConversion$14.processElement(Unknown Source)
	at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:67)
	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
	... 14 more
Caused by: java.lang.RuntimeException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

Re: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

2018-07-19 Thread Philip Doctor

I'm just a flink user, not an expert, but I've seen that exception before.  I have 
never seen it be the actual error; I usually see it when some other operator is 
throwing an uncaught exception and busy dying.  It seems to me that the prior 
operator throws this error ("Can't forward to the next operator") because the 
next operator is already dead, but the job is busy dying asynchronously, so 
you get a cloud of errors that sort of surround the root cause.  I'd read your 
logs a little further back.


From: Gregory Fee 
Sent: Thursday, July 19, 2018 9:10:37 PM
To: user
Subject: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot 
be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord

Hello, I have a job running and I've gotten this error a few times. The job 
recovers from a checkpoint and seems to continue forward fine. Then the error 
will happen again sometime later, perhaps 1 hour. This looks like a Flink bug 
to me but I could use an expert opinion. Thanks!


org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
	at com.lyft.streamingplatform.BetterBuffer$OutputThread.run(BetterBuffer.java:316)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
	at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:67)
	at com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:48)
	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
	... 5 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
	at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
	at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
	at DataStreamSourceConversion$14.processElement(Unknown Source)
	at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:67)
	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
	... 14 more
Caused by: java.lang.RuntimeException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:84)
at 

Flink 1.4.2 -> 1.5.0 upgrade Queryable State debugging

2018-07-19 Thread Philip Doctor
Dear Flink Users,
I'm trying to upgrade to flink 1.5.0, so far everything works except for the 
Queryable state client.  Now here's where it gets weird.  I have the client 
sitting behind a web API so the rest of our non-java ecosystem can consume it.  
I've got 2 tests: one calls my route directly as a java method call, the other 
calls the deployed server via HTTP (the difference between the tests is meant 
to exercise whether the server is properly started, etc.).

The local call succeeds and the remote call fails, but the failure isn't what I'd 
expect (something around the server layer); it's complaining about contacting the 
oracle for state location:


Caused by: org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could not contact the state location oracle to retrieve the state location.
	at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228)


The client call is pretty straightforward:

return client.getKvState(
    flinkJobID,
    stateDescriptor.queryableStateName,
    key,
    keyTypeInformation,
    stateDescriptor
)

I've confirmed via logs that I have the exact same key, flink job ID, and 
queryable state name.

So I'm going bonkers over what difference might exist.  I'm wondering if I'm 
packing my jar wrong and there are some resources I need to look out for (back 
on Flink 1.3.x I had to handle the reference.conf file for Akka when Queryable 
State depended on it; is there something like that here?).  Is there more 
logging somewhere on the server side that might give me a hint, like "Tried to 
query state for X but couldn't find the BananaLever"?  I'm pretty stuck right 
now and ready to try any random ideas to move forward.


The only change I've made aside from the jar version bump from 1.4.2 to 1.5.0 
is handling queryableStateName (which is now nullable); no other code changes 
were required, and to confirm, this all works just fine with 1.4.2.


Thank you.


Re: SinkFunction invoke method signature

2018-07-14 Thread Philip Doctor
> That is, whether the context interface is defined as Context<T> or
> Context<IN> makes no difference, since it is an interface, which is always static.


I don't think this is the case.  Context<T> is an inner interface; IN has a 
meaning in that scope, T does not, so there's a very real difference.  When 
you go to consume it, you have to consume Context<*> in order to meet the 
requirements of the interface.  In my example, I want to write:

override fun invoke(value: ByteArray, context: SinkFunction.Context<ByteArray>) 
{


but I can't, I have to write

override fun invoke(value: ByteArray, context: SinkFunction.Context<*>) {

In order to avoid a compile error and actually override the interface.


This means that with Context<*>, as a consumer, I have no type information about 
Context, and need to just unsafely downcast if I want to use it.  This feels, 
at a minimum, like a confusing API to consume.  Can you provide some guidance on 
how I would consume this other than unsafely downcasting the contents of 
Context<*>?


physIQ


From: Chesnay Schepler 
Sent: Saturday, July 14, 2018 3:54:33 AM
To: Ashwin Sinha; Philip Doctor
Cc: user@flink.apache.org
Subject: Re: SinkFunction invoke method signature

The variables T and IN aren't related to each other.

That is, whether the context interface is defined as Context<T> or Context<IN> 
makes no difference,
since it is an interface, which is always static.

At runtime, the context given to the function should be of type Context<IN>,
but I don't know why the invoke method (and StreamSink for that matter) use raw 
parameters.
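Chesnay's point can be illustrated with a small self-contained sketch (the Sink and Context names below are stand-ins, not Flink's actual classes): a nested interface is implicitly static, so its type variable is brand new and unrelated to the enclosing interface's.

```java
public class NestedGenericsDemo {
    // A stand-in for SinkFunction<IN>; names are illustrative only.
    interface Sink<IN> {
        // Nested interfaces are implicitly static, so T here is a fresh
        // type variable with no relation to the enclosing IN.
        interface Context<T> {
            T currentValue();
        }
        String invoke(IN value, Context<?> ctx);
    }

    public static String run() {
        Sink<String> sink = (value, ctx) -> value + ":" + ctx.currentValue();
        // A Context<Integer> is accepted even though IN = String, which is
        // exactly why callers end up writing Context<?> (Context<*> in Kotlin).
        Sink.Context<Integer> ctx = () -> 42;
        return sink.invoke("x", ctx);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "x:42"
    }
}
```

If Context were declared with the outer parameter (Context<IN>), the compiler could tie the two together; as declared, the caller gets no usable type information without a cast.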

On 13.07.2018 19:35, Ashwin Sinha wrote:
+1

We encountered the exact same issue today.

On Fri, Jul 13, 2018 at 10:51 PM Philip Doctor <philip.doc...@physiq.com> wrote:
Dear Flink Users,
I noticed my sink's `invoke` method was deprecated, so I went to update it.  I'm 
a little surprised by the new method signature, especially on Context 
(copy+pasted below for ease of discussion).  Shouldn't Context<T> be 
Context<IN>, based on the docs?  I'm having a hard time understanding 
what's getting sent to me here in Context.  Anyone have any insights on why 
these might be different?


/**
 * Interface for implementing user defined sink functionality.
 *
 * @param <IN> Input type parameter.
 */
@Public
public interface SinkFunction<IN> extends Function, Serializable {


	/**
	 * Context that {@link SinkFunction SinkFunctions} can use for getting additional data about
	 * an input record.
	 *
	 * <p>The context is only valid for the duration of a
	 * {@link SinkFunction#invoke(Object, Context)} call. Do not store the context and use
	 * afterwards!
	 *
	 * @param <T> The type of elements accepted by the sink.
	 */
	@Public // Interface might be extended in the future with additional methods.
	interface Context<T> {


org.apache.flink/flink-streaming-java_2.11/1.5.0/784a58515da2194a2e74666e8d53d50fac8c03/flink-streaming-java_2.11-1.5.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/SinkFunction.java


--
Ashwin Sinha | Data Engineer
ashwin.si...@go-mmt.com | 9452075361





SinkFunction invoke method signature

2018-07-13 Thread Philip Doctor
Dear Flink Users,
I noticed my sink's `invoke` method was deprecated, so I went to update it.  I'm 
a little surprised by the new method signature, especially on Context 
(copy+pasted below for ease of discussion).  Shouldn't Context<T> be 
Context<IN>, based on the docs?  I'm having a hard time understanding 
what's getting sent to me here in Context.  Anyone have any insights on why 
these might be different?


/**
 * Interface for implementing user defined sink functionality.
 *
 * @param <IN> Input type parameter.
 */
@Public
public interface SinkFunction<IN> extends Function, Serializable {


	/**
	 * Context that {@link SinkFunction SinkFunctions} can use for getting additional data about
	 * an input record.
	 *
	 * <p>The context is only valid for the duration of a
	 * {@link SinkFunction#invoke(Object, Context)} call. Do not store the context and use
	 * afterwards!
	 *
	 * @param <T> The type of elements accepted by the sink.
	 */
	@Public // Interface might be extended in the future with additional methods.
	interface Context<T> {


org.apache.flink/flink-streaming-java_2.11/1.5.0/784a58515da2194a2e74666e8d53d50fac8c03/flink-streaming-java_2.11-1.5.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/SinkFunction.java


Re: Flink Kafka reads too many bytes .... Very rarely

2018-03-07 Thread Philip Doctor
Hi Stephan,
Sorry for the slow response.

I added some logging inside of my DeserializationSchema’s `deserialize(byte[] 
message)` method.

I see the extra bytes appearing in that method.

If there’s another place I should add logging, please let me know and I’m happy 
to do so.

Additionally (and this is weird): I write all my messages to the DB, so I was 
looking for which messages didn't make it (i.e. of input messages 1 -> 10,000, 
which aren't in the DB).  Turns out all 10k are in the DB.  I'm not sure if that 
indicates the message is read and then retried, or what.  I would have guessed 
that somehow extra data got written to my topic, but kafka tool tells me 
otherwise.  So from my application's perspective it just looks like I get extra 
garbage data every now and then.

This is actually a big relief, I toss out the garbage data and keep rolling.

I hope this helps, thank you.
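Tossing out the garbage as described above can be as simple as a length guard at the top of the deserializer. A hedged, Flink-free sketch, assuming the fixed 520-byte records from this thread (the class and method names are illustrative):

```java
import java.util.Optional;

public class LengthGuardDemo {
    static final int EXPECTED_LENGTH = 520; // producer writes exactly 520 bytes

    // Return the payload only when it has the expected length; otherwise
    // signal garbage so the caller can log and skip the record.
    static Optional<byte[]> checked(byte[] message) {
        if (message == null || message.length != EXPECTED_LENGTH) {
            return Optional.empty();
        }
        return Optional.of(message);
    }

    public static void main(String[] args) {
        System.out.println(checked(new byte[520]).isPresent());   // normal record
        System.out.println(checked(new byte[10104]).isPresent()); // oversized read, dropped
    }
}
```

In a real DeserializationSchema the empty case would be logged with the observed length and offset, so the bad reads stay visible while the job keeps rolling.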



From: Stephan Ewen <se...@apache.org>
Date: Thursday, March 1, 2018 at 9:26 AM
To: "user@flink.apache.org" <user@flink.apache.org>
Cc: Philip Doctor <philip.doc...@physiq.com>
Subject: Re: Flink Kafka reads too many bytes .... Very rarely

Can you specify exactly where you have that excess of data?

Flink basically uses Kafka's standard consumer and passes byte[] unmodified to 
the DeserializationSchema. Can you help us check whether the "too many bytes" 
happens already before or after the DeserializationSchema?

  - If the "too many bytes" already arrive at the DeserializationSchema, then 
we should dig into the way that Kafka's consumer is configured


  - If the "too many bytes" appears after the  DeserializationSchema, then we 
should look into the DeserializationSchema, for example whether it is stateful, 
accidentally shared across threads, etc.
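For the second case, one way a stateful DeserializationSchema could manufacture oversized payloads is a shared buffer that is never reset between records. A hedged, Flink-free sketch of that failure mode (BuggyDeserializer is an illustrative name, not a real Flink class):

```java
import java.io.ByteArrayOutputStream;

public class StatefulDeserializerDemo {
    // A deserializer that accumulates into a member buffer and forgets to
    // reset it: each call returns its record plus all previous records.
    static class BuggyDeserializer {
        private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

        byte[] deserialize(byte[] message) {
            buffer.write(message, 0, message.length);
            // BUG: buffer.reset() is missing, so state leaks across records
            // (and across threads, if the instance is accidentally shared).
            return buffer.toByteArray();
        }
    }

    public static void main(String[] args) {
        BuggyDeserializer d = new BuggyDeserializer();
        byte[] record = new byte[520]; // every producer message is 520 bytes
        System.out.println(d.deserialize(record).length); // 520: looks fine
        System.out.println(d.deserialize(record).length); // 1040: "too many bytes"
    }
}
```

Logging the raw byte length at the very top of deserialize(), as Philip did, is exactly what separates this kind of schema-side bug from bytes that were already wrong on arrival.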





On Thu, Mar 1, 2018 at 11:08 AM, Fabian Hueske <fhue...@gmail.com> wrote:
Hi Phil,
I've created a JIRA ticket for the problem that you described and linked it to 
this thread: FLINK-8820.
Thank you, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-8820

2018-02-28 5:13 GMT+01:00 Philip Doctor <philip.doc...@physiq.com>:

  *   The fact that I seem to get all of my data is currently leading me to 
discard and ignore this error

Please ignore this statement; I typed this email as I was testing a theory and 
meant to delete this line.  This is still a very real issue for me.  I was 
looking to try a workaround tomorrow: I saw that the Kafka 0.11 consumer 
supports transactions for exactly-once processing, so I was going to read about 
that and see if I could somehow fail a read that I couldn't deserialize and try 
again, and whether that might make a difference (can I just retry this?).  I'm 
not sure how that'll go.  If you've got an idea for a workaround, I'd be all 
ears too.


From: Philip Doctor <philip.doc...@physiq.com>
Date: Tuesday, February 27, 2018 at 10:02 PM
To: "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>, Fabian Hueske <fhue...@gmail.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Flink Kafka reads too many bytes .... Very rarely

Honestly, this has been a very frustrating issue to dig into.  The fact that I 
seem to get all of my data is currently leading me to discard and ignore this 
error: it's rare, and Flink still seems to work, but something is very hard to 
debug here, and despite some confusing observations, most of my evidence 
suggests that this originates in the flink kafka consumer.




Re: Flink Kafka reads too many bytes .... Very rarely

2018-02-27 Thread Philip Doctor
  *   The fact that I seem to get all of my data is currently leading me to 
discard and ignore this error

Please ignore this statement; I typed this email as I was testing a theory and 
meant to delete this line.  This is still a very real issue for me.  I was 
looking to try a workaround tomorrow: I saw that the Kafka 0.11 consumer 
supports transactions for exactly-once processing, so I was going to read about 
that and see if I could somehow fail a read that I couldn't deserialize and try 
again, and whether that might make a difference (can I just retry this?).  I'm 
not sure how that'll go.  If you've got an idea for a workaround, I'd be all 
ears too.


From: Philip Doctor <philip.doc...@physiq.com>
Date: Tuesday, February 27, 2018 at 10:02 PM
To: "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>, Fabian Hueske 
<fhue...@gmail.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Flink Kafka reads too many bytes .... Very rarely

Honestly, this has been a very frustrating issue to dig into.  The fact that I 
seem to get all of my data is currently leading me to discard and ignore this 
error: it's rare, and Flink still seems to work, but something is very hard to 
debug here, and despite some confusing observations, most of my evidence 
suggests that this originates in the flink kafka consumer.


Re: Flink Kafka reads too many bytes .... Very rarely

2018-02-27 Thread Philip Doctor
Hi Gordon and Fabian,
I just re-ran the test case against Flink 1.3.2 and could not reproduce this 
error, so it does appear to be new to Flink 1.4.0, if my test is good.

The difference between my local env and prod is mostly the scale: production 
has a multi-broker Kafka cluster with durable backups, etc., and Flink has 
active-standby-standby JobManagers, multiple TaskManagers, full checkpoints 
to HDFS, etc.; my local execution is a single Kafka instance and a single 
in-memory StreamExecutionEnvironment.  But my application code is identical.  I 
tried comparing all of my Kafka settings (max message size, etc.); they seem in 
line aside from being single instance.  I'm not trying to rule out my 
environment as a factor, but I have tried very hard to examine it.  This issue 
has proved very frustrating to reproduce, otherwise I would have happily sent a 
test case or even made a pass at debugging it myself.



  *   what exactly your deserialization schema is doing?

It’s Google FlatBuffers data: a byte array that gets read into a FlatBuffers 
schema, which reads at certain offsets to pull out values (e.g. ID, timestamp, 
an array of values).  I think it’s out of scope, since I can see that 
the byte count is wrong in problem cases before I ever stick it 
into a FlatBuffers deserializer.  At the same time, something does seem 
important about the payload.  I’m loading ~2 million data points across ~30 
datasets into Flink, and this is the only one that exhibits this problem.  I 
spent over a day digging into what might be different about this dataset and I 
can’t manage to find it.  This makes me incredibly suspicious; I wouldn’t have 
emailed you all had I not measured the bytes in kafka vs after the flink read.

Honestly, this has been a very frustrating issue to dig into.  The fact that I 
seem to get all of my data is currently leading me to discard and ignore this 
error: it's rare, and Flink still seems to work, but something is very hard to 
debug here, and despite some confusing observations, most of my evidence 
suggests that this originates in the flink kafka consumer.

If I can help more, please let me know.  Thank you for your replies.

-Phil




From: "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>
Date: Tuesday, February 27, 2018 at 3:12 AM
To: Fabian Hueske <fhue...@gmail.com>, Philip Doctor <philip.doc...@physiq.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Flink Kafka reads too many bytes .... Very rarely

Hi Philip,

Yes, I also have the question that Fabian mentioned. Did you start observing 
this only after upgrading to 1.4.0?

Could you let me know what exactly your deserialization schema is doing? I 
don’t have any clues at the moment, but maybe there are hints there.
Also, you mentioned that the issue could not be reproduced on a local setup, 
only in “near-production” environments. What main differences are there between 
the two?

Cheers,
Gordon


On 27 February 2018 at 5:05:33 PM, Fabian Hueske (fhue...@gmail.com) wrote:
Hi,
Thanks for reporting the error and sharing the results of your detailed 
analysis!
I don't have enough insight into the Kafka consumer, but noticed that you said 
you've been running your application for over a year and only noticed the 
faulty behavior recently.
Flink 1.4.0 was released in mid December last year. Did you observe the bug 
before you migrated to 1.4.0?
I'll add Gordon to the thread who is the expert about Flink's Kafka consumer.
Best, Fabian


2018-02-27 0:02 GMT+01:00 Philip Doctor <philip.doc...@physiq.com>:
Hello,
I’m using Flink 1.4.0 with FlinkKafkaConsumer010 and have been for almost a 
year.  Recently, I started getting messages of the wrong length in Flink 
causing my deserializer to fail.  Let me share what I’ve learned:


  1.  All of my messages are 520 bytes exactly when my producer places them in 
kafka
  2.  About 1% of these messages have this deserialization issue in flink
  3.  When it happens, I read 10104 bytes in flink
  4.  When I write the bytes my producer creates to a file on disk (rather than 
kafka) my code reads 520 bytes and consumes them without issue off of disk
  5.  When I use kafka tool (http://www.kafkatool.com/index.html)  to dump the 
contents of my topic to disk, and read each message one at a time off of disk, 
my code reads 520 bytes per message and consumes them without issue
  6.  When I write a simple Kafka consumer (not using flink) I read one message 
at a time it’s 520 bytes and my code runs without issue

#5 and #6 are what lead me to believe that this issue is squarely a problem 
with Flink.

However, it gets more complicated: I took the messages I wrote out with both my 
simple consumer and the kafka tool, loaded them into a local kafka server, and 
attached a local flink cluster, and I cannot reproduce the error, yet I can 
reproduce it 100% of the time in something closer to my production environment.

Flink Kafka reads too many bytes .... Very rarely

2018-02-26 Thread Philip Doctor
Hello,
I’m using Flink 1.4.0 with FlinkKafkaConsumer010 and have been for almost a 
year.  Recently, I started getting messages of the wrong length in Flink 
causing my deserializer to fail.  Let me share what I’ve learned:


  1.  All of my messages are 520 bytes exactly when my producer places them in 
kafka
  2.  About 1% of these messages have this deserialization issue in flink
  3.  When it happens, I read 10104 bytes in flink
  4.  When I write the bytes my producer creates to a file on disk (rather than 
kafka) my code reads 520 bytes and consumes them without issue off of disk
  5.  When I use kafka tool (http://www.kafkatool.com/index.html)  to dump the 
contents of my topic to disk, and read each message one at a time off of disk, 
my code reads 520 bytes per message and consumes them without issue
  6.  When I write a simple Kafka consumer (not using flink) I read one message 
at a time it’s 520 bytes and my code runs without issue

#5 and #6 are what lead me to believe that this issue is squarely a problem 
with Flink.

However, it gets more complicated: I took the messages I wrote out with both my 
simple consumer and the Kafka tool, loaded them into a local Kafka server, then 
attached a local Flink cluster, and I cannot reproduce the error there, yet I 
can reproduce it 100% of the time in something closer to my production 
environment.

I realize the latter sounds suspicious, but I have not found anything in the 
Kafka docs indicating that I might have a configuration issue here, and the 
simple local setup that would let me iterate on this and debug has failed me.

I’m really quite at a loss here.  I believe there’s a Flink Kafka consumer bug; 
it happens exceedingly rarely, as I went a year without seeing it, and I can 
reproduce it in an expensive environment but not in a “cheap” one.

Thank you for your time.  I can provide my sample data set in case that helps; 
I put it on my Google Drive: 
https://drive.google.com/file/d/1h8jpAFdkSolMrT8n47JJdS6x21nd_b7n/view?usp=sharing
That’s the full data set, and about 1% of it ends up failing.  It’s really hard 
to figure out which messages fail, since I can’t read the messages I receive 
and the data arrives out of order.




Re: Calling an operator serially

2017-12-12 Thread Philip Doctor
My logs suggest this is simply what a KeyedStream does by default; I was just 
trying to find a doc that says that rather than relying on my logs.

From: Philip Doctor <philip.doc...@physiq.com>
Date: Tuesday, December 12, 2017 at 5:50 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Calling an operator serially

I’ve got a KeyedStream, I only want max parallelism (1) per key in the keyed 
stream (i.e. if key is OrganizationFoo then only 1 input at a time from 
OrganizationFoo is processed by this operator).  I feel like this is obvious 
somehow, but I’m struggling to find the docs for this.  Can anyone point me the 
right way here?


Calling an operator serially

2017-12-12 Thread Philip Doctor
I’ve got a KeyedStream, I only want max parallelism (1) per key in the keyed 
stream (i.e. if key is OrganizationFoo then only 1 input at a time from 
OrganizationFoo is processed by this operator).  I feel like this is obvious 
somehow, but I’m struggling to find the docs for this.  Can anyone point me the 
right way here?
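The per-key serial behavior falls out of how a keyed stream partitions records:
every record with a given key is routed to the same parallel subtask, and each
subtask processes its input one element at a time.  A toy model of that routing
(Flink actually assigns keys to key groups via a murmur hash; this sketch just
uses hashCode modulo parallelism to show the same-key-same-subtask property):

```java
class KeyRouting {
    // Toy stand-in for keyed-stream routing: every record with the same key
    // maps to the same subtask index, so that subtask sees the key's records
    // one at a time, in order.  Not Flink's real key-group logic.
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int p = 3;
        // Repeated lookups for one key are stable, so "OrganizationFoo"
        // records are never processed concurrently with each other:
        System.out.println(
            subtaskFor("OrganizationFoo", p) == subtaskFor("OrganizationFoo", p));
        // prints true
    }
}
```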


Testing GlobalWindows

2017-10-19 Thread Philip Doctor
I have a GlobalWindow with a custom trigger (I leave windows open for a 
variable length of time depending on how much data I have vs the expected 
amount, so I’m manipulating triggerContext.registerProcessingTimeTimer()).

When I emit data into my data stream, the Flink execution environment appears 
to halt after the test data is exhausted but before my GlobalWindow is 
triggered.

I tried changing my trigger to wait zero seconds once the window is full, but 
that just appears to have made my test racy: sometimes the global window fires 
and calls apply (so the test passes), and sometimes the environment appears to 
halt first.

Is there a way for me to leave the execution environment running for a few 
seconds after all of my data is emitted? Or is there a good way for me to test 
this? So far my only solution has been to use env.fromCollection() in Flink and 
pass a custom iterator class whose next() calls Thread.sleep(10_000) before 
delivering the last value (so the last value I insert becomes untested 
garbage).  That gives the window a chance to trigger and I always get the 
correct results (huzzah), but it's super hacky.
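That iterator hack can be sketched roughly as follows.  This is a plain-Java
illustration, not the thread author's actual code; for use with
env.fromCollection() the wrapper would also need to be Serializable, and the
delay is shortened here for readability:

```java
import java.util.Iterator;
import java.util.List;

class DelayedLastIterator<T> implements Iterator<T> {
    // Wraps an iterator and sleeps before handing out the final element,
    // keeping a (test) pipeline alive long enough for timers to fire.
    private final Iterator<T> inner;
    private final long delayMillis;

    DelayedLastIterator(Iterator<T> inner, long delayMillis) {
        this.inner = inner;
        this.delayMillis = delayMillis;
    }

    @Override public boolean hasNext() { return inner.hasNext(); }

    @Override public T next() {
        T value = inner.next();
        if (!inner.hasNext()) {  // about to deliver the last element: stall
            try { Thread.sleep(delayMillis); }
            catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return value;
    }

    public static void main(String[] args) {
        Iterator<Integer> it =
            new DelayedLastIterator<>(List.of(1, 2, 3).iterator(), 100);
        while (it.hasNext()) System.out.println(it.next());
        // prints 1, 2, 3 (one per line), pausing before 3 is delivered
    }
}
```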

Any advice here is greatly appreciated.

Thanks,
Phil



Sink -> Source

2017-08-31 Thread Philip Doctor
I have a few Flink jobs.  Several of them share the same code.  I was wondering 
if I could make those shared steps their own job and then specify that the sink 
for one process is the source for another, stitching my jobs together.  Is this 
possible? I didn’t see it in the docs.  It feels like I could possibly hack 
something together with writeToSocket() on my data stream and then create a 
source that reads from a socket, but I was hoping there was a more fully baked 
solution to this.
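Stripped of Flink, the writeToSocket() idea amounts to one process listening on
a port while the other connects and writes lines to it.  A minimal sketch of
that handoff (plain sockets only, no Flink APIs; the method and message names
are made up for illustration):

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

class SocketHandoff {
    // One end listens (the downstream job's source side), the other connects
    // and writes a line (the upstream job's sink side).  Returns the line the
    // listener received.
    static String roundTrip(String message) {
        try (ServerSocket server = new ServerSocket(0)) {  // ephemeral port
            int port = server.getLocalPort();
            Thread writer = new Thread(() -> {
                try (Socket s = new Socket("localhost", port);
                     PrintWriter out = new PrintWriter(s.getOutputStream(), true)) {
                    out.println(message);  // sink side: writeToSocket analogue
                } catch (Exception e) { throw new RuntimeException(e); }
            });
            writer.start();
            try (Socket s = server.accept();
                 BufferedReader in =
                     new BufferedReader(new InputStreamReader(s.getInputStream()))) {
                String line = in.readLine();  // source side: socketTextStream analogue
                writer.join();
                return line;
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("shared-step-output"));
        // prints shared-step-output
    }
}
```

A socket gives no durability or replay, which is presumably why the author
wanted something more fully baked than this.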

Thanks for your time.


Possible Data Corruption?

2017-06-19 Thread Philip Doctor
Dear Flink Users,
I have a Flink (v1.2.1) process I left running for the last five days.  It 
aggregates a bit of state and exposes it via Queryable State.  It ran correctly 
for the first 3 days.  There were no code changes or data changes, but suddenly 
Queryable State got weird.  The process logs the current value of the queryable 
state, and from the logs I discerned that the state was correctly being 
aggregated.  However, the Queryable State that was returned could not be 
deserialized: rather than the list of longs I expect, I get 2 bytes (0x57 02). 
It seemed quite clear that the state in the Task Manager was not the state I 
was getting out of Queryable State.

I next reasoned that my data was being checkpointed and possibly I could 
restore.  So I restarted the process to recover from a checkpoint.  At this 
point the process fails with the following error:

java.lang.IllegalStateException: Could not initialize keyed state backend.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:653)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:640)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:246)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:666)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IndexOutOfBoundsException: Index: 28, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:653)
    at java.util.ArrayList.get(ArrayList.java:429)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:231)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateTableForKeyGroup(HeapKeyedStateBackend.java:370)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:340)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:243)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:788)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:284)
    ... 6 more


This looks to me like Flink has serialized out state incorrectly.

I was running Flink 1.2.1.  I upgraded to Flink 1.3 after this happened so I 
could manually set the Kafka partition offset; I backed it up 5 days to replay 
all the data, and now everything is working fine again.

However, I’m more than a little worried.  Was there a serialization bug fixed 
in 1.3?  I don’t believe there’s anything in my code that could be causing such 
an issue, but is there something in my jobs that could make something like this 
happen?  Is this a known bug?  The fact that it not only results in bad query 
data but also appears to take down my disaster recovery plan makes me a bit 
nervous here.

Thanks for your time,
Phil



Restoring Queryable State

2017-06-01 Thread Philip Doctor
Hello,
My job differs slightly from the example Queryable State jobs.  I have a keyed 
stream, and I emit managed ValueState at certain points at runtime, but the 
names aren’t entirely known beforehand.

I have checkpointing enabled, and when I restore from a checkpoint, everything 
*almost* works.  In my code I took a known ValueState name, hard-coded a new 
ValueStateDescriptor, got state from the RuntimeContext, and saw that my value 
was correctly restored.  However, that value is not queryable unless I create a 
new ValueStateDescriptor that is set queryable.  So if I knew all of the 
ValueState names ahead of time, I could simply build a ValueStateDescriptor for 
each emitted value state name, set each to queryable, and presumably life would 
be good.  But, returning to the problem, I don’t know all the value state 
names.

Is there a way that, on restore, the keyed managed ValueStates that I am 
restoring can also be queryable as they were prior to the crash?

Thanks.


Queryable state in a keyed stream not querying properly

2017-05-18 Thread Philip Doctor
Dear Flink Users,
I’m getting started with Flink and I’ve bumped into a small problem.  I have a 
keyed stream like this:

val stream = env.addSource(consumer)
  .flatMap(new ValidationMap()).name("ValidationMap")
  .keyBy(x => (x.getObj.foo(), x.getObj.bar(), x.getObj.baz()))
  .flatMap(new Calculator(this.config.size, this.config.queryableStateName)).name(jobname)


Within my stream I have a ValueState that I use to maintain a list.

I then use the QueryableStateClient to call:

client.getKvState(flinkJobID, stateName, serializedKey.hashCode(), serializedKey);

Where the “serializedKey” matches the .keyBy on the keyed stream.

When I query the state, things go wrong.  I’ve determined that the JobManager 
appears to send my query to one of the three TaskManagers I have running, so 
about 1/3 of the time I get the proper result, and the other 2/3 of the time I 
get:

org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace: KvState does not 
hold any state for key/namespace.

I feel like I must have somehow misconfigured my job, how can I instruct the 
job manager to properly query the TaskManager that has my data?  Is there a 
specific setting or configuration I’m missing?

Thank you for your time.

-Phil