Hi Josh,

That warning message was added as part of FLINK-4514. It pops out whenever a 
shard iterator was used after 5 minutes it was returned from Kinesis.
The only time spent between after a shard iterator was returned and before it 
was used to fetch the next batch of records, is on deserializing and emitting 
of the records of the last fetched batch.
So unless processing of the last fetched batch took over 5 minutes, this 
normally shouldn’t happen.

Have you noticed any sign of long, constant full GC for your Flink task 
managers? From your description and check in code, the only possible guess I 
can come up with now is that
the source tasks completely seized to be running for a period of time, and when 
it came back, the shard iterator was unexpectedly found to be expired. 
According to the graph you attached,
when the iterator was refreshed and tasks successfully fetched a few more 
batches, the source tasks again halted, and so on.
So you should see that same warning message right before every small peak 
within the graph.

Best Regards,
Gordon


On November 3, 2016 at 7:46:42 PM, Josh (jof...@gmail.com) wrote:

Hey Gordon,

I've been using Flink 1.2-SNAPSHOT for the past week (with FLINK-4514) with no 
problems, but yesterday the Kinesis consumer started behaving strangely... My 
Kinesis data stream is fairly constant at around 1.5MB/sec, however the Flink 
Kinesis consumer started to stop consuming for periods of time (see the spikes 
in graph attached which shows data consumed by the Flink Kinesis consumer)

Looking in the task manager logs, there are no exceptions however there is this 
log message which I believe is related to the problem:
2016-11-03 09:27:53,782 WARN  
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer  - 
Encountered an unexpected expired iterator 
AAAAAAAAAAF8OJyh+X3yBnbtzUgIfXv+phS7PKppd7q09/tduXG3lOhCmBGPUOlZul24tzSSM6KjHsQ+AbZY8MThKcSvGax/EoOIYoTELYbZmuwY4hgeqUsndxLIM0HL55iejroBV8YFmUmGwHsW8qkHsz//Ci4cxcLrGArHex3n+4E+aoZ9AtgTPEZOBjXY49g+VGsDb0bQN5FJUoUVEfnbupk96ore
 for shard KinesisStreamShard{streamName='stream001', shard='{ShardId: 
shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 
85070511730234615865841151857942042863},SequenceNumberRange: 
{StartingSequenceNumber: 
49566542916923648892164247926679091159472198219567464450,}}'}; refreshing the 
iterator ...

Having restarted the job from my last savepoint, it's consuming the stream fine 
again with no problems.

Do you have any idea what might be causing this, or anything I should do to 
investigate further?

Cheers,

Josh


On Wed, Oct 5, 2016 at 4:55 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
Hi Steffen,

Turns out that FLINK-4514 just missed Flink 1.1.2 and wasn’t included in the 
release (I’ll update the resolve version in JIRA to 1.1.3, thanks for noticing 
this!).
The Flink community is going to release 1.1.3 asap, which will include the fix.
If you don’t want to wait for the release and want to try the fix now, you can 
also build on the current “release-1.1” branch, which already has FLINK-4514 
merged.
Sorry for the inconvenience. Let me know if you bump into any other problems 
afterwards.

Best Regards,
Gordon


On October 5, 2016 at 2:56:21 AM, Steffen Hausmann (stef...@hausmann-family.de) 
wrote:

Hi there,

I'm running a Flink 1.1.2 job on EMR and Yarn that is reading events
from a Kinesis stream. However, after a while (the exact duration varies
and is in the order of minutes) the Kinesis source doesn't emit any
further events and hence Flink doesn't produce any further output.
Eventually, an ExpiredIteratorException occurs in one of the task,
causing the entire job to fail:

> com.amazonaws.services.kinesis.model.ExpiredIteratorException: Iterator 
> expired. The iterator was created at time Mon Oct 03 18:40:30 UTC 2016 while 
> right now it is Mon Oct 03 18:45:33 UTC 2016 which is further in the future 
> than the tolerated delay of 300000 milliseconds. (Service: AmazonKinesis; 
> Status Code: 400; Error Code: ExpiredIteratorException; Request ID: 
> dace9532-9031-54bc-8aa2-3cbfb136d590)

This seems to be related to FLINK-4514, which is marked as resovled for
Flink 1.1.2. In contrast to what is describe in the ticket, the job I'm
running isn't suspended but hangs just a few minutes after the job has
been started.

I've attached a log file showing the described behavior.

Any idea what may be wrong?

Thanks,
Steffen

Reply via email to