Is it possible that you have stalls in your topology?

Reasons could be:

  - The data sink blocks or becomes slow for some periods (where are you
sending the data to?)

  - If you are using large state and a state backend that only supports
synchronous checkpointing, each checkpoint may pause processing for a
noticeable period (see the sketch below)
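
A minimal sketch of moving checkpoints off the processing path with the
RocksDB backend (assuming the RocksDB backend is an option for you and
Flink 1.1+; the checkpoint URI is a placeholder):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointSetup {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // e.g. a 15-minute checkpoint interval
            env.enableCheckpointing(15 * 60 * 1000);

            // RocksDB keeps large state off-heap; fully async snapshots avoid
            // stalling the pipeline while the checkpoint is written.
            RocksDBStateBackend backend =
                    new RocksDBStateBackend("hdfs:///flink/checkpoints"); // placeholder URI
            backend.enableFullyAsyncSnapshots();
            env.setStateBackend(backend);

            // ... build and execute the rest of the topology here ...
        }
    }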


On Thu, Nov 3, 2016 at 7:21 PM, Scott Kidder <kidder.sc...@gmail.com> wrote:

> Hi Steffan & Josh,
>
> For what it's worth, I've been using the Kinesis connector with very good
> results on Flink 1.1.2 and 1.1.3. I updated the Flink Kinesis connector's
> KCL and AWS SDK dependencies to the following versions:
>
> aws.sdk.version: 1.11.34
> aws.kinesis-kcl.version: 1.7.0
>
> My customizations are visible in this commit on my fork:
> https://github.com/apache/flink/commit/6d69f99d7cd52b3c2f039cb4d37518859e159b32
>
> It might be worth testing with newer AWS SDK & KCL libraries to see if the
> problem persists.
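>
> If you'd rather not rebuild with an edited pom, Maven also lets you override
> those properties on the command line when building the connector (assuming
> the connector pom exposes them under the names above):
>
>     mvn clean install -DskipTests \
>         -Daws.sdk.version=1.11.34 \
>         -Daws.kinesis-kcl.version=1.7.0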
>
> Best,
>
> --Scott Kidder
>
>
> On Thu, Nov 3, 2016 at 7:08 AM, Josh <jof...@gmail.com> wrote:
>
>> Hi Gordon,
>>
>> Thanks for the fast reply!
>> You're right about the expired iterator exception occurring just before
>> each spike. I can't see any signs of long GC on the task managers... CPU
>> stayed below 15% while the spikes were occurring, and I can't see anything
>> unusual in the task manager logs.
>>
>> But actually, I just noticed that the Flink UI showed no successful
>> checkpoints during the time of the problem, even though my checkpoint
>> interval is 15 minutes. So I guess this is probably some kind of Flink
>> problem rather than a problem with the Kinesis consumer. Unfortunately I
>> can't find anything useful in the logs, so I'm not sure what happened!
>>
>> Josh
>>
>>
>>
>> On Thu, Nov 3, 2016 at 12:44 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org
>> > wrote:
>>
>>> Hi Josh,
>>>
>>> That warning message was added as part of FLINK-4514. It pops out
>>> whenever a shard iterator is used more than 5 minutes after it was
>>> returned by Kinesis.
>>> The only time spent between receiving a shard iterator and using it to
>>> fetch the next batch of records goes to deserializing and emitting the
>>> records of the previously fetched batch (see the sketch below).
>>> So unless processing of the last fetched batch took over 5 minutes, this
>>> normally shouldn’t happen.
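>>>
>>> A simplified sketch of the shape of that per-shard loop (not the actual
>>> ShardConsumer code; deserializeAndEmit is a hypothetical stand-in for the
>>> deserialization and emission step):
>>>
>>>     import com.amazonaws.services.kinesis.AmazonKinesis;
>>>     import com.amazonaws.services.kinesis.model.GetRecordsRequest;
>>>     import com.amazonaws.services.kinesis.model.GetRecordsResult;
>>>     import com.amazonaws.services.kinesis.model.Record;
>>>
>>>     void consumeShard(AmazonKinesis kinesis, String streamName, String shardId) {
>>>         String iterator = kinesis
>>>                 .getShardIterator(streamName, shardId, "LATEST")
>>>                 .getShardIterator();
>>>         while (iterator != null) {
>>>             GetRecordsResult result = kinesis.getRecords(
>>>                     new GetRecordsRequest().withShardIterator(iterator));
>>>             for (Record record : result.getRecords()) {
>>>                 // If deserializing/emitting the batch stalls for more
>>>                 // than 5 minutes, the next getRecords() call fails with
>>>                 // ExpiredIteratorException.
>>>                 deserializeAndEmit(record); // hypothetical stand-in
>>>             }
>>>             iterator = result.getNextShardIterator();
>>>         }
>>>     }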
>>>
>>> Have you noticed any sign of long, constant full GC on your Flink task
>>> managers? From your description and a check of the code, the only guess I
>>> can come up with right now is that the source tasks completely ceased to
>>> run for a period of time, and when they came back, the shard iterator was
>>> unexpectedly found to be expired. According to the graph you attached,
>>> after the iterator was refreshed and the tasks successfully fetched a few
>>> more batches, the source tasks halted again, and so on.
>>> So you should see that same warning message right before every small
>>> peak in the graph.
>>>
>>> Best Regards,
>>> Gordon
>>>
>>>
>>> On November 3, 2016 at 7:46:42 PM, Josh (jof...@gmail.com) wrote:
>>>
>>> Hey Gordon,
>>>
>>> I've been using Flink 1.2-SNAPSHOT for the past week (with FLINK-4514)
>>> with no problems, but yesterday the Kinesis consumer started behaving
>>> strangely... My Kinesis data stream is fairly constant at around 1.5
>>> MB/sec; however, the Flink Kinesis consumer repeatedly stopped consuming
>>> for periods of time (see the spikes in the attached graph, which shows
>>> data consumed by the Flink Kinesis consumer).
>>>
>>> Looking in the task manager logs, there are no exceptions; however, there
>>> is this log message, which I believe is related to the problem:
>>>
>>> 2016-11-03 09:27:53,782 WARN
>>> org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer -
>>> Encountered an unexpected expired iterator
>>> AAAAAAAAAAF8OJyh+X3yBnbtzUgIfXv+phS7PKppd7q09/tduXG3lOhCmBGPUOlZul24tzSSM6KjHsQ+AbZY8MThKcSvGax/EoOIYoTELYbZmuwY4hgeqUsndxLIM0HL55iejroBV8YFmUmGwHsW8qkHsz//Ci4cxcLrGArHex3n+4E+aoZ9AtgTPEZOBjXY49g+VGsDb0bQN5FJUoUVEfnbupk96ore
>>> for shard KinesisStreamShard{streamName='stream001', shard='{ShardId:
>>> shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey:
>>> 85070511730234615865841151857942042863},SequenceNumberRange:
>>> {StartingSequenceNumber:
>>> 49566542916923648892164247926679091159472198219567464450,}}'}; refreshing
>>> the iterator ...
>>>
>>> After restarting the job from my last savepoint, it's consuming the
>>> stream fine again with no problems.
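>>>
>>> (For reference, that restart was the standard savepoint flow from the
>>> Flink CLI, roughly; the job ID, savepoint path, and jar are placeholders:)
>>>
>>>     bin/flink savepoint <jobId>
>>>     bin/flink cancel <jobId>
>>>     bin/flink run -s <savepointPath> my-job.jar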
>>>
>>> Do you have any idea what might be causing this, or anything I should do
>>> to investigate further?
>>>
>>> Cheers,
>>>
>>> Josh
>>>
>>> On Wed, Oct 5, 2016 at 4:55 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org
>>> > wrote:
>>>
>>>> Hi Steffen,
>>>>
>>>> It turns out that FLINK-4514 just missed Flink 1.1.2 and wasn’t included
>>>> in the release (I’ll update the fix version in JIRA to 1.1.3, thanks for
>>>> noticing this!).
>>>> The Flink community is going to release 1.1.3 as soon as possible, which
>>>> will include the fix.
>>>> If you don’t want to wait for the release and want to try the fix now,
>>>> you can also build from the current “release-1.1” branch, which already
>>>> has FLINK-4514 merged.
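>>>>
>>>> Roughly (the usual local Maven caveats apply):
>>>>
>>>>     git clone -b release-1.1 https://github.com/apache/flink.git
>>>>     cd flink
>>>>     mvn clean install -DskipTests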
>>>> Sorry for the inconvenience. Let me know if you bump into any other
>>>> problems afterwards.
>>>>
>>>> Best Regards,
>>>> Gordon
>>>>
>>>>
>>>> On October 5, 2016 at 2:56:21 AM, Steffen Hausmann (
>>>> stef...@hausmann-family.de) wrote:
>>>>
>>>> Hi there,
>>>>
>>>> I'm running a Flink 1.1.2 job on EMR and YARN that is reading events
>>>> from a Kinesis stream. However, after a while (the exact duration varies
>>>> and is on the order of minutes) the Kinesis source doesn't emit any
>>>> further events, and hence Flink doesn't produce any further output.
>>>> Eventually, an ExpiredIteratorException occurs in one of the tasks,
>>>> causing the entire job to fail:
>>>>
>>>> > com.amazonaws.services.kinesis.model.ExpiredIteratorException:
>>>> Iterator expired. The iterator was created at time Mon Oct 03 18:40:30 UTC
>>>> 2016 while right now it is Mon Oct 03 18:45:33 UTC 2016 which is further in
>>>> the future than the tolerated delay of 300000 milliseconds. (Service:
>>>> AmazonKinesis; Status Code: 400; Error Code: ExpiredIteratorException;
>>>> Request ID: dace9532-9031-54bc-8aa2-3cbfb136d590)
>>>>
>>>> This seems to be related to FLINK-4514, which is marked as resolved for
>>>> Flink 1.1.2. In contrast to what is described in the ticket, the job I'm
>>>> running isn't suspended but hangs just a few minutes after it has been
>>>> started.
>>>>
>>>> I've attached a log file showing the described behavior.
>>>>
>>>> Any idea what may be wrong?
>>>>
>>>> Thanks,
>>>> Steffen
>>>>
>>>>
>>>
>>
>
