[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15305762#comment-15305762 ]

Tzu-Li (Gordon) Tai commented on FLINK-3229:
--------------------------------------------

Great working with the community :) Thanks for merging!

> Kinesis streaming consumer with integration of Flink's checkpointing mechanics
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-3229
>                 URL: https://issues.apache.org/jira/browse/FLINK-3229
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming Connectors
>    Affects Versions: 1.0.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>             Fix For: 1.1.0
>
> Opening a sub-task to implement the data source consumer for the Kinesis streaming
> connector (https://issues.apache.org/jira/browse/FLINK-3211).
> An example of the planned user API for the Flink Kinesis Consumer:
> {code}
> Properties kinesisConfig = new Properties();
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, "BASIC");
> kinesisConfig.put(
>     KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
>     "aws_access_key_id_here");
> kinesisConfig.put(
>     KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
>     "aws_secret_key_here");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "LATEST"); // or TRIM_HORIZON
> DataStream<String> kinesisRecords = env.addSource(new FlinkKinesisConsumer<>(
>     "kinesis_stream_name",
>     new SimpleStringSchema(),
>     kinesisConfig));
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15288916#comment-15288916 ]

ASF GitHub Bot commented on FLINK-3229:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1911
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15272584#comment-15272584 ]

ASF GitHub Bot commented on FLINK-3229:
---------------------------------------

Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1911#issuecomment-217202359

    Yep, that's the right branch. I tried working on different approaches, but it's just an annoying problem with protobuf. I'll probably work on it again tomorrow.
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15272578#comment-15272578 ]

ASF GitHub Bot commented on FLINK-3229:
---------------------------------------

Github user tzulitai commented on the pull request:

    https://github.com/apache/flink/pull/1911#issuecomment-217201356

    Thanks, Robert. I'll keep an eye on your FLINK-3229-review branch for the changes (I'm assuming you're working on FLINK-3229-review for the protobuf problem; please tell me if I'm wrong :))
    Also, if there is anything I can do to help (e.g. tests on other environments) to further improve the PR, please don't hesitate to let me know :)
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15268620#comment-15268620 ]

ASF GitHub Bot commented on FLINK-3229:
---------------------------------------

Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1911#issuecomment-216512837

    I'm currently working on a custom branch based on this pull request. It seems that we are running into some dependency issues when using the kinesis-connector on AWS EMR: there is a clash between the protobuf versions (Kinesis needs 2.6.x, but Flink has 2.5.0 on its classpath). I'll keep you posted.
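[Editor's note] The protobuf clash described above (the Kinesis client needing protobuf 2.6.x while Flink ships 2.5.0) is the kind of conflict typically resolved by shading: relocating the connector's copy of protobuf into a private package so both versions can coexist at runtime. A minimal maven-shade-plugin sketch of that approach — the shaded package name is illustrative only, not the relocation actually used in the merged connector:

```xml
<!-- Sketch only: relocate protobuf inside the connector jar so the Kinesis
     client's protobuf 2.6.x cannot clash with the protobuf 2.5.0 already on
     Flink's classpath. The shadedPattern below is a hypothetical example. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <relocation>
            <pattern>com.google.protobuf</pattern>
            <shadedPattern>org.apache.flink.kinesis.shaded.com.google.protobuf</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```

With the relocation in place, the connector's bytecode references the shaded package, while Flink's own protobuf 2.5.0 remains untouched on the classpath.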
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15262706#comment-15262706 ]

ASF GitHub Bot commented on FLINK-3229:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1911#discussion_r61480982

--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import com.amazonaws.regions.Regions;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType;
+import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The Flink Kinesis Consumer is a parallel streaming data source that pulls data from multiple AWS Kinesis streams
+ * within the same AWS service region. Each instance of the consumer is responsible for fetching data records from
+ * one or more Kinesis shards.
+ *
+ * To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees, the Flink Kinesis
+ * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for
+ * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial
+ * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST.
+ *
+ * NOTE: The current implementation does not correctly handle resharding of AWS Kinesis streams.
+ * NOTE: Since Kinesis and Kafka share many common abstractions, the implementation is heavily based on
+ * the Flink Kafka Consumer.
+ *
+ * @param <T> the type of data emitted
+ */
+public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
+        implements CheckpointListener, CheckpointedAsynchronously<HashMap<KinesisStreamShard, String>>, ResultTypeQueryable<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisConsumer.class);
+
+    //
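[Editor's note] The interface combination in the excerpt above — a parallel source that is also `CheckpointedAsynchronously` over a shard-to-sequence-number map — is the heart of the checkpoint integration. A toy sketch of that pattern (not the PR's actual code; shard ids are modeled as plain `String`s here, whereas the real consumer keys on `KinesisStreamShard`):

```java
import java.io.Serializable;
import java.util.HashMap;

/**
 * Toy sketch of the checkpointing pattern used by the consumer: the operator
 * state is simply a map from shard to the last sequence number emitted,
 * copied out at checkpoint time and written back on recovery.
 */
public class ShardStateSketch implements Serializable {

    // Last sequence number read per shard. Hypothetical simplification:
    // String shard ids instead of KinesisStreamShard objects.
    private final HashMap<String, String> lastSeqNums = new HashMap<>();

    /** Called for every record emitted downstream. */
    void recordEmitted(String shardId, String seqNum) {
        lastSeqNums.put(shardId, seqNum);
    }

    /** Called at checkpoint time: hand Flink an independent copy of the positions. */
    HashMap<String, String> snapshotState() {
        return new HashMap<>(lastSeqNums);
    }

    /** Called on restore: resume reading right after the saved positions. */
    void restoreState(HashMap<String, String> snapshot) {
        lastSeqNums.clear();
        lastSeqNums.putAll(snapshot);
    }

    public static void main(String[] args) {
        ShardStateSketch state = new ShardStateSketch();
        state.recordEmitted("shardId-000", "100");
        HashMap<String, String> snapshot = state.snapshotState();
        state.recordEmitted("shardId-000", "200"); // progress made after the checkpoint
        state.restoreState(snapshot);              // roll back to it on failure
        System.out.println(state.snapshotState().get("shardId-000")); // prints 100
    }
}
```

Because `snapshotState` returns a copy rather than the live map, records emitted after the checkpoint cannot corrupt the snapshot — which is what makes the rollback in `main` exact.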
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15262033#comment-15262033 ]

ASF GitHub Bot commented on FLINK-3229:
---------------------------------------

Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1911#issuecomment-215402825

    I'm currently busy with some other ongoing tasks. I hope to get back to this PR soon.
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15254633#comment-15254633 ]

ASF GitHub Bot commented on FLINK-3229:
---------------------------------------

Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1911#issuecomment-213581650

    Regarding the Jackson / dependency issue: you don't need to worry about it now. I'll take another look at the problem and make sure it'll work once we merge it.
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15254608#comment-15254608 ]

ASF GitHub Bot commented on FLINK-3229:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1911#discussion_r60799066

--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---
(duplicate of the FlinkKinesisConsumer.java diff excerpt quoted earlier; omitted)
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15254583#comment-15254583 ]

ASF GitHub Bot commented on FLINK-3229:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1911#discussion_r60797751

--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---
(duplicate of the FlinkKinesisConsumer.java diff excerpt quoted earlier; omitted)
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15254332#comment-15254332 ]

ASF GitHub Bot commented on FLINK-3229:
---------------------------------------

Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1911#issuecomment-213530765

    Hi,
    Cool, if you have time to address them, go ahead :) Thanks a lot for doing this, by the way! I really like the work you did so far on the connector!

    Sent from my iPhone

    > On 22.04.2016, at 18:01, Tzu-Li Tai wrote:
    >
    > @rmetzger
    > Thank you very much for your detailed review on the PR :)
    > I've replied to the comments you added, please .
    > I can address the issues and follow up with corresponding commits within the next 36 hours. I am pretty much free for the next 3 days and would very much like to get the consumer ready for merging by the end of this week :)
    >
    > If it still isn't ready by the end of 4/25, I'm afraid I will have to leave any remaining issues for you to address, since after that I temporarily won't be able to work on code until June.
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15254221#comment-15254221 ] ASF GitHub Bot commented on FLINK-3229: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60770825 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/KinesisConfigConstants.java --- @@ -0,0 +1,51 @@ +package org.apache.flink.streaming.connectors.kinesis.config; + +/** + * + */ +public class KinesisConfigConstants { + + // + // Configuration Keys + // + + /** The max retries to retrieve metadata from a Kinesis stream using describeStream API +* (Note: describeStream attempts may be temporarily blocked due to AWS capping 5 attempts per sec) */ + public static final String CONFIG_STREAM_DESCRIBE_RETRIES = "flink.stream.describe.retry"; + + /** The backoff time between each describeStream attempt */ + public static final String CONFIG_STREAM_DESCRIBE_BACKOFF = "flink.stream.describe.backoff"; + + /** The initial position to start reading Kinesis streams from (LATEST is used if not set) */ + public static final String CONFIG_STREAM_INIT_POSITION_TYPE = "flink.stream.initpos.type"; + + /** The credential provider type to use when AWS credentials are required (BASIC is used if not set)*/ + public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE = "aws.credentials.provider"; + + /** The AWS access key ID to use when setting credentials provider type to BASIC */ + public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID = "aws.credentials.provider.basic.accesskeyid"; + + /** The AWS secret key to use when setting credentials provider type to BASIC */ + public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY = "aws.credentials.provider.basic.secretkey"; + + /** Optional configuration for profile path if credential provider type is set to 
be PROFILE */ + public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_PROFILE_PATH = "aws.credentials.provider.profile.path"; + + /** Optional configuration for profile name if credential provider type is set to be PROFILE */ + public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_PROFILE_NAME = "aws.credentials.provider.profile.name"; + + /** The AWS region of the Kinesis streams to be pulled ("us-east-1" is used if not set) */ + public static final String CONFIG_AWS_REGION = "aws.region"; + + + // + // Default configuration values + // + + public static final String DEFAULT_AWS_REGION = "us-east-1"; --- End diff -- I think its reasonable to make region a required argument. As a user, more than once I've found myself mistakened for the AWS SDK not correctly finding resources, only realizing that it is defaulting to another region unless specified. > Kinesis streaming consumer with integration of Flink's checkpointing mechanics > -- > > Key: FLINK-3229 > URL: https://issues.apache.org/jira/browse/FLINK-3229 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > > Opening a sub-task to implement data source consumer for Kinesis streaming > connector (https://issues.apache.org/jira/browser/FLINK-3211). 
> An example of the planned user API for Flink Kinesis Consumer: > {code} > Properties kinesisConfig = new Properties(); > kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); > kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, > "BASIC"); > kinesisConfig.put( > KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, > "aws_access_key_id_here"); > kinesisConfig.put( > KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, > "aws_secret_key_here"); > kinesisConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, > "LATEST"); // or TRIM_HORIZON > DataStream<String> kinesisRecords = env.addSource(new FlinkKinesisConsumer<>( > "kinesis_stream_name", > new SimpleStringSchema(), > kinesisConfig)); > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
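The `CONFIG_STREAM_DESCRIBE_RETRIES` and `CONFIG_STREAM_DESCRIBE_BACKOFF` keys discussed in the diff above imply a bounded retry-with-backoff loop around throttled describeStream calls. A minimal, self-contained sketch of that loop — class and method names are illustrative, not the connector's actual code:

```java
import java.util.function.Supplier;

class DescribeRetrySketch {

    /** Retries the supplied call, sleeping backoffMillis between failed attempts,
     *  for at most (maxRetries + 1) tries; rethrows the last failure when exhausted. */
    static <T> T callWithRetries(Supplier<T> call, int maxRetries, long backoffMillis) {
        RuntimeException lastError = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                lastError = e;  // e.g. a throttling error such as LimitExceededException
                try {
                    Thread.sleep(backoffMillis);  // back off before the next attempt
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        throw lastError;
    }
}
```

A caller would wrap the describeStream invocation in the supplier and read the retry count and backoff from the `Properties` object shown in the example above.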
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15254132#comment-15254132 ] ASF GitHub Bot commented on FLINK-3229: --- Github user tzulitai commented on the pull request: https://github.com/apache/flink/pull/1911#issuecomment-213491263 @rmetzger Thank you very much for your detailed review on the PR :) I've replied to the comments you added, please take a look. I can address the issues and follow up with corresponding commits within the next 36 hours. I am pretty much free for the next 3 days, and would very much like to get the consumer ready for merging by the end of this week :) If it still isn't ready by the end of 4/25, I'm afraid I will have to leave any remaining issues for you to address, since I temporarily won't be able to work on code from then until June. > Kinesis streaming consumer with integration of Flink's checkpointing mechanics > -- > > Key: FLINK-3229 > URL: https://issues.apache.org/jira/browse/FLINK-3229 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > > Opening a sub-task to implement data source consumer for Kinesis streaming > connector (https://issues.apache.org/jira/browser/FLINK-3211). 
> An example of the planned user API for Flink Kinesis Consumer: > {code} > Properties kinesisConfig = new Properties(); > kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); > kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, > "BASIC"); > kinesisConfig.put( > KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, > "aws_access_key_id_here"); > kinesisConfig.put( > KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, > "aws_secret_key_here"); > kinesisConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, > "LATEST"); // or TRIM_HORIZON > DataStream<String> kinesisRecords = env.addSource(new FlinkKinesisConsumer<>( > "kinesis_stream_name", > new SimpleStringSchema(), > kinesisConfig)); > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15254110#comment-15254110 ] ASF GitHub Bot commented on FLINK-3229: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60760294 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis.internals; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A Kinesis Data Fetcher that consumes data from a specific set of Kinesis shards. + * The fetcher spawns a single thread for connection to each shard. 
+ */ +public class KinesisDataFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class); + + /** Config properties for the Flink Kinesis Consumer */ + private final Properties configProps; + + /** The AWS credentials provider as specified in config properties */ + private final AWSCredentialsProvider credentials; + + /** The name of the consumer task that this fetcher was instantiated in */ + private final String taskName; + + /** Information of the shards that this fetcher handles, along with the sequence numbers that they should start from */ + private HashMap<KinesisStreamShard, String> assignedShardsWithStartingSequenceNum; + + /** Reference to the thread that executed run() */ + private volatile Thread mainThread; + + /** Reference to the first error thrown by any of the spawned shard connection threads */ + private final AtomicReference<Throwable> error; + + private volatile boolean running = true; + + /** +* Creates a new Kinesis Data Fetcher for the specified set of shards +* +* @param assignedShards the shards that this fetcher will pull data from +* @param configProps the configuration properties of this Flink Kinesis Consumer +* @param taskName the task name of this consumer task +*/ + public KinesisDataFetcher(List<KinesisStreamShard> assignedShards, Properties configProps, String taskName) { + this.configProps = checkNotNull(configProps); + this.credentials = AWSUtil.getCredentialsProvider(configProps); + this.assignedShardsWithStartingSequenceNum = new HashMap<>(); + for (KinesisStreamShard shard : assignedShards) { + assignedShardsWithStartingSequenceNum.put(shard, SentinelSequenceNumber.SENTINEL_SEQUENCE_NUMBER_NOT_SET.toString()); + } + this.taskName = taskName; + this.error = new AtomicReference<>(); + } + + /** +* Advance a
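The fetcher above keeps an `AtomicReference` to the first error thrown by any shard connection thread, plus a reference to the main thread. A self-contained sketch of that error-propagation pattern — the class and method names here are illustrative stand-ins, not the connector's actual code:

```java
import java.util.concurrent.atomic.AtomicReference;

class ShardErrorPropagationSketch {

    /** Holds only the FIRST error reported by any shard consumer thread. */
    private final AtomicReference<Throwable> error = new AtomicReference<>();

    /** Called by a failing shard consumer thread: record the error and wake the main thread. */
    void stopWithError(Throwable t, Thread mainThread) {
        if (error.compareAndSet(null, t)) {  // later errors lose the race and are dropped
            mainThread.interrupt();          // break the main thread out of its wait loop
        }
    }

    /** Called by the main thread after being interrupted: rethrow the recorded error, if any. */
    void checkAndRethrow() throws Exception {
        Throwable t = error.get();
        if (t instanceof Exception) {
            throw (Exception) t;
        } else if (t != null) {
            throw new RuntimeException(t);
        }
    }
}
```

The `compareAndSet(null, t)` call is what guarantees that concurrent failures from several shard threads surface as a single, deterministic first error.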
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15254107#comment-15254107 ] ASF GitHub Bot commented on FLINK-3229: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60760085 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java --- @@ -0,0 +1,481 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis; + +import com.amazonaws.regions.Regions; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; +import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType; +import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Properties; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * The Flink Kinesis Consumer is a parallel streaming data source that pulls data from multiple AWS Kinesis streams + 
* within the same AWS service region. Each instance of the consumer is responsible for fetching data records from + * one or more Kinesis shards. + * + * To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees, the Flink Kinesis + * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for + * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial + * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST: + * + * + * + * NOTE: The current implementation does not correctly handle resharding of AWS Kinesis streams. + * NOTE: Since Kinesis and Kafka share many common abstractions, the implementation is heavily based on + * the Flink Kafka Consumer. + * + * @param <T> the type of data emitted + */ +public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> + implements CheckpointListener, CheckpointedAsynchronously<HashMap<KinesisStreamShard, String>>, ResultTypeQueryable<T> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisConsumer.class); + + //
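The checkpointing integration described in the javadoc above boils down to the consumer's state being a per-shard map of last-seen sequence numbers, which must be handed to Flink as a defensive copy so the fetcher can keep advancing while the snapshot is persisted. A hedged sketch of that idea — class and method names are illustrative, and String keys stand in for the shard type:

```java
import java.util.HashMap;

class SequenceNumberStateSketch {

    /** shard id -> last sequence number read from that shard */
    private final HashMap<String, String> lastSequenceNums = new HashMap<>();

    /** The fetcher calls this after emitting each record. */
    void advance(String shardId, String seqNum) {
        lastSequenceNums.put(shardId, seqNum);
    }

    /** What a snapshotState() implementation would return: a copy, so that
     *  later advance() calls cannot mutate an in-flight checkpoint. */
    HashMap<String, String> snapshotState() {
        return new HashMap<>(lastSequenceNums);
    }
}
```

On restore, the map would be fed back to the fetcher as the starting sequence numbers, which is why replayed shards resume exactly where the last successful checkpoint left off.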
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15254106#comment-15254106 ] ASF GitHub Bot commented on FLINK-3229: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60759925 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.model; + +import java.io.Serializable; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A serializable representation of a AWS Kinesis Stream shard. It is basically a wrapper class around the information + * provided along with {@link com.amazonaws.services.kinesis.model.Shard}, with some extra utility methods to + * determine whether or not a shard is closed and whether or not the shard is a result of parent shard splits or merges. 
+ */ +public class KinesisStreamShard implements Serializable { --- End diff -- The main reason the consumer needs its own Shard representation is that `com.amazonaws.services.kinesis.model.Shard` doesn't have the shard's associated stream name as a field. We will need the stream name when getting a shard iterator for a particular shard, using `com.amazonaws.services.kinesis.AmazonKinesisClient#getShardIterator(streamName, shardId, iteratorType)`. Moreover, since the consumer's implementation supports reading from multiple Kinesis streams, we must carry the associated stream name along with each Shard representation (I guess that's the reason why Amazon's Shard implementation doesn't have a field for stream name). Our implementation, `org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard`, currently has `regionName` (I don't think we actually need this, since the consumer is limited to reading from Kinesis streams within the same region) and `streamName` as fields besides the ones already supplied in Amazon's Shard. So, what we could do to reduce duplicate implementation is to include Amazon's Shard implementation as a field within our `KinesisStreamShard`, and let the `KinesisStreamShard` still have `streamName` as an extra field. What do you think? > Kinesis streaming consumer with integration of Flink's checkpointing mechanics > -- > > Key: FLINK-3229 > URL: https://issues.apache.org/jira/browse/FLINK-3229 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > > Opening a sub-task to implement data source consumer for Kinesis streaming > connector (https://issues.apache.org/jira/browser/FLINK-3211). 
> An example of the planned user API for Flink Kinesis Consumer: > {code} > Properties kinesisConfig = new Properties(); > kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); > kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, > "BASIC"); > kinesisConfig.put( > KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, > "aws_access_key_id_here"); > kinesisConfig.put( > KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, > "aws_secret_key_here"); > kinesisConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, > "LATEST"); // or TRIM_HORIZON > DataStream<String> kinesisRecords = env.addSource(new FlinkKinesisConsumer<>( > "kinesis_stream_name", > new SimpleStringSchema(), > kinesisConfig)); > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
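The review comment above proposes wrapping the SDK's Shard as a field and adding the stream name it lacks, so that the same shard id in two different streams compares as two distinct shards. A self-contained sketch of that wrapper — `SdkShard` is a minimal stand-in for `com.amazonaws.services.kinesis.model.Shard`, and the class name is illustrative:

```java
import java.util.Objects;

class KinesisStreamShardSketch {

    /** Minimal stand-in for the AWS SDK's Shard, so the sketch compiles on its own. */
    static class SdkShard {
        final String shardId;
        SdkShard(String shardId) { this.shardId = shardId; }
    }

    private final String streamName;  // the extra field the SDK type does not carry
    private final SdkShard shard;     // the SDK shard data, held rather than duplicated

    KinesisStreamShardSketch(String streamName, SdkShard shard) {
        this.streamName = streamName;
        this.shard = shard;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof KinesisStreamShardSketch)) { return false; }
        KinesisStreamShardSketch other = (KinesisStreamShardSketch) o;
        // identity is (stream, shard id): the same shard id in another stream is a different shard
        return streamName.equals(other.streamName) && shard.shardId.equals(other.shard.shardId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(streamName, shard.shardId);
    }
}
```

Including the stream name in equality and hashing matters because the per-shard state map keys on this type across multiple streams.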
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15254090#comment-15254090 ] ASF GitHub Bot commented on FLINK-3229: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60756732 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java --- @@ -0,0 +1,481 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis; + +import com.amazonaws.regions.Regions; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; +import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType; +import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Properties; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * The Flink Kinesis Consumer is a parallel streaming data source that pulls data from multiple AWS Kinesis streams + 
* within the same AWS service region. Each instance of the consumer is responsible for fetching data records from + * one or more Kinesis shards. + * + * To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees, the Flink Kinesis + * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for + * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial + * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST: + * + * + * + * NOTE: The current implementation does not correctly handle resharding of AWS Kinesis streams. + * NOTE: Since Kinesis and Kafka share many common abstractions, the implementation is heavily based on + * the Flink Kafka Consumer. + * + * @param <T> the type of data emitted + */ +public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> + implements CheckpointListener, CheckpointedAsynchronously<HashMap<KinesisStreamShard, String>>, ResultTypeQueryable<T> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisConsumer.class); + + //
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15254080#comment-15254080 ] ASF GitHub Bot commented on FLINK-3229: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60755162 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java --- @@ -0,0 +1,481 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis; + +import com.amazonaws.regions.Regions; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; +import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType; +import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Properties; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * The Flink Kinesis Consumer is a parallel streaming data source that pulls data from multiple AWS Kinesis streams + 
* within the same AWS service region. Each instance of the consumer is responsible for fetching data records from + * one or more Kinesis shards. + * + * To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees, the Flink Kinesis + * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for + * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial + * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST: + * + * + * + * NOTE: The current implementation does not correctly handle resharding of AWS Kinesis streams. + * NOTE: Since Kinesis and Kafka share many common abstractions, the implementation is heavily based on + * the Flink Kafka Consumer. + * + * @param <T> the type of data emitted + */ +public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> + implements CheckpointListener, CheckpointedAsynchronously<HashMap<KinesisStreamShard, String>>, ResultTypeQueryable<T> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisConsumer.class); + + //
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15254068#comment-15254068 ] ASF GitHub Bot commented on FLINK-3229: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60753879 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java --- @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis.proxy; + +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.model.DescribeStreamRequest; +import com.amazonaws.services.kinesis.model.DescribeStreamResult; +import com.amazonaws.services.kinesis.model.GetRecordsRequest; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; +import com.amazonaws.services.kinesis.model.LimitExceededException; +import com.amazonaws.services.kinesis.model.ResourceNotFoundException; +import com.amazonaws.services.kinesis.model.StreamStatus; +import com.amazonaws.services.kinesis.model.Shard; +import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A utility class that is used as a proxy to make calls to AWS Kinesis + * for several functions, such as getting a list of shards and fetching + * a batch of data records starting from a specified record sequence number. 
+ */ +public class KinesisProxy { + + private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class); + + /** The actual Kinesis client from the AWS SDK that we will be using to make calls */ + private final AmazonKinesisClient kinesisClient; + + /** The AWS region that this proxy will be making calls to */ + private final String regionId; + + /** Configuration properties of this Flink Kinesis Connector */ + private final Properties configProps; + + /** +* Create a new KinesisProxy based on the supplied configuration properties +* +* @param configProps configuration properties containing AWS credential and AWS region info +*/ + public KinesisProxy(Properties configProps) { + this.configProps = checkNotNull(configProps); + + this.regionId = configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION, KinesisConfigConstants.DEFAULT_AWS_REGION); + AmazonKinesisClient client = new AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps).getCredentials()); + client.setRegion(Region.getRegion(Regions.fromName(this.regionId))); + + this.kinesisClient = client; + } + + /** +* Get the next batch of data records using a specific shard iterator +* +* @param shardIterator a shard iterator that encodes info about which shard to read and where to start reading +* @param maxRecordsToGet the maximum number of records to retrieve for this batch +* @return the batch of retrieved records +*/ + public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) { + final GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); + getRecordsRequest.setShardIterator(shardIterator); +
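The getRecords call shown above (truncated in this excerpt) is driven by a shard-iterator chain: each batch comes back with the next iterator, and a null iterator marks the end of a closed shard. A self-contained sketch of that read loop — `Proxy` and `Batch` are illustrative stand-ins for the real KinesisProxy and GetRecordsResult, not the connector's API:

```java
import java.util.ArrayList;
import java.util.List;

class ShardReadLoopSketch {

    /** Stand-in for GetRecordsResult: a batch of records plus the next iterator. */
    static class Batch {
        final List<String> records;
        final String nextShardIterator;  // null means the shard is fully consumed
        Batch(List<String> records, String nextShardIterator) {
            this.records = records;
            this.nextShardIterator = nextShardIterator;
        }
    }

    /** Stand-in for the proxy's getRecords(shardIterator, maxRecordsToGet). */
    interface Proxy {
        Batch getRecords(String shardIterator, int maxRecordsToGet);
    }

    /** Drains one shard by following the iterator chain until it ends. */
    static List<String> readShard(Proxy proxy, String firstIterator, int batchSize) {
        List<String> all = new ArrayList<>();
        String iterator = firstIterator;
        while (iterator != null) {
            Batch batch = proxy.getRecords(iterator, batchSize);
            all.addAll(batch.records);
            iterator = batch.nextShardIterator;
        }
        return all;
    }
}
```

In the real connector the loop would also handle ProvisionedThroughputExceededException with backoff, and an open shard never returns a null iterator, so the fetcher loops indefinitely rather than draining to completion.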
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15254063#comment-15254063 ] ASF GitHub Bot commented on FLINK-3229: --- Github user tzulitai commented on the pull request: https://github.com/apache/flink/pull/1911#issuecomment-213465308 @rmetzger Regarding the need to upgrade Jackson to get the example working: After updating the code for Flink 1.0.x to prepare for the PR, I only tested the consumer with manual tests within the Flink project code. However, when the consumer was first implemented, on Flink 0.10.1, I did package the consumer and use it as a separate dependency. I'm wondering whether it could have anything to do with changes between the older and newer Flink versions? > Kinesis streaming consumer with integration of Flink's checkpointing mechanics > -- > > Key: FLINK-3229 > URL: https://issues.apache.org/jira/browse/FLINK-3229 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > > Opening a sub-task to implement data source consumer for Kinesis streaming > connector (https://issues.apache.org/jira/browser/FLINK-3211). 
> An example of the planned user API for Flink Kinesis Consumer: > {code} > Properties kinesisConfig = new Properties(); > kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1"); > kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, > "BASIC"); > kinesisConfig.put( > KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, > "aws_access_key_id_here"); > kinesisConfig.put( > KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY, > "aws_secret_key_here"); > kinesisConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, > "LATEST"); // or TRIM_HORIZON > DataStream<String> kinesisRecords = env.addSource(new FlinkKinesisConsumer<>( > "kinesis_stream_name", > new SimpleStringSchema(), > kinesisConfig)); > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15254055#comment-15254055 ] ASF GitHub Bot commented on FLINK-3229: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60752259 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis.internals; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A Kinesis Data Fetcher that consumes data from a specific set of Kinesis shards. + * The fetcher spawns a single thread for connection to each shard. 
+ */ +public class KinesisDataFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class); + + /** Config properties for the Flink Kinesis Consumer */ + private final Properties configProps; + + /** The AWS credentials provider as specified in config properties */ + private final AWSCredentialsProvider credentials; + + /** The name of the consumer task that this fetcher was instantiated in */ + private final String taskName; + + /** Information about the shards that this fetcher handles, along with the sequence numbers that they should start from */ + private HashMap<KinesisStreamShard, String> assignedShardsWithStartingSequenceNum; + + /** Reference to the thread that executed run() */ + private volatile Thread mainThread; + + /** Reference to the first error thrown by any of the spawned shard connection threads */ + private final AtomicReference<Throwable> error; + + private volatile boolean running = true; + + /** +* Creates a new Kinesis Data Fetcher for the specified set of shards +* +* @param assignedShards the shards that this fetcher will pull data from +* @param configProps the configuration properties of this Flink Kinesis Consumer +* @param taskName the task name of this consumer task +*/ + public KinesisDataFetcher(List<KinesisStreamShard> assignedShards, Properties configProps, String taskName) { + this.configProps = checkNotNull(configProps); + this.credentials = AWSUtil.getCredentialsProvider(configProps); + this.assignedShardsWithStartingSequenceNum = new HashMap<>(); + for (KinesisStreamShard shard : assignedShards) { + assignedShardsWithStartingSequenceNum.put(shard, SentinelSequenceNumber.SENTINEL_SEQUENCE_NUMBER_NOT_SET.toString()); + } + this.taskName = taskName; + this.error = new AtomicReference<>(); + } + + /** +* Advance a
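The fields above describe the fetcher's concurrency model: one connection thread per shard, a volatile running flag, and an AtomicReference that keeps only the first error raised by any shard thread. As a hedged illustration of that pattern (the class and method names are made up, and a simple counting loop stands in for the real Kinesis poll loop):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Minimal sketch of the fetcher threading model: one consumer thread per shard,
 * a volatile running flag, and an AtomicReference holding only the first error.
 * Illustrative only; this is not the connector's actual code.
 */
class ShardFetcherSketch {

    private final AtomicReference<Throwable> error = new AtomicReference<>();
    private final Map<String, Long> lastSequenceNums = new ConcurrentHashMap<>();
    private volatile boolean running = true;

    void runFetcher(List<String> shardIds) {
        List<Thread> threads = new ArrayList<>();
        for (String shardId : shardIds) {
            Thread t = new Thread(() -> {
                try {
                    // Stand-in for the per-shard poll loop against Kinesis.
                    for (long seq = 0; running && seq < 5; seq++) {
                        lastSequenceNums.put(shardId, seq);
                    }
                } catch (Throwable err) {
                    error.compareAndSet(null, err);  // remember only the first failure
                    running = false;                 // ask sibling shard threads to stop
                }
            }, "shard-consumer-" + shardId);
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        Throwable firstError = error.get();
        if (firstError != null) {
            throw new RuntimeException("Error in a shard consumer thread", firstError);
        }
    }

    Map<String, Long> sequenceNums() {
        return lastSequenceNums;
    }
}
```

The compareAndSet ensures that when several shard threads fail at once, only the first error is propagated to the main thread after all threads are joined.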
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15253899#comment-15253899 ] ASF GitHub Bot commented on FLINK-3229: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1911#issuecomment-213425285 Overall, I really like the work you did for the Kinesis Consumer! As you can see, I've added some comments on the PR. Please let me know what you think of my comments. When do you think you'll have time to address the issues? I would like to get the code merged as soon as possible because Amazon is asking to have it in Flink soon. If you already know that you won't have time to work on this in the upcoming days, I can also address the comments myself. Just let me know; I think we can find a solution.
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15253871#comment-15253871 ] ASF GitHub Bot commented on FLINK-3229: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60733759 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java --- @@ -0,0 +1,481 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis; + +import com.amazonaws.regions.Regions; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; +import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType; +import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Properties; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * The Flink Kinesis Consumer is a parallel streaming data source that pulls data from multiple AWS Kinesis streams + 
* within the same AWS service region. Each instance of the consumer is responsible for fetching data records from + * one or more Kinesis shards. + * + * To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees, the Flink Kinesis + * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for + * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial + * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST: + * + * NOTE: The current implementation does not correctly handle resharding of AWS Kinesis streams. + * NOTE: Since Kinesis and Kafka share many common abstractions, the implementation is heavily based on + * the Flink Kafka Consumer. + * + * @param <T> the type of data emitted + */ +public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> + implements CheckpointListener, CheckpointedAsynchronously<HashMap<KinesisStreamShard, String>>, ResultTypeQueryable<T> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisConsumer.class); + + //
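The TRIM_HORIZON and LATEST initial positions mentioned in the javadoc are passed in as plain strings through the consumer's Properties (CONFIG_STREAM_INIT_POSITION_TYPE in the example API). A minimal, hypothetical sketch of validating such a string-typed property against an enum — the property key and helper below are illustrative, not the connector's actual validation code:

```java
import java.util.Properties;

/** Hypothetical validator for a string-typed "initial position" property. */
class InitialPositionConfig {

    enum InitialPosition { LATEST, TRIM_HORIZON }

    static InitialPosition fromProperties(Properties props, String key, InitialPosition defaultPos) {
        String value = props.getProperty(key);
        if (value == null) {
            return defaultPos;  // fall back when the key is absent
        }
        try {
            // Accept "LATEST" / "TRIM_HORIZON", tolerating case and whitespace.
            return InitialPosition.valueOf(value.trim().toUpperCase());
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                "Invalid initial position '" + value + "'; expected LATEST or TRIM_HORIZON", e);
        }
    }
}
```

Failing fast on an unknown value at construction time, rather than when the first shard iterator is requested, gives users an error message that points at the misconfigured property.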
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15253870#comment-15253870 ] ASF GitHub Bot commented on FLINK-3229: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60733595 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java --- @@ -0,0 +1,481 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis; + +import com.amazonaws.regions.Regions; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; +import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType; +import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Properties; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * The Flink Kinesis Consumer is a parallel streaming data source that pulls data from multiple AWS Kinesis streams + 
* within the same AWS service region. Each instance of the consumer is responsible for fetching data records from + * one or more Kinesis shards. + * + * To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees, the Flink Kinesis + * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for + * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial + * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST: + * + * NOTE: The current implementation does not correctly handle resharding of AWS Kinesis streams. + * NOTE: Since Kinesis and Kafka share many common abstractions, the implementation is heavily based on + * the Flink Kafka Consumer. + * + * @param <T> the type of data emitted + */ +public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> + implements CheckpointListener, CheckpointedAsynchronously<HashMap<KinesisStreamShard, String>>, ResultTypeQueryable<T> { --- End diff -- I don't think we need to implement the `CheckpointListener` currently because we are not committing the offsets anywhere.
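The CheckpointedAsynchronously signature implies that the checkpointed state is a map from shard to the last-read sequence number. A self-contained sketch of the snapshot/restore contract for such state — illustrative only, using String shard ids in place of the real KinesisStreamShard type:

```java
import java.util.HashMap;

/** Illustrative snapshot/restore of per-shard sequence-number state. */
class SequenceNumberState {

    private HashMap<String, String> lastSequenceNums = new HashMap<>();

    void advance(String shardId, String seqNum) {
        lastSequenceNums.put(shardId, seqNum);
    }

    /** The snapshot must be a copy: the fetcher keeps mutating the live map afterwards. */
    HashMap<String, String> snapshotState() {
        return new HashMap<>(lastSequenceNums);
    }

    /** On restore, each shard resumes from its checkpointed sequence number. */
    void restoreState(HashMap<String, String> checkpointed) {
        this.lastSequenceNums = new HashMap<>(checkpointed);
    }

    /** Shards with no checkpointed position start from a sentinel value. */
    String startingSequenceNum(String shardId, String sentinelIfNotSet) {
        return lastSequenceNums.getOrDefault(shardId, sentinelIfNotSet);
    }
}
```

Copying on snapshot is what makes the asynchronous variant safe: Flink can serialize the returned map in the background while the source thread continues advancing the live one.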
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15253868#comment-15253868 ] ASF GitHub Bot commented on FLINK-3229: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60733038 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis.internals; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A Kinesis Data Fetcher that consumes data from a specific set of Kinesis shards. + * The fetcher spawns a single thread for connection to each shard. 
+ */ +public class KinesisDataFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class); + + /** Config properties for the Flink Kinesis Consumer */ + private final Properties configProps; + + /** The AWS credentials provider as specified in config properties */ + private final AWSCredentialsProvider credentials; + + /** The name of the consumer task that this fetcher was instantiated in */ + private final String taskName; + + /** Information about the shards that this fetcher handles, along with the sequence numbers that they should start from */ + private HashMap<KinesisStreamShard, String> assignedShardsWithStartingSequenceNum; + + /** Reference to the thread that executed run() */ + private volatile Thread mainThread; + + /** Reference to the first error thrown by any of the spawned shard connection threads */ + private final AtomicReference<Throwable> error; + + private volatile boolean running = true; + + /** +* Creates a new Kinesis Data Fetcher for the specified set of shards +* +* @param assignedShards the shards that this fetcher will pull data from +* @param configProps the configuration properties of this Flink Kinesis Consumer +* @param taskName the task name of this consumer task +*/ + public KinesisDataFetcher(List<KinesisStreamShard> assignedShards, Properties configProps, String taskName) { + this.configProps = checkNotNull(configProps); + this.credentials = AWSUtil.getCredentialsProvider(configProps); + this.assignedShardsWithStartingSequenceNum = new HashMap<>(); + for (KinesisStreamShard shard : assignedShards) { + assignedShardsWithStartingSequenceNum.put(shard, SentinelSequenceNumber.SENTINEL_SEQUENCE_NUMBER_NOT_SET.toString()); + } + this.taskName = taskName; + this.error = new AtomicReference<>(); + } + + /** +* Advance a
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15253857#comment-15253857 ] ASF GitHub Bot commented on FLINK-3229: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60732384 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.model; + +import java.io.Serializable; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A serializable representation of a AWS Kinesis Stream shard. It is basically a wrapper class around the information + * provided along with {@link com.amazonaws.services.kinesis.model.Shard}, with some extra utility methods to + * determine whether or not a shard is closed and whether or not the shard is a result of parent shard splits or merges. 
+ */ +public class KinesisStreamShard implements Serializable { --- End diff -- + with my suggestion to move the shard list out of the constructor, we don't rely on serializability anymore.
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15253856#comment-15253856 ] ASF GitHub Bot commented on FLINK-3229: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60732321 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.model; + +import java.io.Serializable; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A serializable representation of a AWS Kinesis Stream shard. It is basically a wrapper class around the information + * provided along with {@link com.amazonaws.services.kinesis.model.Shard}, with some extra utility methods to + * determine whether or not a shard is closed and whether or not the shard is a result of parent shard splits or merges. + */ +public class KinesisStreamShard implements Serializable { --- End diff -- The SDK's `com.amazonaws.services.kinesis.model.Shard` is also `Serializable`. 
I wonder if we really need to create a copy of their Shard within Flink.
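The serializability question above matters because the shard wrapper travels inside checkpoints and, in the original design, through the source's constructor. A self-contained sketch of such a serializable wrapper plus a serialization round-trip check — the field set and names are hypothetical, not the real KinesisStreamShard:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical serializable shard wrapper with the minimum identifying fields. */
class ShardHandle implements Serializable {

    private static final long serialVersionUID = 1L;

    private final String streamName;
    private final String shardId;

    ShardHandle(String streamName, String shardId) {
        if (streamName == null || shardId == null) {
            throw new NullPointerException("streamName and shardId must be non-null");
        }
        this.streamName = streamName;
        this.shardId = shardId;
    }

    String getShardId() { return shardId; }

    String getStreamName() { return streamName; }

    /** Round-trips the wrapper through Java serialization, as checkpointing would. */
    static ShardHandle roundTrip(ShardHandle in) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(in);
            }
            try (ObjectInputStream ois =
                     new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
                return (ShardHandle) ois.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
}
```

Whether a thin wrapper like this is kept or the SDK's own Serializable Shard is used directly, the round-trip behavior is what the checkpoint path relies on.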
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15253852#comment-15253852 ] ASF GitHub Bot commented on FLINK-3229: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60732133 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java --- @@ -0,0 +1,481 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis; + +import com.amazonaws.regions.Regions; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; +import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType; +import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Properties; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * The Flink Kinesis Consumer is a parallel streaming data source that pulls data from multiple AWS Kinesis streams + 
* within the same AWS service region. Each instance of the consumer is responsible for fetching data records from + * one or more Kinesis shards. + * + * To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees, the Flink Kinesis + * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for + * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial + * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST: + * + * NOTE: The current implementation does not correctly handle resharding of AWS Kinesis streams. + * NOTE: Since Kinesis and Kafka share many common abstractions, the implementation is heavily based on + * the Flink Kafka Consumer. + * + * @param <T> the type of data emitted + */ +public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> + implements CheckpointListener, CheckpointedAsynchronously<HashMap<KinesisStreamShard, String>>, ResultTypeQueryable<T> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisConsumer.class); + + //
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15253843#comment-15253843 ] ASF GitHub Bot commented on FLINK-3229: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1911#issuecomment-213407614 I could not get the example to work with the current Jackson version; it only worked after upgrading to `2.7.3`. Did you test the Kinesis consumer using a separate project (adding the flink-kinesis-consumer as a dependency)?
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15253838#comment-15253838 ] ASF GitHub Bot commented on FLINK-3229: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60731184 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java --- @@ -0,0 +1,481 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis; + +import com.amazonaws.regions.Regions; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; +import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType; +import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Properties; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * The Flink Kinesis Consumer is a parallel streaming data source that pulls data from multiple AWS Kinesis streams + 
* within the same AWS service region. Each instance of the consumer is responsible for fetching data records from + * one or more Kinesis shards. + * + * To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees, the Flink Kinesis + * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for + * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial + * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST: + * + * NOTE: The current implementation does not correctly handle resharding of AWS Kinesis streams. + * NOTE: Since Kinesis and Kafka share many common abstractions, the implementation is heavily based on + * the Flink Kafka Consumer. + * + * @param <T> the type of data emitted + */ +public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> + implements CheckpointListener, CheckpointedAsynchronously<HashMap<KinesisStreamShard, String>>, ResultTypeQueryable<T> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisConsumer.class); + + //
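Per the interface implemented above, the state the consumer checkpoints is a map from each assigned shard to the sequence number of the last record it emitted. A minimal, dependency-free sketch of that bookkeeping (the plain `String` shard ids and sequence numbers here are simplified stand-ins, not the connector's actual `KinesisStreamShard` model class):

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the consumer's checkpoint bookkeeping:
// each shard id maps to the sequence number of the last emitted record.
class SequenceNumberState {
    private final HashMap<String, String> lastSeqNums = new HashMap<>();

    // Called after a record from the given shard has been emitted downstream.
    void update(String shardId, String seqNum) {
        lastSeqNums.put(shardId, seqNum);
    }

    // Checkpoint snapshot: an isolated copy, so that updates made by the
    // fetcher after the snapshot do not mutate already-checkpointed state.
    Map<String, String> snapshot() {
        return new HashMap<>(lastSeqNums);
    }
}
```

On restore, a map like this would be handed back to the fetcher so each shard resumes from its recorded sequence number rather than from TRIM_HORIZON or LATEST.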
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15253839#comment-15253839 ] ASF GitHub Bot commented on FLINK-3229: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60731207 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/KinesisConfigConstants.java --- @@ -0,0 +1,51 @@ +package org.apache.flink.streaming.connectors.kinesis.config; + +/** + * + */ +public class KinesisConfigConstants { + + // + // Configuration Keys + // + + /** The max retries to retrieve metadata from a Kinesis stream using describeStream API +* (Note: describeStream attempts may be temporarily blocked due to AWS capping 5 attempts per sec) */ + public static final String CONFIG_STREAM_DESCRIBE_RETRIES = "flink.stream.describe.retry"; + + /** The backoff time between each describeStream attempt */ + public static final String CONFIG_STREAM_DESCRIBE_BACKOFF = "flink.stream.describe.backoff"; + + /** The initial position to start reading Kinesis streams from (LATEST is used if not set) */ + public static final String CONFIG_STREAM_INIT_POSITION_TYPE = "flink.stream.initpos.type"; + + /** The credential provider type to use when AWS credentials are required (BASIC is used if not set)*/ + public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE = "aws.credentials.provider"; + + /** The AWS access key ID to use when setting credentials provider type to BASIC */ + public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID = "aws.credentials.provider.basic.accesskeyid"; + + /** The AWS secret key to use when setting credentials provider type to BASIC */ + public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY = "aws.credentials.provider.basic.secretkey"; + + /** Optional configuration for profile path if credential provider type is set to 
be PROFILE */ + public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_PROFILE_PATH = "aws.credentials.provider.profile.path"; + + /** Optional configuration for profile name if credential provider type is set to be PROFILE */ + public static final String CONFIG_AWS_CREDENTIALS_PROVIDER_PROFILE_NAME = "aws.credentials.provider.profile.name"; + + /** The AWS region of the Kinesis streams to be pulled ("us-east-1" is used if not set) */ + public static final String CONFIG_AWS_REGION = "aws.region"; + + + // + // Default configuration values + // + + public static final String DEFAULT_AWS_REGION = "us-east-1"; --- End diff -- If we make the region a required argument, we won't need this anymore.
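A minimal sketch of the validation the reviewer suggests here (making the region a required setting instead of defaulting to `us-east-1`). This is an illustration of the proposal, not the PR's actual code; the `KinesisConfigCheck` class and `requireRegion` method are hypothetical names, and only the `"aws.region"` key string is taken from the diff above:

```java
import java.util.Properties;

class KinesisConfigCheck {
    // Key string as defined by CONFIG_AWS_REGION in the diff above.
    static final String CONFIG_AWS_REGION = "aws.region";

    // Fail fast at consumer construction time if the (proposed-required)
    // region key is absent, instead of silently falling back to a default.
    static String requireRegion(Properties config) {
        String region = config.getProperty(CONFIG_AWS_REGION);
        if (region == null || region.isEmpty()) {
            throw new IllegalArgumentException(
                "Missing required property: " + CONFIG_AWS_REGION);
        }
        return region;
    }
}
```

Failing fast here surfaces a misconfiguration at job submission rather than as a confusing AWS-side error at runtime, which is the problem the reviewer ran into below.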
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15253836#comment-15253836 ] ASF GitHub Bot commented on FLINK-3229: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60731025 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java --- @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis.proxy; + +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.model.DescribeStreamRequest; +import com.amazonaws.services.kinesis.model.DescribeStreamResult; +import com.amazonaws.services.kinesis.model.GetRecordsRequest; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; +import com.amazonaws.services.kinesis.model.LimitExceededException; +import com.amazonaws.services.kinesis.model.ResourceNotFoundException; +import com.amazonaws.services.kinesis.model.StreamStatus; +import com.amazonaws.services.kinesis.model.Shard; +import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A utility class that is used as a proxy to make calls to AWS Kinesis + * for several functions, such as getting a list of shards and fetching + * a batch of data records starting from a specified record sequence number. 
+ */ +public class KinesisProxy { + + private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class); + + /** The actual Kinesis client from the AWS SDK that we will be using to make calls */ + private final AmazonKinesisClient kinesisClient; + + /** The AWS region that this proxy will be making calls to */ + private final String regionId; + + /** Configuration properties of this Flink Kinesis Connector */ + private final Properties configProps; + + /** +* Create a new KinesisProxy based on the supplied configuration properties +* +* @param configProps configuration properties containing AWS credential and AWS region info +*/ + public KinesisProxy(Properties configProps) { + this.configProps = checkNotNull(configProps); + + this.regionId = configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION, KinesisConfigConstants.DEFAULT_AWS_REGION); + AmazonKinesisClient client = new AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps).getCredentials()); + client.setRegion(Region.getRegion(Regions.fromName(this.regionId))); + + this.kinesisClient = client; + } + + /** +* Get the next batch of data records using a specific shard iterator +* +* @param shardIterator a shard iterator that encodes info about which shard to read and where to start reading +* @param maxRecordsToGet the maximum amount of records to retrieve for this batch +* @return the batch of retrieved records +*/ + public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) { + final GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); + getRecordsRequest.setShardIterator(shardIterator); +
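The `CONFIG_STREAM_DESCRIBE_RETRIES` and `CONFIG_STREAM_DESCRIBE_BACKOFF` keys discussed earlier configure a retry loop around the throttled `describeStream` call (AWS caps it at 5 calls per second). A dependency-free sketch of that loop, with the Kinesis call replaced by a generic `Callable` and the AWS `LimitExceededException` by a plain exception; the exact retry semantics of the PR may differ:

```java
import java.util.concurrent.Callable;

class DescribeRetry {
    // Retry a throttled call up to maxRetries times after the initial
    // attempt, sleeping backoffMillis between attempts -- roughly what the
    // describe-retry and describe-backoff config keys control.
    static <T> T callWithRetries(Callable<T> call, int maxRetries, long backoffMillis) {
        Exception lastError = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {  // e.g. LimitExceededException from the AWS SDK
                lastError = e;
                try {
                    Thread.sleep(backoffMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        throw new RuntimeException("all describeStream attempts failed", lastError);
    }
}
```

A fixed backoff is the simplest choice; an exponential backoff with jitter is the usual refinement when many parallel subtasks hit the same rate-limited API.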
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15253836#comment-15253836 ] ASF GitHub Bot commented on FLINK-3229: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60730891 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/KinesisConfigConstants.java --- @@ -0,0 +1,51 @@ +package org.apache.flink.streaming.connectors.kinesis.config; --- End diff -- The maven build is failing because this file is missing a license header
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15253831#comment-15253831 ] ASF GitHub Bot commented on FLINK-3229: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60730773 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis.internals; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A Kinesis Data Fetcher that consumes data from a specific set of Kinesis shards. + * The fetcher spawns a single thread for connection to each shard. 
+ */ +public class KinesisDataFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class); + + /** Config properties for the Flink Kinesis Consumer */ + private final Properties configProps; + + /** The AWS credentials provider as specified in config properties */ + private final AWSCredentialsProvider credentials; + + /** The name of the consumer task that this fetcher was instantiated for */ + private final String taskName; + + /** Information of the shards that this fetcher handles, along with the sequence numbers that they should start from */ + private HashMap<KinesisStreamShard, String> assignedShardsWithStartingSequenceNum; + + /** Reference to the thread that executed run() */ + private volatile Thread mainThread; + + /** Reference to the first error thrown by any of the spawned shard connection threads */ + private final AtomicReference<Throwable> error; + + private volatile boolean running = true; + + /** +* Creates a new Kinesis Data Fetcher for the specified set of shards +* +* @param assignedShards the shards that this fetcher will pull data from +* @param configProps the configuration properties of this Flink Kinesis Consumer +* @param taskName the task name of this consumer task +*/ + public KinesisDataFetcher(List<KinesisStreamShard> assignedShards, Properties configProps, String taskName) { + this.configProps = checkNotNull(configProps); + this.credentials = AWSUtil.getCredentialsProvider(configProps); + this.assignedShardsWithStartingSequenceNum = new HashMap<>(); + for (KinesisStreamShard shard : assignedShards) { + assignedShardsWithStartingSequenceNum.put(shard, SentinelSequenceNumber.SENTINEL_SEQUENCE_NUMBER_NOT_SET.toString()); + } + this.taskName = taskName; + this.error = new AtomicReference<>(); + } + + /** +* Advance a
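The excerpt does not show how the discovered shards are divided among the parallel consumer subtasks before each subtask's fetcher is constructed. A common scheme for a parallel source (an assumption here, not necessarily this PR's exact logic) is a modulo assignment over the shard list:

```java
import java.util.ArrayList;
import java.util.List;

class ShardAssigner {
    // Assign shard i to subtask (i % parallelism): each shard is owned by
    // exactly one subtask, and shards are spread evenly across subtasks.
    static List<String> shardsForSubtask(List<String> allShards, int subtaskIndex, int parallelism) {
        List<String> assigned = new ArrayList<>();
        for (int i = 0; i < allShards.size(); i++) {
            if (i % parallelism == subtaskIndex) {
                assigned.add(allShards.get(i));
            }
        }
        return assigned;
    }
}
```

For this to work, every subtask must see the shard list in the same order; this static scheme is also why the NOTE above flags resharding -- shards created after startup would never be assigned.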
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15253779#comment-15253779 ] ASF GitHub Bot commented on FLINK-3229: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60726049 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java --- @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis.proxy; + +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.model.DescribeStreamRequest; +import com.amazonaws.services.kinesis.model.DescribeStreamResult; +import com.amazonaws.services.kinesis.model.GetRecordsRequest; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; +import com.amazonaws.services.kinesis.model.LimitExceededException; +import com.amazonaws.services.kinesis.model.ResourceNotFoundException; +import com.amazonaws.services.kinesis.model.StreamStatus; +import com.amazonaws.services.kinesis.model.Shard; +import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A utility class that is used as a proxy to make calls to AWS Kinesis + * for several functions, such as getting a list of shards and fetching + * a batch of data records starting from a specified record sequence number. 
+ */ +public class KinesisProxy { + + private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class); + + /** The actual Kinesis client from the AWS SDK that we will be using to make calls */ + private final AmazonKinesisClient kinesisClient; + + /** The AWS region that this proxy will be making calls to */ + private final String regionId; + + /** Configuration properties of this Flink Kinesis Connector */ + private final Properties configProps; + + /** +* Create a new KinesisProxy based on the supplied configuration properties +* +* @param configProps configuration properties containing AWS credential and AWS region info +*/ + public KinesisProxy(Properties configProps) { + this.configProps = checkNotNull(configProps); + + this.regionId = configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION, KinesisConfigConstants.DEFAULT_AWS_REGION); + AmazonKinesisClient client = new AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps).getCredentials()); + client.setRegion(Region.getRegion(Regions.fromName(this.regionId))); --- End diff -- I found out what I was doing wrong. The code was using the default region ID because I forgot to set it. I'm currently fixing some issues in the consumer and I'll make the region a required argument.
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15251355#comment-15251355 ] ASF GitHub Bot commented on FLINK-3229: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60531075 --- Diff: flink-streaming-connectors/pom.xml --- @@ -45,6 +45,7 @@ under the License. flink-connector-rabbitmq flink-connector-twitter flink-connector-nifi + flink-connector-kinesis --- End diff -- Thanks, I missed the "include-kinesis" profile defined below. We'll probably need a more general profile name in the future though (ex. include-aws-connectors), for example when we start including more Amazon licensed libraries for other connectors such as for DynamoDB
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15250214#comment-15250214 ] ASF GitHub Bot commented on FLINK-3229: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60443897 --- Diff: flink-streaming-connectors/pom.xml --- @@ -45,6 +45,7 @@ under the License. flink-connector-rabbitmq flink-connector-twitter flink-connector-nifi + flink-connector-kinesis --- End diff -- I think we have to remove this line again. The module is included in the profile below (you have to activate the "include-kinesis" maven build profile)
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15250211#comment-15250211 ] ASF GitHub Bot commented on FLINK-3229: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1911#issuecomment-212504465 Great, thank you. I'll review the PR soon.
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249627#comment-15249627 ] ASF GitHub Bot commented on FLINK-3229: --- Github user tzulitai commented on the pull request: https://github.com/apache/flink/pull/1911#issuecomment-212376531 Quick update: 1. Rebased and integrated the consumer code into the maven module that came with the producer merge. 2. Appended documentation for the consumer. 3. Moved the producer's KinesisSerializationSchema into org.apache.flink.streaming.connectors.kinesis.serialization package, where I originally placed deserialization related classes for the consumer.
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249537#comment-15249537 ] ASF GitHub Bot commented on FLINK-3229: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1911#issuecomment-212341465 The problem was that the GitHub mirror needed some time to sync with the commit. But now it's there.
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249532#comment-15249532 ] ASF GitHub Bot commented on FLINK-3229: --- Github user tzulitai commented on the pull request: https://github.com/apache/flink/pull/1911#issuecomment-212340127 ah ok :) see it now, thanks.
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249503#comment-15249503 ] ASF GitHub Bot commented on FLINK-3229: --- Github user zentol commented on the pull request: https://github.com/apache/flink/pull/1911#issuecomment-212333629 @tzulitai The producer wasn't merged yet.
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249493#comment-15249493 ] ASF GitHub Bot commented on FLINK-3229: --- Github user tzulitai commented on the pull request: https://github.com/apache/flink/pull/1911#issuecomment-212330070 @rmetzger Hi Robert, I'm rebasing my PR, but I could not find the merged Kinesis producer / maven module in the current apache/flink master. Please correct me if I'm doing anything wrong. Thanks :)
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249454#comment-15249454 ] ASF GitHub Bot commented on FLINK-3229: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1911#issuecomment-212310743 Quick update on our plan: I've merged the Kinesis producer. If you want, you can rebase this pull request on the current master.
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249452#comment-15249452 ] ASF GitHub Bot commented on FLINK-3229: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60364261
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
+import com.amazonaws.services.kinesis.model.StreamStatus;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A utility class that is used as a proxy to make calls to AWS Kinesis
+ * for several functions, such as getting a list of shards and fetching
+ * a batch of data records starting from a specified record sequence number.
+ */
+public class KinesisProxy {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class);
+
+	/** The actual Kinesis client from the AWS SDK that we will be using to make calls */
+	private final AmazonKinesisClient kinesisClient;
+
+	/** The AWS region that this proxy will be making calls to */
+	private final String regionId;
+
+	/** Configuration properties of this Flink Kinesis Connector */
+	private final Properties configProps;
+
+	/**
+	 * Create a new KinesisProxy based on the supplied configuration properties
+	 *
+	 * @param configProps configuration properties containing AWS credential and AWS region info
+	 */
+	public KinesisProxy(Properties configProps) {
+		this.configProps = checkNotNull(configProps);
+
+		this.regionId = configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION, KinesisConfigConstants.DEFAULT_AWS_REGION);
+		AmazonKinesisClient client = new AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps).getCredentials());
+		client.setRegion(Region.getRegion(Regions.fromName(this.regionId)));
--- End diff --
I thought so too, but it didn't work for me. I'll investigate the issue further ...
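The region-versus-endpoint question in this exchange comes down to how a client resolves its HTTP endpoint from a region id. As a rough illustration only (this is not the AWS SDK's actual resolution code, which also handles non-public partitions), the regional Kinesis endpoint conventionally follows a fixed hostname pattern:

```java
// Illustrative sketch: the conventional public-partition endpoint pattern
// "kinesis.<region>.amazonaws.com". The class and method are hypothetical,
// written only to show what setting a region should resolve to.
public final class KinesisEndpointSketch {

    /** Builds the conventional public-partition Kinesis endpoint for a region id. */
    public static String endpointFor(String regionId) {
        return "kinesis." + regionId + ".amazonaws.com";
    }

    public static void main(String[] args) {
        // The non-default region discussed in the thread:
        System.out.println(endpointFor("ap-northeast-1"));
    }
}
```

If `setRegion` did not take effect, explicitly setting an endpoint of this shape on the client would be the workaround being discussed.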
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249208#comment-15249208 ] ASF GitHub Bot commented on FLINK-3229: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60346774
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java (same hunk as above) ---
--- End diff --
I'm using the "ap-northeast-1" region, which isn't the default. Setting the region on the AmazonKinesisClient should set the endpoint too, no?
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247784#comment-15247784 ] ASF GitHub Bot commented on FLINK-3229: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60235061
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java (same hunk as above) ---
--- End diff --
I had to set the endpoint here as well to make it work. Which AWS region were you using?
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247512#comment-15247512 ] ASF GitHub Bot commented on FLINK-3229: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1911#issuecomment-211847420 Cool. You don't need to resubmit a new PR. By pushing new commits to your `FLINK-3229` branch, the pull request will update automatically.
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247509#comment-15247509 ] ASF GitHub Bot commented on FLINK-3229: --- Github user tzulitai commented on the pull request: https://github.com/apache/flink/pull/1911#issuecomment-211847140 @rmetzger Sure, that seems reasonable. I'll wait until the producer is merged and resubmit a new PR for the integrated consumer.
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247501#comment-15247501 ] ASF GitHub Bot commented on FLINK-3229: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1911#issuecomment-211846349 Thank you for opening a pull request for the consumer. How about we proceed like this:
- I'm trying to get the producer code merged within the next 24 hours (feel free to test it a bit if you want)
- In the meantime, I'm testing and reviewing your code
- Once the producer has been merged, we integrate the consumer code into the maven module / code structure from my producer code
- I'll review the consumer again and we merge it ;)
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247492#comment-15247492 ] ASF GitHub Bot commented on FLINK-3229: --- GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/1911 [FLINK-3229] Flink streaming consumer for AWS Kinesis
I've been using this consumer for a while in off-production environments. I understand we should have good test coverage for each PR, but since Kinesis is a hosted service, reliable integration tests are hard to pull off. To speed up merging the Kinesis connector for the next release, I'm submitting the consumer now for some early reviews. On the other hand, since @rmetzger is submitting a separate PR for the Kinesis producer, I'd like to postpone writing more tests for the consumer, as well as the corresponding documentation changes, until both the consumer and producer are in place.
You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/tzulitai/flink FLINK-3229
Alternatively you can review and apply these changes as the patch at:
    https://github.com/apache/flink/pull/1911.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
    This closes #1911
commit 0e7c4eccca132e0fcf35262b36229e681c77530e
Author: Gordon Tai
Date: 2016-04-16T03:33:17Z
[FLINK-3229] Initial working version for FlinkKinesisConsumer.
commit 6d46a6d09c346e490053a2f6319616a5178dab4e
Author: Gordon Tai
Date: 2016-04-17T09:34:55Z
[FLINK-3229] Change access level of `assignShards` and `validatePropertiesConfig` to protected for testing purposes
commit c92b491fce8b3c35b409bc6f308d25ce52835027
Author: Gordon Tai
Date: 2016-04-17T11:12:00Z
[FLINK-3229] Fix coding style violations regarding leading spaces in indentations
commit bc9f771c0f36618ce07772cfdefe7d87a35800fb
Author: Gordon Tai
Date: 2016-04-19T09:56:57Z
[FLINK-3229] Change scope of flink-streaming-java module to provided
commit fc454efaac5dc7969ad4834b892f26799cfe5a33
Author: Gordon Tai
Date: 2016-04-19T09:58:46Z
[FLINK-3229] Basic unit test for stable shard-to-consumer assignment
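The last commit above mentions a unit test for stable shard-to-consumer assignment. One common way to make such an assignment deterministic is a simple modulo rule over a stably ordered shard list; the sketch below illustrates that idea only, under hypothetical names, and is not necessarily what the PR implements:

```java
// Hypothetical sketch of a stable shard-to-subtask assignment: each shard
// index maps to exactly one parallel subtask, and the mapping stays the
// same across restarts as long as the shard ordering is stable.
public final class ShardAssignmentSketch {

    /** Returns true iff the shard at shardIndex belongs to the given subtask. */
    public static boolean isAssignedTo(int shardIndex, int subtaskIndex, int parallelism) {
        return shardIndex % parallelism == subtaskIndex;
    }

    public static void main(String[] args) {
        // With parallelism 3, shards 0..5 are spread round-robin over subtasks 0..2.
        for (int shard = 0; shard < 6; shard++) {
            for (int subtask = 0; subtask < 3; subtask++) {
                if (isAssignedTo(shard, subtask, 3)) {
                    System.out.println("shard " + shard + " -> subtask " + subtask);
                }
            }
        }
    }
}
```

The property a unit test would pin down is that every shard is claimed by exactly one subtask for a given parallelism.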
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15243991#comment-15243991 ] Tzu-Li (Gordon) Tai commented on FLINK-3229: (Duplicate comment from FLINK-3211. Posting it here also to keep the issue updated.) https://github.com/tzulitai/flink/tree/FLINK-3229/flink-streaming-connectors/flink-connector-kinesis Here is the initial working version of FlinkKinesisConsumer that I have been testing in off-production environments, updated to match the recent Flink 1.0 changes. I'm still refactoring the code just a bit for easier unit tests. The PR will follow very soon.
> Kinesis streaming consumer with integration of Flink's checkpointing mechanics
> --
>
> Key: FLINK-3229
> URL: https://issues.apache.org/jira/browse/FLINK-3229
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming Connectors
> Affects Versions: 1.0.0
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
>
> Opening a sub-task to implement the data source consumer for the Kinesis streaming
> connector (https://issues.apache.org/jira/browse/FLINK-3211).
> An example of the planned user API for the Flink Kinesis Consumer:
> {code}
> Properties config = new Properties();
> config.put(FlinkKinesisConsumer.CONFIG_STREAM_DESCRIBE_RETRIES, "3");
> config.put(FlinkKinesisConsumer.CONFIG_STREAM_DESCRIBE_BACKOFF_MILLIS, "1000");
> config.put(FlinkKinesisConsumer.CONFIG_STREAM_START_POSITION_TYPE, "latest");
> config.put(FlinkKinesisConsumer.CONFIG_AWS_REGION, "us-east-1");
> AWSCredentialsProvider credentials = // credentials API in AWS SDK
> DataStream<String> kinesisRecords = env
>     .addSource(new FlinkKinesisConsumer<>(
>         listOfStreams, credentials, new SimpleStringSchema(), config
>     ));
> {code}
> Currently still considering which read start positions to support
> ("TRIM_HORIZON", "LATEST", "AT_SEQUENCE_NUMBER"). The discussions for this
> can be found in https://issues.apache.org/jira/browse/FLINK-3211.
[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15101887#comment-15101887 ] Stephan Ewen commented on FLINK-3229: - As per discussion in FLINK-3211, I think "TRIM_HORIZON" and "LATEST" are sufficient for now.
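A minimal sketch of how the two agreed-upon start positions could be modeled on the consumer side. The enum and method names here are hypothetical, written only to illustrate restricting the configurable value to the two positions discussed; the actual connector defines its own config constants:

```java
// Hypothetical sketch: limit the configurable start position to the two
// values agreed on in this thread, with case-insensitive parsing of the
// string taken from the consumer's Properties config.
public enum InitialPositionSketch {
    TRIM_HORIZON, // start from the oldest record Kinesis still retains
    LATEST;       // start from records arriving after the consumer starts

    public static InitialPositionSketch fromConfig(String value) {
        switch (value.trim().toUpperCase()) {
            case "TRIM_HORIZON":
                return TRIM_HORIZON;
            case "LATEST":
                return LATEST;
            default:
                throw new IllegalArgumentException(
                    "Unsupported start position: " + value);
        }
    }
}
```

Rejecting anything else (such as AT_SEQUENCE_NUMBER) keeps the configuration surface matched to what the consumer actually supports.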