[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...

2017-01-20 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3078


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...

2017-01-15 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3078#discussion_r96145784
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.AmazonServiceException.ErrorType;
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+
+/**
+ * Test for methods in the KinesisProxy class.
--- End diff --

This should reference `KinesisProxy` with a `{@link KinesisProxy}` tag, like the other Javadocs.




[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...

2017-01-15 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3078#discussion_r96145735
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -253,6 +262,35 @@ public String getShardIterator(KinesisStreamShard shard, String shardIteratorTyp
return getShardIteratorResult.getShardIterator();
}
 
+   /**
+* Determines whether the exception can be recovered from using
+* exponential-backoff
+* 
+* @param ex
+*Exception to inspect
--- End diff --

nit: I think the Javadoc formatting here is inconsistent with the other
methods (line breaks).




[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...

2017-01-15 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3078#discussion_r96145771
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.AmazonServiceException.ErrorType;
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+
+/**
+ * Test for methods in the KinesisProxy class.
+ * 
--- End diff --

Extra empty line




[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...

2017-01-15 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3078#discussion_r96145754
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -253,6 +262,35 @@ public String getShardIterator(KinesisStreamShard shard, String shardIteratorTyp
return getShardIteratorResult.getShardIterator();
}
 
+   /**
+* Determines whether the exception can be recovered from using
+* exponential-backoff
+* 
+* @param ex
+*Exception to inspect
+* @return true if the exception can be recovered from, else
+* false
+*/
+   protected static boolean isRecoverableException(AmazonServiceException ex) {
+   if (ex.getErrorType() == null) {
+   return false;
+   }
+
+   switch (ex.getErrorType()) {
--- End diff --

The indentation for the cases here seems to be missing.




[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...

2017-01-15 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3078#discussion_r96145761
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -253,6 +262,35 @@ public String getShardIterator(KinesisStreamShard shard, String shardIteratorTyp
return getShardIteratorResult.getShardIterator();
}
 
+   /**
+* Determines whether the exception can be recovered from using
+* exponential-backoff
+* 
+* @param ex
+*Exception to inspect
+* @return true if the exception can be recovered from, else
+* false
+*/
+   protected static boolean isRecoverableException(AmazonServiceException ex) {
+   if (ex.getErrorType() == null) {
+   return false;
+   }
+
+   switch (ex.getErrorType()) {
+   case Client:
+   if (ex instanceof ProvisionedThroughputExceededException) {
--- End diff --

It'll probably be cleaner to just do
`ex instanceof ProvisionedThroughputExceededException`.
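
One possible reading of this suggestion, as a rough sketch only. The types below are stand-ins so the snippet compiles on its own: `ServiceException` plays the role of `AmazonServiceException`, `ThroughputExceededException` plays the role of `ProvisionedThroughputExceededException`, and the handling of the `Service` and `Unknown` cases is an assumption, since that part of the diff is not quoted above.

```java
/** Illustration only; the stand-in types below are hypothetical, not the AWS SDK classes. */
final class RecoverableSketch {

    /** Stand-in for AmazonServiceException.ErrorType. */
    enum ErrorType { Client, Service, Unknown }

    /** Stand-in for AmazonServiceException. */
    static class ServiceException extends RuntimeException {
        private final ErrorType errorType;

        ServiceException(ErrorType errorType) {
            this.errorType = errorType;
        }

        ErrorType getErrorType() {
            return errorType;
        }
    }

    /** Stand-in for ProvisionedThroughputExceededException (a client-side error). */
    static class ThroughputExceededException extends ServiceException {
        ThroughputExceededException() {
            super(ErrorType.Client);
        }
    }

    static boolean isRecoverableException(ServiceException ex) {
        if (ex.getErrorType() == null) {
            return false;
        }

        switch (ex.getErrorType()) {
            case Client:
                // Return the instanceof result directly instead of branching on it.
                return ex instanceof ThroughputExceededException;
            case Service:
            case Unknown:
                // Assumption: service-side and unknown errors are worth retrying.
                return true;
            default:
                return false;
        }
    }
}
```

Returning the `instanceof` result directly avoids an `if`/`else` that only maps a boolean onto `true` and `false`.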




[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...

2017-01-15 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3078#discussion_r96145793
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import static org.junit.Assert.*;
--- End diff --

In Flink we generally try to avoid asterisk imports.
The style check doesn't actually cover the test scope, but it would be good to
stay consistent with the style rules in tests as well.




[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...

2017-01-06 Thread skidder
GitHub user skidder opened a pull request:

https://github.com/apache/flink/pull/3078

[FLINK-5355] Handle AmazonKinesisException gracefully in Kinesis Streaming Connector

My Flink job that consumes from a Kinesis stream must be restarted at least
once daily due to an uncaught AmazonKinesisException when reading from Kinesis.
The complete stack trace looks like this:

```
com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: dc1b7a1a-1b97-1a32-8cd5-79a896a55223)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1545)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1183)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:964)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:676)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:650)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:633)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$300(AmazonHttpClient.java:601)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:583)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:447)
    at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1747)
    at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1723)
    at com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:858)
    at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:193)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:268)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:176)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
```
It's interesting that the Kinesis endpoint returned a 500 status code, but 
that's outside the scope of this issue.

I think we can handle this exception in the same manner as a
ProvisionedThroughputExceededException: perform an exponential backoff and
retry a finite number of times before throwing an exception.
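
To make the idea concrete, here is a minimal sketch of such a retry loop. It is illustrative only and is not the connector's actual code: `BackoffRetrySketch`, `callWithRetries`, `backoffMillis`, the growth factor of 1.5, and the delay values are all hypothetical, and the simulated failure stands in for a Kinesis call that returns a 500.

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;

/** Illustration only; names and constants are hypothetical, not the connector's API. */
final class BackoffRetrySketch {

    /** Delay before retry number `attempt` (0-based): base * power^attempt, capped at max. */
    static long backoffMillis(long baseMillis, long maxMillis, double power, int attempt) {
        double delay = baseMillis * Math.pow(power, attempt);
        return (long) Math.min(delay, (double) maxMillis);
    }

    /**
     * Runs `call` until it succeeds. Only failures accepted by `recoverable` are
     * retried, and at most `maxRetries` times; after that the last failure is rethrown.
     */
    static <T> T callWithRetries(Callable<T> call, Predicate<Exception> recoverable,
                                 int maxRetries, long baseMillis, long maxMillis) throws Exception {
        int attempt = 0;
        while (true) {
            try {
                return call.call();
            } catch (Exception ex) {
                if (!recoverable.test(ex) || attempt >= maxRetries) {
                    throw ex;
                }
                // Wait longer after each failed attempt before trying again.
                Thread.sleep(backoffMillis(baseMillis, maxMillis, 1.5, attempt));
                attempt++;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated call: fails twice with a "recoverable" error, then succeeds.
        String result = callWithRetries(() -> {
            if (calls[0]++ < 2) {
                throw new IllegalStateException("simulated HTTP 500");
            }
            return "records";
        }, ex -> ex instanceof IllegalStateException, 3, 10L, 100L);
        System.out.println(result + " after " + calls[0] + " calls");
    }
}
```

Capping the delay and the number of retries keeps a persistent outage from stalling the consumer indefinitely; once the retries are used up, the original exception still surfaces.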

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/skidder/flink skidder/flink-5355

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3078.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3078


commit 85adf13f62351675e39811b9cb58aa2ac9a9cd4d
Author: Scott Kidder 
Date:   2016-12-16T16:46:54Z

[FLINK-5355] Handle AmazonKinesisException gracefully in Kinesis Streaming Connector



