[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3078 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3078#discussion_r96145784

    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
    @@ -0,0 +1,63 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kinesis.proxy;
    +
    +import static org.junit.Assert.*;
    +
    +import org.junit.Test;
    +
    +import com.amazonaws.AmazonServiceException;
    +import com.amazonaws.AmazonServiceException.ErrorType;
    +import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
    +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
    +
    +/**
    + * Test for methods in the KinesisProxy class.
    --- End diff --

    The reference to `KinesisProxy` should be a Javadoc link (`{@link KinesisProxy}`), like in the other Javadocs.
[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3078#discussion_r96145735

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -253,6 +262,35 @@ public String getShardIterator(KinesisStreamShard shard, String shardIteratorTyp
             return getShardIteratorResult.getShardIterator();
         }

    +    /**
    +     * Determines whether the exception can be recovered from using
    +     * exponential-backoff
    +     *
    +     * @param ex
    +     *            Exception to inspect
    --- End diff --

    nit: I think the Javadoc formatting here is inconsistent with the other methods (note the line break after `@param ex`).
[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3078#discussion_r96145771

    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
    @@ -0,0 +1,63 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kinesis.proxy;
    +
    +import static org.junit.Assert.*;
    +
    +import org.junit.Test;
    +
    +import com.amazonaws.AmazonServiceException;
    +import com.amazonaws.AmazonServiceException.ErrorType;
    +import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
    +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
    +
    +/**
    + * Test for methods in the KinesisProxy class.
    + *
    --- End diff --

    Extra empty line.
[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3078#discussion_r96145754

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -253,6 +262,35 @@ public String getShardIterator(KinesisStreamShard shard, String shardIteratorTyp
             return getShardIteratorResult.getShardIterator();
         }

    +    /**
    +     * Determines whether the exception can be recovered from using
    +     * exponential-backoff
    +     *
    +     * @param ex
    +     *            Exception to inspect
    +     * @return true if the exception can be recovered from, else
    +     *         false
    +     */
    +    protected static boolean isRecoverableException(AmazonServiceException ex) {
    +        if (ex.getErrorType() == null) {
    +            return false;
    +        }
    +
    +        switch (ex.getErrorType()) {
    --- End diff --

    The indentation for the cases here seems to be missing.
[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3078#discussion_r96145761

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -253,6 +262,35 @@ public String getShardIterator(KinesisStreamShard shard, String shardIteratorTyp
             return getShardIteratorResult.getShardIterator();
         }

    +    /**
    +     * Determines whether the exception can be recovered from using
    +     * exponential-backoff
    +     *
    +     * @param ex
    +     *            Exception to inspect
    +     * @return true if the exception can be recovered from, else
    +     *         false
    +     */
    +    protected static boolean isRecoverableException(AmazonServiceException ex) {
    +        if (ex.getErrorType() == null) {
    +            return false;
    +        }
    +
    +        switch (ex.getErrorType()) {
    +        case Client:
    +            if (ex instanceof ProvisionedThroughputExceededException) {
    --- End diff --

    It'll probably be cleaner to just use `ex instanceof ProvisionedThroughputExceededException` directly instead of wrapping it in an `if`.
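
For illustration, here is a minimal sketch of what the simplified check might look like once both comments on this method are addressed (indented `case` labels, and the `instanceof` result returned directly). To keep it compilable without the AWS SDK, `ServiceException`, `ThroughputExceededException` and the local `ErrorType` enum are hypothetical stand-ins for `AmazonServiceException`, `ProvisionedThroughputExceededException` and `AmazonServiceException.ErrorType`. The quoted diff is cut off before the remaining branches, so the handling of `Service` and `Unknown` below is an assumption, not the code under review.

```java
// Hypothetical stand-in types so the sketch compiles without the AWS SDK.
class RecoverableCheckSketch {

    // Mirrors the constants of AmazonServiceException.ErrorType.
    enum ErrorType { Client, Service, Unknown }

    // Stand-in for AmazonServiceException.
    static class ServiceException extends RuntimeException {
        private final ErrorType errorType;

        ServiceException(String message, ErrorType errorType) {
            super(message);
            this.errorType = errorType;
        }

        ErrorType getErrorType() {
            return errorType;
        }
    }

    // Stand-in for ProvisionedThroughputExceededException (a client-side error).
    static class ThroughputExceededException extends ServiceException {
        ThroughputExceededException(String message) {
            super(message, ErrorType.Client);
        }
    }

    static boolean isRecoverableException(ServiceException ex) {
        if (ex.getErrorType() == null) {
            return false;
        }

        switch (ex.getErrorType()) {
            case Client:
                // Return the instanceof result directly instead of branching on it.
                return ex instanceof ThroughputExceededException;
            case Service:
            case Unknown:
                // Assumption: server-side and unclassified errors are worth retrying.
                return true;
            default:
                return false;
        }
    }
}
```

With this shape, a throttling error and a 500-style service error are both retried, while any other client error is not.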
[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3078#discussion_r96145793

    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
    @@ -0,0 +1,63 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kinesis.proxy;
    +
    +import static org.junit.Assert.*;
    --- End diff --

    In Flink we generally try to avoid asterisk imports. The style check doesn't actually cover the test scope, but it would be good to stay consistent with the style rules in tests as well.
[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...
GitHub user skidder opened a pull request:

    https://github.com/apache/flink/pull/3078

    [FLINK-5355] Handle AmazonKinesisException gracefully in Kinesis Streaming Connector

My Flink job that consumes from a Kinesis stream must be restarted at least once daily due to an uncaught AmazonKinesisException when reading from Kinesis. The complete stack trace looks like:

```
com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: dc1b7a1a-1b97-1a32-8cd5-79a896a55223)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1545)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1183)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:964)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:676)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:650)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:633)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$300(AmazonHttpClient.java:601)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:583)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:447)
	at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1747)
	at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1723)
	at com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:858)
	at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:193)
	at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:268)
	at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:176)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
```

It's interesting that the Kinesis endpoint returned a 500 status code, but that's outside the scope of this issue. I think we can handle this exception in the same manner as a `ProvisionedThroughputExceededException`: perform an exponential backoff and retry a finite number of times before throwing an exception.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/skidder/flink skidder/flink-5355

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3078.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3078

commit 85adf13f62351675e39811b9cb58aa2ac9a9cd4d
Author: Scott Kidder
Date:   2016-12-16T16:46:54Z

    [FLINK-5355] Handle AmazonKinesisException gracefully in Kinesis Streaming Connector
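
The retry behaviour proposed in the description (exponential backoff, a finite number of attempts, then rethrow) can be sketched roughly as follows. This is not the connector's code: `BackoffRetrySketch`, `callWithBackoff`, `backoffMillis` and all of their parameters are hypothetical names chosen for illustration, and the sketch leaves out jitter, which a production client would likely add.

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;

// Hypothetical helper; the names and parameters are illustrative only.
class BackoffRetrySketch {

    // Delay before the retry that follows the given zero-based attempt:
    // the base delay doubled once per earlier attempt, capped at maxMillis.
    static long backoffMillis(long baseMillis, long maxMillis, int attempt) {
        long delay = baseMillis;
        for (int i = 0; i < attempt && delay < maxMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMillis);
    }

    // Runs the call, retrying recoverable failures up to maxAttempts in total.
    // The last failure (or any non-recoverable one) is rethrown to the caller.
    static <T> T callWithBackoff(Callable<T> call,
                                 Predicate<Exception> isRecoverable,
                                 int maxAttempts,
                                 long baseMillis,
                                 long maxMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 0; ; attempt++) {
            try {
                return call.call();
            } catch (Exception ex) {
                boolean lastAttempt = attempt + 1 >= maxAttempts;
                if (lastAttempt || !isRecoverable.test(ex)) {
                    throw ex;
                }
                Thread.sleep(backoffMillis(baseMillis, maxMillis, attempt));
            }
        }
    }
}
```

In the connector, the callable would presumably wrap the `getRecords` call from the stack trace and the predicate would decide which service exceptions count as recoverable; that wiring is an assumption here and is not shown.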