[GitHub] [flink-connector-aws] darenwkt commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27

2023-04-25 Thread via GitHub


darenwkt commented on code in PR #49:
URL: https://github.com/apache/flink-connector-aws/pull/49#discussion_r1176766870


##
flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/examples/SourceFromKinesis.java:
##
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.examples;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.kinesis.source.KinesisStreamsSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+
+import java.util.Properties;
+
+/**
+ * An example application demonstrating how to use the {@link KinesisStreamsSource} to read from
+ * KDS.
+ */
+public class SourceFromKinesis {
+
+    public static void main(String[] args) throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10_000);
+
+        Properties consumerConfig = new Properties();
+        consumerConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

Review Comment:
   Question: Do we support `AWS_ENDPOINT` instead of `AWS_REGION` in FLIP-27?
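   For reference, the legacy connector accepts an endpoint override alongside the region in the consumer `Properties`. A minimal sketch of that configuration shape, under the assumption that the key strings `"aws.region"` and `"aws.endpoint"` behind `AWSConfigConstants` are unchanged in the FLIP-27 source (whether the endpoint key is honored there is exactly the open question):

```java
import java.util.Properties;

public class EndpointConfigSketch {
    // Key strings assumed to mirror AWSConfigConstants.AWS_REGION and
    // AWSConfigConstants.AWS_ENDPOINT from flink-connector-aws.
    static final String AWS_REGION = "aws.region";
    static final String AWS_ENDPOINT = "aws.endpoint";

    static Properties consumerConfig(String region, String endpoint) {
        Properties props = new Properties();
        props.setProperty(AWS_REGION, region);
        if (endpoint != null) {
            // Endpoint override, e.g. for LocalStack or a VPC interface endpoint.
            props.setProperty(AWS_ENDPOINT, endpoint);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerConfig("us-east-1", "https://localhost:4566");
        System.out.println(props.getProperty(AWS_ENDPOINT));
    }
}
```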



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-aws] darenwkt commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27

2023-04-25 Thread via GitHub


darenwkt commented on code in PR #49:
URL: https://github.com/apache/flink-connector-aws/pull/49#discussion_r1176764441


##
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReader.java:
##


Review Comment:
   We should consider adding a test for this class.
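   To make the suggestion concrete, a sketch of the shape such a unit test might take, polling a reader and asserting on the fetched batches. All names here are hypothetical stand-ins; the real `PollingKinesisShardSplitReader` API is not shown in this diff, so a stub stands in for it:

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.List;
import java.util.Queue;

public class PollingSplitReaderTestSketch {

    /** Minimal stub that hands out pre-queued record batches, one per poll. */
    static class StubPollingReader {
        private final Queue<List<String>> batches = new ArrayDeque<>();

        void enqueueBatch(List<String> batch) {
            batches.add(batch);
        }

        List<String> fetch() {
            List<String> next = batches.poll();
            return next == null ? Collections.emptyList() : next;
        }
    }

    public static void main(String[] args) {
        StubPollingReader reader = new StubPollingReader();
        reader.enqueueBatch(List.of("record-1", "record-2"));

        // A real test would make the same two assertions against
        // PollingKinesisShardSplitReader backed by a mocked Kinesis client:
        // records come back in order, and an exhausted shard yields an empty fetch.
        if (!reader.fetch().equals(List.of("record-1", "record-2"))) throw new AssertionError();
        if (!reader.fetch().isEmpty()) throw new AssertionError();
        System.out.println("ok");
    }
}
```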






[GitHub] [flink-connector-aws] darenwkt commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27

2023-04-25 Thread via GitHub


darenwkt commented on code in PR #49:
URL: https://github.com/apache/flink-connector-aws/pull/49#discussion_r1176498371


##
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/SourceConfigConstants.java:
##
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.kinesis.source.reader.PollingKinesisShardSplitReader;
+
+import java.time.Duration;
+
+@PublicEvolving
+public class SourceConfigConstants extends AWSConfigConstants {
+    public enum InitialPosition {
+        LATEST,
+        TRIM_HORIZON,
+        AT_TIMESTAMP
+    }
+
+    /** The record publisher type represents the record-consume style. */
+    public enum RecordPublisherType {
+
+        /** Consume the Kinesis records using AWS SDK v2 with the enhanced fan-out consumer. */
+        EFO,
+        /** Consume the Kinesis records using AWS SDK v1 with the get-records method. */
+        POLLING
+    }
+
+    /** The EFO registration type represents how the EFO consumer is de-/registered. */
+    public enum EFORegistrationType {
+
+        /**
+         * Delay the registration of the EFO consumer, for the taskmanager to execute. De-register
+         * the EFO consumer on the taskmanager when the task is shut down.
+         */
+        LAZY,
+        /**
+         * Register the EFO consumer eagerly, for the jobmanager to execute. De-register the EFO
+         * consumer the same way as LAZY does.
+         */
+        EAGER,
+        /** Do not register the EFO consumer programmatically. Do not de-register either. */
+        NONE
+    }
+
+    /** The RecordPublisher type (EFO|POLLING). */
+    public static final String RECORD_PUBLISHER_TYPE = "flink.stream.recordpublisher";
+
+    public static final String DEFAULT_RECORD_PUBLISHER_TYPE =
+            RecordPublisherType.POLLING.toString();
+
+    /** Determine how and when consumer de-/registration is performed (LAZY|EAGER|NONE). */
+    public static final String EFO_REGISTRATION_TYPE = "flink.stream.efo.registration";
+
+    public static final String DEFAULT_EFO_REGISTRATION_TYPE = EFORegistrationType.EAGER.toString();
+
+    /** The name of the EFO consumer to register with KDS. */
+    public static final String EFO_CONSUMER_NAME = "flink.stream.efo.consumername";
+
+    /** The prefix of consumer ARN for a given stream. */
+    public static final String EFO_CONSUMER_ARN_PREFIX = "flink.stream.efo.consumerarn";
+
+    /** The initial position to start reading Kinesis streams from. */
+    public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";
+
+    public static final String DEFAULT_STREAM_INITIAL_POSITION = InitialPosition.LATEST.toString();
+
+    /**
+     * The initial timestamp to start reading the Kinesis stream from (when AT_TIMESTAMP is set for
+     * STREAM_INITIAL_POSITION).
+     */
+    public static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.initpos.timestamp";
+
+    /**
+     * The date format of the initial timestamp to start reading the Kinesis stream from (when
+     * AT_TIMESTAMP is set for STREAM_INITIAL_POSITION).
+     */
+    public static final String STREAM_TIMESTAMP_DATE_FORMAT =
+            "flink.stream.initpos.timestamp.format";
+
+    public static final String DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT =
+            "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
+
+    /** The maximum number of describeStream attempts if we get a recoverable exception. */
+    public static final String STREAM_DESCRIBE_RETRIES = "flink.stream.describe.maxretries";
+
+    public static final int DEFAULT_STREAM_DESCRIBE_RETRIES = 50;
+
+    /** The base backoff time between each describeStream attempt. */
+    public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
+
+    public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE = 2000L;
+
+    /** The maximum backoff time between each describeStream attempt. */
+    public static final String
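Putting the initial-position keys from this diff together, a hedged sketch of configuring an AT_TIMESTAMP start. The literal key strings mirror `STREAM_INITIAL_POSITION`, `STREAM_INITIAL_TIMESTAMP`, and `STREAM_TIMESTAMP_DATE_FORMAT` above, and the pattern is the documented default (`yyyy-MM-dd'T'HH:mm:ss.SSSXXX`); how the new source ultimately consumes these properties is not shown here:

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

public class InitialPositionSketch {

    /** Parses the configured start timestamp using the configured (or default) pattern. */
    static Date parseStartTimestamp(Properties config) {
        String format = config.getProperty(
                "flink.stream.initpos.timestamp.format", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
        try {
            return new SimpleDateFormat(format).parse(
                    config.getProperty("flink.stream.initpos.timestamp"));
        } catch (ParseException e) {
            throw new IllegalArgumentException("Invalid initial timestamp", e);
        }
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("flink.stream.initpos", "AT_TIMESTAMP");
        config.setProperty("flink.stream.initpos.timestamp", "2023-04-25T00:00:00.000Z");
        System.out.println(parseStartTimestamp(config).toInstant());
    }
}
```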

[GitHub] [flink-connector-aws] darenwkt commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27

2023-04-25 Thread via GitHub


darenwkt commented on code in PR #49:
URL: https://github.com/apache/flink-connector-aws/pull/49#discussion_r1176497702


##
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/SourceConfigConstants.java:
##