Hi Phillip,

Thanks for testing it. From your log and my own tests, I can confirm the 
problem is with Kinesalite not correctly
mocking the official Kinesis behaviour for the `describeStream` API.

There’s a PR for the fix here: https://github.com/apache/flink/pull/2822. With 
this change, shard discovery
should work normally when tested against Kinesalite.

However, I’m not completely sure yet if the fix is viable, and would like to 
wait for others to take a look / review.
Therefore, it might not make it into the next Flink minor bugfix release. If 
you’d like, you can try out the patch for now
and see if the problem remains.

Best Regards,
Gordon

On November 17, 2016 at 1:07:44 AM, Philipp Bussche (philipp.buss...@gmail.com) 
wrote:

Hello Gordon,  

thank you for your help. I have set the discovery interval to 30 seconds and  
just starting the job on a clean kinesalite service (I am running it inside  
docker so every time the container gets stopped and removed to start from  
scratch).  

This is the output without actually any data in the stream:  

11/16/2016 17:59:03     Source: Custom Source -> Sink: Unnamed(1/1) switched to 
 
RUNNING  
17:59:04,673 INFO  
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  
- Subtask 0 will be seeded with initial shard  
KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId:  
shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey:  
340282366920938463463374607431768211455},SequenceNumberRange:  
{StartingSequenceNumber:  
49567694685205596999719397165301965297537316555774230530,}}'}, starting  
state set as sequence number LATEST_SEQUENCE_NUM  
17:59:04,674 INFO  
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  
- Subtask 0 will start consuming seeded shard  
KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId:  
shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey:  
340282366920938463463374607431768211455},SequenceNumberRange:  
{StartingSequenceNumber:  
49567694685205596999719397165301965297537316555774230530,}}'} from sequence  
number LATEST_SEQUENCE_NUM with ShardConsumer 0  
17:59:04,689 INFO  
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  
- Subtask 0 has discovered a new shard  
KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId:  
shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey:  
340282366920938463463374607431768211455},SequenceNumberRange:  
{StartingSequenceNumber:  
49567694685205596999719397165301965297537316555774230530,}}'} due to  
resharding, and will start consuming the shard from sequence number  
EARLIEST_SEQUENCE_NUM with ShardConsumer 1  
17:59:08,817 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Triggering checkpoint 1 @ 1479315548815  
17:59:08,835 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Completed checkpoint 1 (in 20 ms)  
17:59:13,815 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Triggering checkpoint 2 @ 1479315553815  
17:59:13,817 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Completed checkpoint 2 (in 1 ms)  
17:59:18,814 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Triggering checkpoint 3 @ 1479315558814  
17:59:18,815 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Completed checkpoint 3 (in 1 ms)  
17:59:23,815 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Triggering checkpoint 4 @ 1479315563815  
17:59:23,816 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Completed checkpoint 4 (in 1 ms)  
17:59:28,814 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Triggering checkpoint 5 @ 1479315568813  
17:59:28,814 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Completed checkpoint 5 (in 1 ms)  
17:59:33,814 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Triggering checkpoint 6 @ 1479315573814  
17:59:33,815 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Completed checkpoint 6 (in 1 ms)  
17:59:34,704 INFO  
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  
- Subtask 0 has discovered a new shard  
KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId:  
shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey:  
340282366920938463463374607431768211455},SequenceNumberRange:  
{StartingSequenceNumber:  
49567694685205596999719397165301965297537316555774230530,}}'} due to  
resharding, and will start consuming the shard from sequence number  
EARLIEST_SEQUENCE_NUM with ShardConsumer 2  

I then restarted the kinesalite container and posted a message to the stream  
before the 30 second mark occurred. The output shows that the job consumes  
from the 2 shards discovered initially (I initialized kinsalite with one  
shard only) right away and then continues to consume for the new shards to  
be discovered whenever they appear in 30 second frequencies. (I am posting a  
string to the stream but expect a JSON document in my job so the parsing  
kind of fails but look for the json output I am writing by just doing a  
job.print()):  

Thanks  
Philipp  

11/16/2016 18:03:30     Source: Custom Source -> Sink: Unnamed(1/1) switched to 
 
RUNNING  
18:03:30,832 INFO  
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  
- Subtask 0 will be seeded with initial shard  
KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId:  
shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey:  
340282366920938463463374607431768211455},SequenceNumberRange:  
{StartingSequenceNumber:  
49567694816512384728667706222664274486890894391754358786,}}'}, starting  
state set as sequence number LATEST_SEQUENCE_NUM  
18:03:30,833 INFO  
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  
- Subtask 0 will start consuming seeded shard  
KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId:  
shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey:  
340282366920938463463374607431768211455},SequenceNumberRange:  
{StartingSequenceNumber:  
49567694816512384728667706222664274486890894391754358786,}}'} from sequence  
number LATEST_SEQUENCE_NUM with ShardConsumer 0  
18:03:30,847 INFO  
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  
- Subtask 0 has discovered a new shard  
KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId:  
shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey:  
340282366920938463463374607431768211455},SequenceNumberRange:  
{StartingSequenceNumber:  
49567694816512384728667706222664274486890894391754358786,}}'} due to  
resharding, and will start consuming the shard from sequence number  
EARLIEST_SEQUENCE_NUM with ShardConsumer 1  
18:03:34,878 INFO org.apache.flink.api.java.typeutils.TypeExtractor  
- class de.harvee.dataspa.flink.model.HarveeEventError is not a valid POJO  
type  
18:03:35,093 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Triggering checkpoint 1 @ 1479315815091  
{"domainId":null,"title":null,"description":null,"venue":null,"message":"could  
not parse message"}  
{"domainId":null,"title":null,"description":null,"venue":null,"message":"could  
not parse message"}  
18:03:35,191 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Completed checkpoint 1 (in 100 ms)  
18:03:40,003 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Triggering checkpoint 2 @ 1479315820003  
18:03:40,005 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Completed checkpoint 2 (in 1 ms)  
18:03:45,005 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Triggering checkpoint 3 @ 1479315825005  
18:03:45,006 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Completed checkpoint 3 (in 1 ms)  
18:03:50,007 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Triggering checkpoint 4 @ 1479315830007  
18:03:50,007 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Completed checkpoint 4 (in 0 ms)  
18:03:55,006 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Triggering checkpoint 5 @ 1479315835006  
18:03:55,007 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Completed checkpoint 5 (in 1 ms)  
18:04:00,007 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Triggering checkpoint 6 @ 1479315840007  
18:04:00,008 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
- Completed checkpoint 6 (in 1 ms)  
18:04:00,861 INFO  
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  
- Subtask 0 has discovered a new shard  
KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId:  
shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey:  
340282366920938463463374607431768211455},SequenceNumberRange:  
{StartingSequenceNumber:  
49567694816512384728667706222664274486890894391754358786,}}'} due to  
resharding, and will start consuming the shard from sequence number  
EARLIEST_SEQUENCE_NUM with ShardConsumer 2  
{"domainId":null,"title":null,"description":null,"venue":null,"message":"could  
not parse message"}  








--  
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Subtask-keeps-on-discovering-new-Kinesis-shard-when-using-Kinesalite-tp10133p10154.html
  
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.  

Reply via email to