Sorry I wasn't clear. No, the lock contention is not in Flink. On Friday, February 26, 2016, Stephan Ewen <se...@apache.org> wrote:
> Was the contended lock part of Flink's runtime, or the application code?
> If it was part of the Flink runtime, can you share what you found?
>
> On Thu, Feb 25, 2016 at 6:03 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>
>> For what it's worth, I dug into the TM logs and found that this exception
>> was not the root cause, merely a symptom of other backpressure building in
>> the flow (actually, lock contention in another part of the stack). While
>> Flink was helpful in finding and bubbling up this stack to the UI, it was
>> ultimately misleading and caused me to overlook a proper evaluation of the
>> failure.
>>
>> On Wed, Jan 20, 2016 at 2:59 AM, Robert Metzger <rmetz...@apache.org> wrote:
>>
>>> Hey Nick,
>>>
>>> I had a discussion with Stephan Ewen on how we could resolve the issue.
>>> I filed a JIRA with our suggested approach:
>>> https://issues.apache.org/jira/browse/FLINK-3264
>>>
>>> By handling this directly in the KafkaConsumer, we would avoid fetching
>>> data we cannot handle anyway (discarding in the deserialization schema
>>> would be less efficient).
>>>
>>> Let us know what you think about our suggested approach.
>>>
>>> Sadly, it seems that the Kafka 0.9 consumer API does not yet support
>>> requesting the latest offset of a TopicPartition. I'll ask about this
>>> on their ML.
>>>
>>> On Sun, Jan 17, 2016 at 8:28 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>>>
>>>> On Sunday, January 17, 2016, Stephan Ewen <se...@apache.org> wrote:
>>>>
>>>>> I agree, real-time streams should never go down.
>>>>
>>>> Glad to hear that :)
>>>>
>>>>> [snip] Both should be supported.
>>>>
>>>> Agreed.
>>>>
>>>>> Since we interpret streaming very broadly (also including analysis of
>>>>> historic streams or timely data), the "backpressure/catch-up" mode
>>>>> seemed natural as the first one to implement.
>>>>
>>>> Indeed, this is what my job is doing. I have configured it to start
>>>> from the beginning when it lacks a valid offset. I have to presume that
>>>> in my case the stream data is expiring faster than my consumers can
>>>> keep up. However, I haven't investigated proper monitoring yet.
>>>>
>>>>> The "load shedding" variant can probably even be realized in the Kafka
>>>>> consumer, without complex modifications to the core Flink runtime
>>>>> itself.
>>>>
>>>> I agree here as well. Indeed, this exception is being thrown from the
>>>> consumer, not the runtime.
>>>>
>>>>> On Sun, Jan 17, 2016 at 12:42 AM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>>>>>
>>>>>> This goes back to the idea that streaming applications should never
>>>>>> go down. I'd much rather consume at max capacity and knowingly drop
>>>>>> some portion of the incoming pipe than have the streaming job crash.
>>>>>> Of course, once the job itself is robust, I still need the runtime to
>>>>>> be robust -- YARN vs (potential) Mesos vs standalone cluster will be
>>>>>> my next consideration.
>>>>>>
>>>>>> I can share some details about my setup, but not at this time; in
>>>>>> part because I don't have my metrics available at the moment and in
>>>>>> part because this is a public, archived list.
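[For context: the "start from the beginning" fallback mentioned above is
just a consumer property passed through to the connector. A minimal sketch
of such a configuration, assuming Flink 0.10's FlinkKafkaConsumer082
against a Kafka 0.8 broker, where the property value is "smallest" (the
0.9+ consumer calls it "earliest"). The topic name, group id, and
connection strings are placeholders.

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaOffsetResetExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "localhost:2181");  // placeholder
        props.setProperty("bootstrap.servers", "localhost:9092");  // placeholder
        props.setProperty("group.id", "my-flink-job");              // placeholder
        // Fall back to the earliest retained offset when no valid offset
        // is available for this group ("earliest" on the 0.9+ consumer).
        props.setProperty("auto.offset.reset", "smallest");

        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer082<>("my-topic",
                        new SimpleStringSchema(), props));

        stream.print();
        env.execute("kafka-offset-reset-example");
    }
}

Note this only governs what happens when the stored offset is invalid; it
does not skip records while the job is running, which is the gap the rest
of this thread discusses.]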
>>>>>>
>>>>>> On Sat, Jan 16, 2016 at 8:23 AM, Stephan Ewen <se...@apache.org> wrote:
>>>>>>
>>>>>>> @Robert: Is it possible to add a "fallback" strategy to the
>>>>>>> consumer? Something like "if offsets cannot be found, use latest"?
>>>>>>>
>>>>>>> I would make this an optional feature to activate. I would think it
>>>>>>> is quite surprising to users if records start being skipped in
>>>>>>> certain situations. But I can see that this would be desirable
>>>>>>> sometimes.
>>>>>>>
>>>>>>> More control over skipping records could be something to implement
>>>>>>> in an extended version of the Kafka consumer. A user could define a
>>>>>>> policy such that, if the consumer falls more than X offsets behind
>>>>>>> the producer, it starts requesting the latest offsets (rather than
>>>>>>> the following ones), thereby skipping a bunch of records.
>>>>>>>
>>>>>>> On Sat, Jan 16, 2016 at 3:14 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi Nick,
>>>>>>>>
>>>>>>>> I'm sorry you ran into the issue. Is it possible that Flink's Kafka
>>>>>>>> consumer falls so far behind in the topic that the offsets it's
>>>>>>>> requesting are invalid?
>>>>>>>>
>>>>>>>> For that to happen, Kafka's retention time would have to be pretty
>>>>>>>> short.
>>>>>>>>
>>>>>>>> Skipping records under load is something currently not supported
>>>>>>>> by Flink itself. The only idea I had for handling this would be to
>>>>>>>> give the DeserializationSchema a callback to request the latest
>>>>>>>> offset from Kafka to determine the lag. With that, the schema could
>>>>>>>> determine a "dropping rate" to catch up.
>>>>>>>> How would you, as an application developer, expect to handle this
>>>>>>>> situation?
>>>>>>>>
>>>>>>>> Just out of curiosity: what's the throughput you have on the Kafka
>>>>>>>> topic?
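[To make the skip-ahead policy sketched above concrete, here is a purely
illustrative example. None of these names (SkipAheadPolicy, maxLag,
nextFetchOffset) exist in Flink's Kafka connector; they only show the
shape of the decision being discussed.

/**
 * Illustrative sketch of a load-shedding policy: if the consumer has
 * fallen more than maxLag offsets behind the head of the partition,
 * jump to the latest offset instead of fetching the next one.
 */
public final class SkipAheadPolicy {

    /** Maximum tolerated lag, in offsets, before shedding load. */
    private final long maxLag;

    public SkipAheadPolicy(long maxLag) {
        this.maxLag = maxLag;
    }

    /**
     * @param nextOffset   the offset the consumer would fetch next
     * @param latestOffset the current head offset of the partition
     *                     (this would have to be requested from the broker)
     * @return the offset to actually fetch from
     */
    public long nextFetchOffset(long nextOffset, long latestOffset) {
        if (latestOffset - nextOffset > maxLag) {
            // Too far behind: skip the backlog and resume at the head,
            // knowingly dropping the records in between.
            return latestOffset;
        }
        return nextOffset;
    }
}

The missing piece, as noted earlier in the thread, is obtaining
latestOffset cheaply: at the time of writing, the Kafka 0.9 consumer API
does not yet expose the latest offset of a TopicPartition.]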
>>>>>>>>
>>>>>>>> On Fri, Jan 15, 2016 at 10:13 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi folks,
>>>>>>>>>
>>>>>>>>> I have a streaming job that consumes from a Kafka topic. The topic
>>>>>>>>> is pretty active, so the local-mode single worker is obviously not
>>>>>>>>> able to keep up with the fire hose. I expect the job to skip
>>>>>>>>> records and continue on. However, I'm getting an exception from
>>>>>>>>> the LegacyFetcher which kills the job. This is very much *not*
>>>>>>>>> what I want. Any thoughts? The only thing I find when I search for
>>>>>>>>> this error message is a link back to FLINK-2656. I'm running
>>>>>>>>> roughly 0.10-release/HEAD.
>>>>>>>>>
>>>>>>>>> Thanks a lot,
>>>>>>>>> Nick
>>>>>>>>>
>>>>>>>>> java.lang.Exception: Found invalid offsets more than once in
>>>>>>>>> partitions [FetchPartition {partition=X, offset=Y}] Exceptions:
>>>>>>>>>   at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>>>>>>>>>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:399)
>>>>>>>>>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>>>>>>>>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>>>>>>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>>>>>>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>>>>>>>   at java.lang.Thread.run(Thread.java:745)
>>>>>>>>> Caused by: java.lang.RuntimeException: Found invalid offsets more
>>>>>>>>> than once in partitions [FetchPartition {partition=X, offset=Y}]
>>>>>>>>> Exceptions:
>>>>>>>>>   at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:412)