[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...

2017-11-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5021


---


[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...

2017-11-15 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5021#discussion_r151188168
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
 ---
@@ -133,12 +131,11 @@ private void executeActionAsync(
operationFuture.whenCompleteAsync(
(t, throwable) -> {
if (throwable != null) {
-   if (throwable 
instanceof CancellationException) {
-   
result.completeExceptionally(throwable);
-   } else if 
(throwable.getCause() instanceof UnknownKvStateIdException ||
+   if (
+   
throwable.getCause() instanceof UnknownKvStateIdException ||
--- End diff --

The problem was the `UnknownKvStateLocation`, which is now removed. This is 
the one returned by the JobManager. Based on that answer, you then go to the 
appropriate TaskManager.
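
A rough sketch of the retry decision described here, not the actual handler 
code. The two exception classes are hypothetical stand-ins defined locally for 
illustration, and the real `KvStateClientProxyHandler` may retry on further 
causes that the diff excerpt does not show:

```java
import java.util.concurrent.CompletionException;

public class RetryConditionSketch {

    // Hypothetical stand-ins for the Flink exception types named in this thread.
    public static class UnknownKvStateIdException extends Exception {}
    public static class UnknownKvStateLocation extends Exception {}

    // Assumption: retry only when a refreshed lookup could help. A missing
    // location reported by the JobManager is treated as final, not retried.
    static boolean shouldRetry(Throwable throwable) {
        Throwable cause = throwable.getCause();
        return cause instanceof UnknownKvStateIdException;
    }

    public static void main(String[] args) {
        Throwable staleId = new CompletionException(new UnknownKvStateIdException());
        Throwable wrongName = new CompletionException(new UnknownKvStateLocation());
        System.out.println("stale id retried: " + shouldRetry(staleId));
        System.out.println("wrong name retried: " + shouldRetry(wrongName));
    }
}
```

With a condition of this shape, a query against a wrong state name fails once 
instead of looping.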


---


[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...

2017-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5021#discussion_r151187261
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
 ---
@@ -133,12 +131,11 @@ private void executeActionAsync(
operationFuture.whenCompleteAsync(
(t, throwable) -> {
if (throwable != null) {
-   if (throwable 
instanceof CancellationException) {
-   
result.completeExceptionally(throwable);
-   } else if 
(throwable.getCause() instanceof UnknownKvStateIdException ||
+   if (
+   
throwable.getCause() instanceof UnknownKvStateIdException ||
--- End diff --

Then I don't understand how this can fix the issue.

In the case of an `UnknownKvStateIdException`, the client still retries 
fetching the state.


---


[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...

2017-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5021#discussion_r151184658
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
 ---
@@ -1372,84 +1492,60 @@ public String fold(String accumulator, 
Tuple2 value) throws Excep
 
/   General Utility Methods 
//
 
-   private static  CompletableFuture 
getKvStateWithRetries(
+   private static  CompletableFuture getKvState(
final QueryableStateClient client,
final JobID jobId,
final String queryName,
final K key,
final TypeInformation keyTypeInfo,
final StateDescriptor stateDescriptor,
-   final Time retryDelay,
final boolean failForUnknownKeyOrNamespace,
-   final ScheduledExecutor executor) {
-   return retryWithDelay(
-   () -> client.getKvState(jobId, queryName, key, 
VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, 
stateDescriptor),
-   NO_OF_RETRIES,
-   retryDelay,
-   executor,
-   failForUnknownKeyOrNamespace);
-   }
-
-   private static  CompletableFuture retryWithDelay(
-   final Supplier operation,
-   final int retries,
-   final Time retryDelay,
-   final ScheduledExecutor scheduledExecutor,
-   final boolean failIfUnknownKeyOrNamespace) {
-
-   final CompletableFuture resultFuture = new 
CompletableFuture<>();
-
-   retryWithDelay(
-   resultFuture,
-   operation,
-   retries,
-   retryDelay,
-   scheduledExecutor,
-   failIfUnknownKeyOrNamespace);
+   final ScheduledExecutor executor) throws 
InterruptedException {
 
+   final CompletableFuture resultFuture = new 
CompletableFuture<>();
+   getKvStateIgnoringCertainExceptions(
+   resultFuture, client, jobId, queryName, key, 
keyTypeInfo,
+   stateDescriptor, failForUnknownKeyOrNamespace, 
executor);
return resultFuture;
}
 
-   public static  void retryWithDelay(
-   final CompletableFuture resultFuture,
-   final Supplier operation,
-   final int retries,
-   final Time retryDelay,
-   final ScheduledExecutor scheduledExecutor,
-   final boolean failIfUnknownKeyOrNamespace) {
+   private static  void 
getKvStateIgnoringCertainExceptions(
+   final CompletableFuture resultFuture,
+   final QueryableStateClient client,
+   final JobID jobId,
+   final String queryName,
+   final K key,
+   final TypeInformation keyTypeInfo,
+   final StateDescriptor stateDescriptor,
+   final boolean failForUnknownKeyOrNamespace,
+   final ScheduledExecutor executor) throws 
InterruptedException {
 
if (!resultFuture.isDone()) {
-   final CompletableFuture operationResultFuture = 
operation.get();
-   operationResultFuture.whenCompleteAsync(
-   (t, throwable) -> {
-   if (throwable != null) {
-   if 
(throwable.getCause() instanceof CancellationException) {
-   
resultFuture.completeExceptionally(new FutureUtils.RetryException("Operation 
future was cancelled.", throwable.getCause()));
-   } else if 
(throwable.getCause() instanceof AssertionError ||
-   
(failIfUnknownKeyOrNamespace && throwable.getCause() instanceof 
UnknownKeyOrNamespaceException)) {
-   
resultFuture.completeExceptionally(throwable.getCause());
-   } else {
- 

[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...

2017-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5021#discussion_r151184079
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
 ---
@@ -917,20 +1023,24 @@ public void processElement(Tuple2 
value, Context ctx, Collector

[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...

2017-11-15 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5021#discussion_r151184645
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
 ---
@@ -439,6 +443,85 @@ public Integer getKey(Tuple2 value) 
throws Exception {
}
 
/**
+* Tests that the correct exception is thrown if the query
+* contains a wrong queryable state name.
+*/
+   @Test
+   public void testWrongQueryableStateName() throws Exception {
+   // Config
+   final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+   final long numElements = 1024L;
+
+   JobID jobId = null;
+   try {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setStateBackend(stateBackend);
+   env.setParallelism(maxParallelism);
+   // Very important, because cluster is shared between 
tests and we
+   // don't explicitly check that all slots are available 
before
+   // submitting.
+   
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000L));
+
+   DataStream> source = env
+   .addSource(new 
TestAscendingValueSource(numElements));
+
+   // Value state
+   ValueStateDescriptor> valueState =
+   new ValueStateDescriptor<>("any", 
source.getType());
+
+   source.keyBy(new KeySelector, 
Integer>() {
+   private static final long serialVersionUID = 
7662520075515707428L;
+
+   @Override
+   public Integer getKey(Tuple2 
value) throws Exception {
+   return value.f0;
+   }
+   }).asQueryableState("hakuna", valueState);
+
+   // Submit the job graph
+   JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+   jobId = jobGraph.getJobID();
+
+   cluster.submitJobDetached(jobGraph);
+
+   // wait until the job is running before starting to 
query.
+   
FutureUtils.toJava(cluster.getLeaderGateway(deadline.timeLeft())
+   .ask(new 
TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.RUNNING), 
deadline.timeLeft())
+   
.mapTo(ClassTag$.MODULE$.apply(TestingJobManagerMessages.JobStatusIs.class)));
+
+   CompletableFuture>> 
future = client.getKvState(
+   jobId,
+   "wrong-hankuna", // this is the wrong 
name.
+   0,
+   VoidNamespace.INSTANCE,
+   BasicTypeInfo.INT_TYPE_INFO,
+   VoidNamespaceTypeInfo.INSTANCE,
+   valueState);
+
+   final CompletableFuture completion = new 
CompletableFuture<>();
+   future.whenComplete((result, throwable) -> {
--- End diff --

Yes, you are right.


---


[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...

2017-11-15 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5021#discussion_r151184384
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
 ---
@@ -133,12 +131,11 @@ private void executeActionAsync(
operationFuture.whenCompleteAsync(
(t, throwable) -> {
if (throwable != null) {
-   if (throwable 
instanceof CancellationException) {
-   
result.completeExceptionally(throwable);
-   } else if 
(throwable.getCause() instanceof UnknownKvStateIdException ||
+   if (
+   
throwable.getCause() instanceof UnknownKvStateIdException ||
--- End diff --

The `UnknownKvStateLocationException` is thrown when we do not have a server 
for a particular `keyGroup`.


---


[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...

2017-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5021#discussion_r151183892
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
 ---
@@ -439,6 +443,85 @@ public Integer getKey(Tuple2 value) 
throws Exception {
}
 
/**
+* Tests that the correct exception is thrown if the query
+* contains a wrong queryable state name.
+*/
+   @Test
+   public void testWrongQueryableStateName() throws Exception {
+   // Config
+   final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+   final long numElements = 1024L;
+
+   JobID jobId = null;
+   try {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setStateBackend(stateBackend);
+   env.setParallelism(maxParallelism);
+   // Very important, because cluster is shared between 
tests and we
+   // don't explicitly check that all slots are available 
before
+   // submitting.
+   
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000L));
+
+   DataStream> source = env
+   .addSource(new 
TestAscendingValueSource(numElements));
+
+   // Value state
+   ValueStateDescriptor> valueState =
+   new ValueStateDescriptor<>("any", 
source.getType());
+
+   source.keyBy(new KeySelector, 
Integer>() {
+   private static final long serialVersionUID = 
7662520075515707428L;
+
+   @Override
+   public Integer getKey(Tuple2 
value) throws Exception {
+   return value.f0;
+   }
+   }).asQueryableState("hakuna", valueState);
+
+   // Submit the job graph
+   JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+   jobId = jobGraph.getJobID();
+
+   cluster.submitJobDetached(jobGraph);
+
+   // wait until the job is running before starting to 
query.
+   
FutureUtils.toJava(cluster.getLeaderGateway(deadline.timeLeft())
+   .ask(new 
TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.RUNNING), 
deadline.timeLeft())
+   
.mapTo(ClassTag$.MODULE$.apply(TestingJobManagerMessages.JobStatusIs.class)));
+
+   CompletableFuture>> 
future = client.getKvState(
+   jobId,
+   "wrong-hankuna", // this is the wrong 
name.
+   0,
+   VoidNamespace.INSTANCE,
+   BasicTypeInfo.INT_TYPE_INFO,
+   VoidNamespaceTypeInfo.INSTANCE,
+   valueState);
+
+   final CompletableFuture completion = new 
CompletableFuture<>();
+   future.whenComplete((result, throwable) -> {
+   Assert.assertTrue(throwable != null);
+   
Assert.assertTrue(throwable.getMessage().contains("UnknownKvStateLocation"));
+   completion.complete(null);
+   });
+
+   completion.join();
--- End diff --

I also remember Till suggesting to use `get()` instead.


---


[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...

2017-11-15 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5021#discussion_r151183957
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
 ---
@@ -439,6 +443,85 @@ public Integer getKey(Tuple2 value) 
throws Exception {
}
 
/**
+* Tests that the correct exception is thrown if the query
+* contains a wrong queryable state name.
+*/
+   @Test
+   public void testWrongQueryableStateName() throws Exception {
+   // Config
+   final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+   final long numElements = 1024L;
+
+   JobID jobId = null;
+   try {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setStateBackend(stateBackend);
+   env.setParallelism(maxParallelism);
+   // Very important, because cluster is shared between 
tests and we
+   // don't explicitly check that all slots are available 
before
+   // submitting.
+   
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000L));
+
+   DataStream> source = env
+   .addSource(new 
TestAscendingValueSource(numElements));
+
+   // Value state
+   ValueStateDescriptor> valueState =
+   new ValueStateDescriptor<>("any", 
source.getType());
+
+   source.keyBy(new KeySelector, 
Integer>() {
+   private static final long serialVersionUID = 
7662520075515707428L;
+
+   @Override
+   public Integer getKey(Tuple2 
value) throws Exception {
+   return value.f0;
+   }
+   }).asQueryableState("hakuna", valueState);
+
+   // Submit the job graph
+   JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+   jobId = jobGraph.getJobID();
+
+   cluster.submitJobDetached(jobGraph);
+
+   // wait until the job is running before starting to 
query.
+   
FutureUtils.toJava(cluster.getLeaderGateway(deadline.timeLeft())
+   .ask(new 
TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.RUNNING), 
deadline.timeLeft())
+   
.mapTo(ClassTag$.MODULE$.apply(TestingJobManagerMessages.JobStatusIs.class)));
+
+   CompletableFuture>> 
future = client.getKvState(
+   jobId,
+   "wrong-hankuna", // this is the wrong 
name.
+   0,
+   VoidNamespace.INSTANCE,
+   BasicTypeInfo.INT_TYPE_INFO,
+   VoidNamespaceTypeInfo.INSTANCE,
+   valueState);
+
+   final CompletableFuture completion = new 
CompletableFuture<>();
+   future.whenComplete((result, throwable) -> {
+   Assert.assertTrue(throwable != null);
+   
Assert.assertTrue(throwable.getMessage().contains("UnknownKvStateLocation"));
--- End diff --

No, because it is already "string-ified".
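
To illustrate the point, a small sketch using only the JDK. `stringify` is a 
plain stand-in for `ExceptionUtils.stringifyException(t)`, and 
`asSeenByClient` is a hypothetical helper; the assumption is that the client 
receives a generic exception whose message carries the server-side stack trace 
as text:

```java
import java.io.PrintWriter;
import java.io.StringWriter;

public class StringifiedExceptionSketch {

    // Plain-JDK stand-in for ExceptionUtils.stringifyException(t).
    static String stringify(Throwable t) {
        StringWriter writer = new StringWriter();
        t.printStackTrace(new PrintWriter(writer, true));
        return writer.toString();
    }

    // Hypothetical client-side view: only the text survives, not the object.
    static RuntimeException asSeenByClient(Throwable serverSide) {
        return new RuntimeException(
                "Failed request 0. Caused by: " + stringify(serverSide));
    }

    public static void main(String[] args) {
        RuntimeException seen = asSeenByClient(new IllegalStateException("no location"));
        // The original type is gone, so an instanceof check cannot succeed.
        System.out.println("has cause: " + (seen.getCause() != null));
        // The class name is still present in the message text.
        System.out.println(seen.getMessage().contains("IllegalStateException"));
    }
}
```

That is why the test can only match on the message text.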


---


[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...

2017-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5021#discussion_r151183730
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
 ---
@@ -439,6 +443,85 @@ public Integer getKey(Tuple2 value) 
throws Exception {
}
 
/**
+* Tests that the correct exception is thrown if the query
+* contains a wrong queryable state name.
+*/
+   @Test
+   public void testWrongQueryableStateName() throws Exception {
+   // Config
+   final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+   final long numElements = 1024L;
+
+   JobID jobId = null;
+   try {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setStateBackend(stateBackend);
+   env.setParallelism(maxParallelism);
+   // Very important, because cluster is shared between 
tests and we
+   // don't explicitly check that all slots are available 
before
+   // submitting.
+   
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000L));
+
+   DataStream> source = env
+   .addSource(new 
TestAscendingValueSource(numElements));
+
+   // Value state
+   ValueStateDescriptor> valueState =
+   new ValueStateDescriptor<>("any", 
source.getType());
+
+   source.keyBy(new KeySelector, 
Integer>() {
+   private static final long serialVersionUID = 
7662520075515707428L;
+
+   @Override
+   public Integer getKey(Tuple2 
value) throws Exception {
+   return value.f0;
+   }
+   }).asQueryableState("hakuna", valueState);
+
+   // Submit the job graph
+   JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+   jobId = jobGraph.getJobID();
+
+   cluster.submitJobDetached(jobGraph);
+
+   // wait until the job is running before starting to 
query.
+   
FutureUtils.toJava(cluster.getLeaderGateway(deadline.timeLeft())
+   .ask(new 
TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.RUNNING), 
deadline.timeLeft())
+   
.mapTo(ClassTag$.MODULE$.apply(TestingJobManagerMessages.JobStatusIs.class)));
+
+   CompletableFuture>> 
future = client.getKvState(
+   jobId,
+   "wrong-hankuna", // this is the wrong 
name.
+   0,
+   VoidNamespace.INSTANCE,
+   BasicTypeInfo.INT_TYPE_INFO,
+   VoidNamespaceTypeInfo.INSTANCE,
+   valueState);
+
+   final CompletableFuture completion = new 
CompletableFuture<>();
+   future.whenComplete((result, throwable) -> {
--- End diff --

I don't think we need the `completion` future here. We could return null in 
the future in case of success, throw an exception in case of failure, and call 
`get` on the result.
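
A minimal sketch of what this could look like, assuming only 
`java.util.concurrent`. `failingQuery()` is a hypothetical stand-in for the 
future returned by `client.getKvState(...)`, and `expectFailureContaining` is 
an illustrative helper name, not an existing utility:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class HandleInsteadOfCompletionSketch {

    // Hypothetical stand-in for the query future of the test.
    static CompletableFuture<String> failingQuery() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(
                new RuntimeException("Failed request 0. Caused by: UnknownKvStateLocation"));
        return future;
    }

    // Completes with null if the query failed as expected; otherwise the
    // returned future itself fails, so the caller sees the problem.
    static CompletableFuture<Void> expectFailureContaining(
            CompletableFuture<?> query, String expected) {
        return query.handle((result, throwable) -> {
            if (throwable == null) {
                throw new IllegalStateException("query unexpectedly succeeded");
            }
            if (!String.valueOf(throwable.getMessage()).contains(expected)) {
                throw new IllegalStateException("unexpected failure: " + throwable);
            }
            return null;
        });
    }

    public static void main(String[] args) throws Exception {
        // A bounded get() surfaces both outcomes on the test thread.
        expectFailureContaining(failingQuery(), "UnknownKvStateLocation")
                .get(10, TimeUnit.SECONDS);
        System.out.println("query failed as expected");
    }
}
```

The outcome travels through the returned future, so no second future is 
needed and a failed check cannot be swallowed by the callback.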


---


[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...

2017-11-15 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5021#discussion_r151183695
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
 ---
@@ -133,12 +131,11 @@ private void executeActionAsync(
operationFuture.whenCompleteAsync(
(t, throwable) -> {
if (throwable != null) {
-   if (throwable 
instanceof CancellationException) {
-   
result.completeExceptionally(throwable);
-   } else if 
(throwable.getCause() instanceof UnknownKvStateIdException ||
+   if (
+   
throwable.getCause() instanceof UnknownKvStateIdException ||
--- End diff --

No, an `UnknownKvStateIdException` is thrown in this case. See 
`KvStateServerHandler`.


---


[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...

2017-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5021#discussion_r151183131
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
 ---
@@ -439,6 +443,85 @@ public Integer getKey(Tuple2 value) 
throws Exception {
}
 
/**
+* Tests that the correct exception is thrown if the query
+* contains a wrong queryable state name.
+*/
+   @Test
+   public void testWrongQueryableStateName() throws Exception {
+   // Config
+   final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+   final long numElements = 1024L;
+
+   JobID jobId = null;
+   try {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setStateBackend(stateBackend);
+   env.setParallelism(maxParallelism);
+   // Very important, because cluster is shared between 
tests and we
+   // don't explicitly check that all slots are available 
before
+   // submitting.
+   
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000L));
+
+   DataStream> source = env
+   .addSource(new 
TestAscendingValueSource(numElements));
+
+   // Value state
+   ValueStateDescriptor> valueState =
+   new ValueStateDescriptor<>("any", 
source.getType());
+
+   source.keyBy(new KeySelector, 
Integer>() {
+   private static final long serialVersionUID = 
7662520075515707428L;
+
+   @Override
+   public Integer getKey(Tuple2 
value) throws Exception {
+   return value.f0;
+   }
+   }).asQueryableState("hakuna", valueState);
+
+   // Submit the job graph
+   JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+   jobId = jobGraph.getJobID();
+
+   cluster.submitJobDetached(jobGraph);
+
+   // wait until the job is running before starting to 
query.
+   
FutureUtils.toJava(cluster.getLeaderGateway(deadline.timeLeft())
+   .ask(new 
TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.RUNNING), 
deadline.timeLeft())
+   
.mapTo(ClassTag$.MODULE$.apply(TestingJobManagerMessages.JobStatusIs.class)));
+
+   CompletableFuture>> 
future = client.getKvState(
+   jobId,
+   "wrong-hankuna", // this is the wrong 
name.
+   0,
+   VoidNamespace.INSTANCE,
+   BasicTypeInfo.INT_TYPE_INFO,
+   VoidNamespaceTypeInfo.INSTANCE,
+   valueState);
+
+   final CompletableFuture completion = new 
CompletableFuture<>();
+   future.whenComplete((result, throwable) -> {
+   Assert.assertTrue(throwable != null);
+   
Assert.assertTrue(throwable.getMessage().contains("UnknownKvStateLocation"));
--- End diff --

Can you not check the class of the exception?


---


[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...

2017-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5021#discussion_r151182879
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
 ---
@@ -133,12 +131,11 @@ private void executeActionAsync(
operationFuture.whenCompleteAsync(
(t, throwable) -> {
if (throwable != null) {
-   if (throwable 
instanceof CancellationException) {
-   
result.completeExceptionally(throwable);
-   } else if 
(throwable.getCause() instanceof UnknownKvStateIdException ||
+   if (
+   
throwable.getCause() instanceof UnknownKvStateIdException ||
--- End diff --

Then, in which cases is an UnknownKvStateIdException thrown? That actually 
sounds more appropriate.


---


[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...

2017-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5021#discussion_r151182277
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
 ---
@@ -133,12 +131,11 @@ private void executeActionAsync(
operationFuture.whenCompleteAsync(
(t, throwable) -> {
if (throwable != null) {
-   if (throwable 
instanceof CancellationException) {
-   
result.completeExceptionally(throwable);
-   } else if 
(throwable.getCause() instanceof UnknownKvStateIdException ||
+   if (
+   
throwable.getCause() instanceof UnknownKvStateIdException ||
--- End diff --

Do I understand correctly that in the case of a wrong KvStateId an 
`UnknownKvStateLocationException` is thrown?


---


[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...

2017-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5021#discussion_r151179694
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
 ---
@@ -439,6 +443,85 @@ public Integer getKey(Tuple2 value) 
throws Exception {
}
 
/**
+* Tests that the correct exception is thrown if the query
+* contains a wrong queryable state name.
+*/
+   @Test
+   public void testWrongQueryableStateName() throws Exception {
+   // Config
+   final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+   final long numElements = 1024L;
+
+   JobID jobId = null;
+   try {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setStateBackend(stateBackend);
+   env.setParallelism(maxParallelism);
+   // Very important, because cluster is shared between 
tests and we
+   // don't explicitly check that all slots are available 
before
+   // submitting.
+   
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000L));
+
+   DataStream> source = env
+   .addSource(new 
TestAscendingValueSource(numElements));
+
+   // Value state
+   ValueStateDescriptor> valueState =
+   new ValueStateDescriptor<>("any", 
source.getType());
+
+   source.keyBy(new KeySelector, 
Integer>() {
+   private static final long serialVersionUID = 
7662520075515707428L;
+
+   @Override
+   public Integer getKey(Tuple2 
value) throws Exception {
+   return value.f0;
+   }
+   }).asQueryableState("hakuna", valueState);
+
+   // Submit the job graph
+   JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+   jobId = jobGraph.getJobID();
+
+   cluster.submitJobDetached(jobGraph);
+
+   // wait until the job is running before starting to 
query.
+   
FutureUtils.toJava(cluster.getLeaderGateway(deadline.timeLeft())
+   .ask(new 
TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.RUNNING), 
deadline.timeLeft())
+   
.mapTo(ClassTag$.MODULE$.apply(TestingJobManagerMessages.JobStatusIs.class)));
+
+   CompletableFuture>> 
future = client.getKvState(
+   jobId,
+   "wrong-hankuna", // this is the wrong 
name.
+   0,
+   VoidNamespace.INSTANCE,
+   BasicTypeInfo.INT_TYPE_INFO,
+   VoidNamespaceTypeInfo.INSTANCE,
+   valueState);
+
+   final CompletableFuture completion = new 
CompletableFuture<>();
+   future.whenComplete((result, throwable) -> {
+   Assert.assertTrue(throwable != null);
+   
Assert.assertTrue(throwable.getMessage().contains("UnknownKvStateLocation"));
+   completion.complete(null);
+   });
+
+   completion.join();
--- End diff --

If the test fails, this will block indefinitely.
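
A small sketch of why, using only the JDK (the class and method names are 
made up for illustration). An error thrown inside a `whenComplete` callback is 
captured by the stage that `whenComplete` returns, so the separate 
`completion` future is never completed:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BlockedJoinSketch {

    // Returns true if the side future is still incomplete after the
    // assertion inside the callback has failed.
    static boolean completionStaysIncomplete() {
        CompletableFuture<String> future =
                CompletableFuture.completedFuture("unexpected success");
        CompletableFuture<Void> completion = new CompletableFuture<>();

        future.whenComplete((result, throwable) -> {
            if (throwable == null) {
                // Mirrors Assert.assertTrue(throwable != null) failing.
                throw new AssertionError("expected a failure");
            }
            completion.complete(null); // not reached when the check fails
        });

        try {
            completion.get(200, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true; // an untimed join() would keep waiting here
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("completion stays incomplete: " + completionStaysIncomplete());
    }
}
```

Waiting with a timeout, or waiting on the stage returned by `whenComplete`, 
avoids the hang.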


---


[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...

2017-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5021#discussion_r151179388
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
 ---
@@ -262,7 +262,7 @@ public void run() {
try {
stats.reportFailedRequest();
 
-   final String errMsg = "Failed 
request " + requestId + ". Caused by: " + ExceptionUtils.stringifyException(t);
+   final String errMsg = "Failed 
request " + requestId + ".\nCaused by: " + ExceptionUtils.stringifyException(t);
--- End diff --

This should be `System.lineSeparator()`.
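
For illustration, a sketch of the message built with the platform line 
separator. `errorMessage` is a hypothetical helper that mirrors the string in 
the diff, with the stringified cause passed in as plain text:

```java
public class LineSeparatorSketch {

    // Hypothetical helper mirroring the message format from the diff.
    static String errorMessage(long requestId, String stringifiedCause) {
        return "Failed request " + requestId + "." + System.lineSeparator()
                + "Caused by: " + stringifiedCause;
    }

    public static void main(String[] args) {
        System.out.println(errorMessage(42L, "java.lang.RuntimeException: boom"));
    }
}
```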


---


[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...

2017-11-15 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5021#discussion_r151162640
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
 ---
@@ -262,7 +262,7 @@ public void run() {
try {
stats.reportFailedRequest();
 
-   final String errMsg = "Failed 
request " + requestId + ". Caused by: " + ExceptionUtils.stringifyException(t);
+   final String errMsg = "Failed 
request " + requestId + ".\nCaused by: " + ExceptionUtils.stringifyException(t);
--- End diff --

I put it there because it actually looked nicer. But I suppose this is a 
matter of personal taste.


---


[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...

2017-11-15 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5021#discussion_r151160873
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
 ---
@@ -262,7 +262,7 @@ public void run() {
try {
stats.reportFailedRequest();
 
-   final String errMsg = "Failed 
request " + requestId + ". Caused by: " + ExceptionUtils.stringifyException(t);
+   final String errMsg = "Failed 
request " + requestId + ".\nCaused by: " + ExceptionUtils.stringifyException(t);
--- End diff --

Why is there a newline? I think this can look strange in a log.


---


[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...

2017-11-15 Thread kl0u
GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/5021

[FLINK-8063][QS] QS client does not retry when an UnknownKvStateLocation is 
thrown.

## What is the purpose of the change

Fix a bug in the QS client. Previously, if the client submitted a query with 
an invalid queryable state name, it would retry the query indefinitely. This 
change fixes that.

## Brief change log

The change is in the `KvStateClientProxyHandler`, where we change the 
conditions under which we automatically retry a query.

## Verifying this change

Adds test `AbstractQueryableStateTestBase.testWrongQueryableStateName()`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink qs-8063-client-blocking

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5021.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5021


commit d4f4a17bf6af98fdf4e2f362e9b20d1998e74110
Author: kkloudas 
Date:   2017-11-14T14:05:45Z

[FLINK-8063][QS] QS client does not retry when an UnknownKvStateLocation is 
thrown.




---