[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5021 ---
[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5021#discussion_r151188168 --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java --- @@ -133,12 +131,11 @@ private void executeActionAsync( operationFuture.whenCompleteAsync( (t, throwable) -> { if (throwable != null) { - if (throwable instanceof CancellationException) { - result.completeExceptionally(throwable); - } else if (throwable.getCause() instanceof UnknownKvStateIdException || + if ( + throwable.getCause() instanceof UnknownKvStateIdException || --- End diff -- The problem was the, now removed, `UnknownKvStateLocation`. This is the one returned by the JobManager. Then, based on that, you go to the appropriate TaskManager. ---
[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5021#discussion_r151187261 --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java --- @@ -133,12 +131,11 @@ private void executeActionAsync( operationFuture.whenCompleteAsync( (t, throwable) -> { if (throwable != null) { - if (throwable instanceof CancellationException) { - result.completeExceptionally(throwable); - } else if (throwable.getCause() instanceof UnknownKvStateIdException || + if ( + throwable.getCause() instanceof UnknownKvStateIdException || --- End diff -- Then I don't understand how this can fix the issue. In the case of an `UnknownKvStateIdException`, the client still retries fetching the state. ---
[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5021#discussion_r151184658 --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java --- @@ -1372,84 +1492,60 @@ public String fold(String accumulator, Tuple2value) throws Excep / General Utility Methods // - private static CompletableFuture getKvStateWithRetries( + private static CompletableFuture getKvState( final QueryableStateClient client, final JobID jobId, final String queryName, final K key, final TypeInformation keyTypeInfo, final StateDescriptor stateDescriptor, - final Time retryDelay, final boolean failForUnknownKeyOrNamespace, - final ScheduledExecutor executor) { - return retryWithDelay( - () -> client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor), - NO_OF_RETRIES, - retryDelay, - executor, - failForUnknownKeyOrNamespace); - } - - private static CompletableFuture retryWithDelay( - final Supplieroperation, - final int retries, - final Time retryDelay, - final ScheduledExecutor scheduledExecutor, - final boolean failIfUnknownKeyOrNamespace) { - - final CompletableFuture resultFuture = new CompletableFuture<>(); - - retryWithDelay( - resultFuture, - operation, - retries, - retryDelay, - scheduledExecutor, - failIfUnknownKeyOrNamespace); + final ScheduledExecutor executor) throws InterruptedException { + final CompletableFuture resultFuture = new CompletableFuture<>(); + getKvStateIgnoringCertainExceptions( + resultFuture, client, jobId, queryName, key, keyTypeInfo, + stateDescriptor, failForUnknownKeyOrNamespace, executor); return resultFuture; } - public static void retryWithDelay( - final CompletableFuture resultFuture, - final Supplier operation, - final int retries, - final Time retryDelay, - final ScheduledExecutor scheduledExecutor, - final boolean failIfUnknownKeyOrNamespace) { + private 
static void getKvStateIgnoringCertainExceptions( + final CompletableFuture resultFuture, + final QueryableStateClient client, + final JobID jobId, + final String queryName, + final K key, + final TypeInformation keyTypeInfo, + final StateDescriptor stateDescriptor, + final boolean failForUnknownKeyOrNamespace, + final ScheduledExecutor executor) throws InterruptedException { if (!resultFuture.isDone()) { - final CompletableFuture operationResultFuture = operation.get(); - operationResultFuture.whenCompleteAsync( - (t, throwable) -> { - if (throwable != null) { - if (throwable.getCause() instanceof CancellationException) { - resultFuture.completeExceptionally(new FutureUtils.RetryException("Operation future was cancelled.", throwable.getCause())); - } else if (throwable.getCause() instanceof AssertionError || - (failIfUnknownKeyOrNamespace && throwable.getCause() instanceof UnknownKeyOrNamespaceException)) { - resultFuture.completeExceptionally(throwable.getCause()); - } else { -
[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5021#discussion_r151184079 --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java --- @@ -917,20 +1023,24 @@ public void processElement(Tuple2value, Context ctx, Collector
[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5021#discussion_r151184645 --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java --- @@ -439,6 +443,85 @@ public Integer getKey(Tuple2value) throws Exception { } /** +* Tests that the correct exception is thrown if the query +* contains a wrong queryable state name. +*/ + @Test + public void testWrongQueryableStateName() throws Exception { + // Config + final Deadline deadline = TEST_TIMEOUT.fromNow(); + + final long numElements = 1024L; + + JobID jobId = null; + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); + env.setParallelism(maxParallelism); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before + // submitting. + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); + + DataStream > source = env + .addSource(new TestAscendingValueSource(numElements)); + + // Value state + ValueStateDescriptor > valueState = + new ValueStateDescriptor<>("any", source.getType()); + + source.keyBy(new KeySelector , Integer>() { + private static final long serialVersionUID = 7662520075515707428L; + + @Override + public Integer getKey(Tuple2 value) throws Exception { + return value.f0; + } + }).asQueryableState("hakuna", valueState); + + // Submit the job graph + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + jobId = jobGraph.getJobID(); + + cluster.submitJobDetached(jobGraph); + + // wait until the job is running before starting to query. 
+ FutureUtils.toJava(cluster.getLeaderGateway(deadline.timeLeft()) + .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.RUNNING), deadline.timeLeft()) + .mapTo(ClassTag$.MODULE$.apply(TestingJobManagerMessages.JobStatusIs.class))); + + CompletableFuture >> future = client.getKvState( + jobId, + "wrong-hankuna", // this is the wrong name. + 0, + VoidNamespace.INSTANCE, + BasicTypeInfo.INT_TYPE_INFO, + VoidNamespaceTypeInfo.INSTANCE, + valueState); + + final CompletableFuture completion = new CompletableFuture<>(); + future.whenComplete((result, throwable) -> { --- End diff -- Yes, you are right. ---
[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5021#discussion_r151184384 --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java --- @@ -133,12 +131,11 @@ private void executeActionAsync( operationFuture.whenCompleteAsync( (t, throwable) -> { if (throwable != null) { - if (throwable instanceof CancellationException) { - result.completeExceptionally(throwable); - } else if (throwable.getCause() instanceof UnknownKvStateIdException || + if ( + throwable.getCause() instanceof UnknownKvStateIdException || --- End diff -- The `UnknownKvStateLocationException` is thrown in the case we do not have a server for a particular `keyGroup`. ---
[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5021#discussion_r151183892 --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java --- @@ -439,6 +443,85 @@ public Integer getKey(Tuple2value) throws Exception { } /** +* Tests that the correct exception is thrown if the query +* contains a wrong queryable state name. +*/ + @Test + public void testWrongQueryableStateName() throws Exception { + // Config + final Deadline deadline = TEST_TIMEOUT.fromNow(); + + final long numElements = 1024L; + + JobID jobId = null; + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); + env.setParallelism(maxParallelism); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before + // submitting. + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); + + DataStream > source = env + .addSource(new TestAscendingValueSource(numElements)); + + // Value state + ValueStateDescriptor > valueState = + new ValueStateDescriptor<>("any", source.getType()); + + source.keyBy(new KeySelector , Integer>() { + private static final long serialVersionUID = 7662520075515707428L; + + @Override + public Integer getKey(Tuple2 value) throws Exception { + return value.f0; + } + }).asQueryableState("hakuna", valueState); + + // Submit the job graph + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + jobId = jobGraph.getJobID(); + + cluster.submitJobDetached(jobGraph); + + // wait until the job is running before starting to query. 
+ FutureUtils.toJava(cluster.getLeaderGateway(deadline.timeLeft()) + .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.RUNNING), deadline.timeLeft()) + .mapTo(ClassTag$.MODULE$.apply(TestingJobManagerMessages.JobStatusIs.class))); + + CompletableFuture >> future = client.getKvState( + jobId, + "wrong-hankuna", // this is the wrong name. + 0, + VoidNamespace.INSTANCE, + BasicTypeInfo.INT_TYPE_INFO, + VoidNamespaceTypeInfo.INSTANCE, + valueState); + + final CompletableFuture completion = new CompletableFuture<>(); + future.whenComplete((result, throwable) -> { + Assert.assertTrue(throwable != null); + Assert.assertTrue(throwable.getMessage().contains("UnknownKvStateLocation")); + completion.complete(null); + }); + + completion.join(); --- End diff -- I also remember Till suggesting to use `get()` instead. ---
[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5021#discussion_r151183957 --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java --- @@ -439,6 +443,85 @@ public Integer getKey(Tuple2value) throws Exception { } /** +* Tests that the correct exception is thrown if the query +* contains a wrong queryable state name. +*/ + @Test + public void testWrongQueryableStateName() throws Exception { + // Config + final Deadline deadline = TEST_TIMEOUT.fromNow(); + + final long numElements = 1024L; + + JobID jobId = null; + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); + env.setParallelism(maxParallelism); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before + // submitting. + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); + + DataStream > source = env + .addSource(new TestAscendingValueSource(numElements)); + + // Value state + ValueStateDescriptor > valueState = + new ValueStateDescriptor<>("any", source.getType()); + + source.keyBy(new KeySelector , Integer>() { + private static final long serialVersionUID = 7662520075515707428L; + + @Override + public Integer getKey(Tuple2 value) throws Exception { + return value.f0; + } + }).asQueryableState("hakuna", valueState); + + // Submit the job graph + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + jobId = jobGraph.getJobID(); + + cluster.submitJobDetached(jobGraph); + + // wait until the job is running before starting to query. 
+ FutureUtils.toJava(cluster.getLeaderGateway(deadline.timeLeft()) + .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.RUNNING), deadline.timeLeft()) + .mapTo(ClassTag$.MODULE$.apply(TestingJobManagerMessages.JobStatusIs.class))); + + CompletableFuture >> future = client.getKvState( + jobId, + "wrong-hankuna", // this is the wrong name. + 0, + VoidNamespace.INSTANCE, + BasicTypeInfo.INT_TYPE_INFO, + VoidNamespaceTypeInfo.INSTANCE, + valueState); + + final CompletableFuture completion = new CompletableFuture<>(); + future.whenComplete((result, throwable) -> { + Assert.assertTrue(throwable != null); + Assert.assertTrue(throwable.getMessage().contains("UnknownKvStateLocation")); --- End diff -- No because it is already "string-ified". ---
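A minimal sketch of why checking the exception class does not work once the failure has been "string-ified". It assumes the server side turns the original exception into text and the client wraps that text in a new exception; `StringifiedFailure`, `UnknownLocationException`, `stringify` and `asReceivedByClient` are hypothetical names for illustration, not Flink classes or methods.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

final class StringifiedFailure {

    /** Hypothetical stand-in for the exception raised on the server side. */
    static final class UnknownLocationException extends Exception {
        UnknownLocationException(String message) {
            super(message);
        }
    }

    /** Renders the throwable (class name, message, stack trace) as plain text. */
    static String stringify(Throwable t) {
        StringWriter buffer = new StringWriter();
        t.printStackTrace(new PrintWriter(buffer, true));
        return buffer.toString();
    }

    /** What the client sees: a new exception whose message carries only the text. */
    static Throwable asReceivedByClient(Throwable serverSide) {
        return new RuntimeException("Failed request 1. Caused by: " + stringify(serverSide));
    }

    public static void main(String[] args) {
        Throwable received = asReceivedByClient(new UnknownLocationException("no location found"));
        // The original class is gone, so an instanceof check cannot succeed.
        System.out.println(received instanceof UnknownLocationException);
        // Only the text survives, so the test has to look at the message.
        System.out.println(received.getMessage().contains("UnknownLocationException"));
    }
}
```

Under these assumptions the only thing a test can assert on is the message text, which is what the `contains("UnknownKvStateLocation")` check in the diff does.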
[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5021#discussion_r151183730 --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java --- @@ -439,6 +443,85 @@ public Integer getKey(Tuple2value) throws Exception { } /** +* Tests that the correct exception is thrown if the query +* contains a wrong queryable state name. +*/ + @Test + public void testWrongQueryableStateName() throws Exception { + // Config + final Deadline deadline = TEST_TIMEOUT.fromNow(); + + final long numElements = 1024L; + + JobID jobId = null; + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); + env.setParallelism(maxParallelism); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before + // submitting. + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); + + DataStream > source = env + .addSource(new TestAscendingValueSource(numElements)); + + // Value state + ValueStateDescriptor > valueState = + new ValueStateDescriptor<>("any", source.getType()); + + source.keyBy(new KeySelector , Integer>() { + private static final long serialVersionUID = 7662520075515707428L; + + @Override + public Integer getKey(Tuple2 value) throws Exception { + return value.f0; + } + }).asQueryableState("hakuna", valueState); + + // Submit the job graph + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + jobId = jobGraph.getJobID(); + + cluster.submitJobDetached(jobGraph); + + // wait until the job is running before starting to query. 
+ FutureUtils.toJava(cluster.getLeaderGateway(deadline.timeLeft()) + .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.RUNNING), deadline.timeLeft()) + .mapTo(ClassTag$.MODULE$.apply(TestingJobManagerMessages.JobStatusIs.class))); + + CompletableFuture >> future = client.getKvState( + jobId, + "wrong-hankuna", // this is the wrong name. + 0, + VoidNamespace.INSTANCE, + BasicTypeInfo.INT_TYPE_INFO, + VoidNamespaceTypeInfo.INSTANCE, + valueState); + + final CompletableFuture completion = new CompletableFuture<>(); + future.whenComplete((result, throwable) -> { --- End diff -- I don't think we need the `completion` future here. We could return null in the future in case of success, throw an exception in case of failure, and call `get()` on the result. ---
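One way to read this suggestion, as a sketch rather than the code that was eventually merged: chain the checks onto the query future with `handle`, throw from the callback when a check fails, and wait on the chained future with a bounded `get`. `FailedQueryCheck`, `failedQuery` and `awaitExpectedFailure` are hypothetical names, and `failedQuery` merely stands in for the future returned by `client.getKvState(...)`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class FailedQueryCheck {

    /** Stand-in for a query future that has failed with the given message. */
    static CompletableFuture<String> failedQuery(String message) {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new RuntimeException(message));
        return future;
    }

    /**
     * Returns normally only if the query failed with a message containing
     * {@code expected}. A failed check surfaces as an ExecutionException whose
     * cause is the AssertionError; a hanging query surfaces as a TimeoutException.
     */
    static void awaitExpectedFailure(CompletableFuture<String> query, String expected, long timeoutMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        CompletableFuture<Void> checked = query.handle((result, throwable) -> {
            if (throwable == null) {
                throw new AssertionError("Expected the query to fail, but it returned: " + result);
            }
            String message = throwable.getMessage();
            if (message == null || !message.contains(expected)) {
                throw new AssertionError("Unexpected failure: " + throwable);
            }
            return null; // success: the expected failure was observed
        });
        checked.get(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}
```

Because the checks run inside the chained future, no separate `completion` future is needed, and a failed check reaches the test thread instead of being swallowed by the callback.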
[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5021#discussion_r151183695 --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java --- @@ -133,12 +131,11 @@ private void executeActionAsync( operationFuture.whenCompleteAsync( (t, throwable) -> { if (throwable != null) { - if (throwable instanceof CancellationException) { - result.completeExceptionally(throwable); - } else if (throwable.getCause() instanceof UnknownKvStateIdException || + if ( + throwable.getCause() instanceof UnknownKvStateIdException || --- End diff -- No, an `UnknownKvStateIdException` is thrown in this case. See `KvStateServerHandler`. ---
[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5021#discussion_r151183131 --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java --- @@ -439,6 +443,85 @@ public Integer getKey(Tuple2value) throws Exception { } /** +* Tests that the correct exception is thrown if the query +* contains a wrong queryable state name. +*/ + @Test + public void testWrongQueryableStateName() throws Exception { + // Config + final Deadline deadline = TEST_TIMEOUT.fromNow(); + + final long numElements = 1024L; + + JobID jobId = null; + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); + env.setParallelism(maxParallelism); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before + // submitting. + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); + + DataStream > source = env + .addSource(new TestAscendingValueSource(numElements)); + + // Value state + ValueStateDescriptor > valueState = + new ValueStateDescriptor<>("any", source.getType()); + + source.keyBy(new KeySelector , Integer>() { + private static final long serialVersionUID = 7662520075515707428L; + + @Override + public Integer getKey(Tuple2 value) throws Exception { + return value.f0; + } + }).asQueryableState("hakuna", valueState); + + // Submit the job graph + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + jobId = jobGraph.getJobID(); + + cluster.submitJobDetached(jobGraph); + + // wait until the job is running before starting to query. 
+ FutureUtils.toJava(cluster.getLeaderGateway(deadline.timeLeft()) + .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.RUNNING), deadline.timeLeft()) + .mapTo(ClassTag$.MODULE$.apply(TestingJobManagerMessages.JobStatusIs.class))); + + CompletableFuture >> future = client.getKvState( + jobId, + "wrong-hankuna", // this is the wrong name. + 0, + VoidNamespace.INSTANCE, + BasicTypeInfo.INT_TYPE_INFO, + VoidNamespaceTypeInfo.INSTANCE, + valueState); + + final CompletableFuture completion = new CompletableFuture<>(); + future.whenComplete((result, throwable) -> { + Assert.assertTrue(throwable != null); + Assert.assertTrue(throwable.getMessage().contains("UnknownKvStateLocation")); --- End diff -- Can you not check the class of the exception? ---
[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5021#discussion_r151182879 --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java --- @@ -133,12 +131,11 @@ private void executeActionAsync( operationFuture.whenCompleteAsync( (t, throwable) -> { if (throwable != null) { - if (throwable instanceof CancellationException) { - result.completeExceptionally(throwable); - } else if (throwable.getCause() instanceof UnknownKvStateIdException || + if ( + throwable.getCause() instanceof UnknownKvStateIdException || --- End diff -- Then, in which cases is an UnknownKvStateIdException thrown? That actually sounds more appropriate. ---
[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5021#discussion_r151182277 --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java --- @@ -133,12 +131,11 @@ private void executeActionAsync( operationFuture.whenCompleteAsync( (t, throwable) -> { if (throwable != null) { - if (throwable instanceof CancellationException) { - result.completeExceptionally(throwable); - } else if (throwable.getCause() instanceof UnknownKvStateIdException || + if ( + throwable.getCause() instanceof UnknownKvStateIdException || --- End diff -- Do I understand correctly that in the case of a wrong KvStateId an `UnknownKvStateLocationException` is thrown? ---
[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5021#discussion_r151179694 --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java --- @@ -439,6 +443,85 @@ public Integer getKey(Tuple2value) throws Exception { } /** +* Tests that the correct exception is thrown if the query +* contains a wrong queryable state name. +*/ + @Test + public void testWrongQueryableStateName() throws Exception { + // Config + final Deadline deadline = TEST_TIMEOUT.fromNow(); + + final long numElements = 1024L; + + JobID jobId = null; + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); + env.setParallelism(maxParallelism); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before + // submitting. + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); + + DataStream > source = env + .addSource(new TestAscendingValueSource(numElements)); + + // Value state + ValueStateDescriptor > valueState = + new ValueStateDescriptor<>("any", source.getType()); + + source.keyBy(new KeySelector , Integer>() { + private static final long serialVersionUID = 7662520075515707428L; + + @Override + public Integer getKey(Tuple2 value) throws Exception { + return value.f0; + } + }).asQueryableState("hakuna", valueState); + + // Submit the job graph + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + jobId = jobGraph.getJobID(); + + cluster.submitJobDetached(jobGraph); + + // wait until the job is running before starting to query. 
+ FutureUtils.toJava(cluster.getLeaderGateway(deadline.timeLeft()) + .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.RUNNING), deadline.timeLeft()) + .mapTo(ClassTag$.MODULE$.apply(TestingJobManagerMessages.JobStatusIs.class))); + + CompletableFuture >> future = client.getKvState( + jobId, + "wrong-hankuna", // this is the wrong name. + 0, + VoidNamespace.INSTANCE, + BasicTypeInfo.INT_TYPE_INFO, + VoidNamespaceTypeInfo.INSTANCE, + valueState); + + final CompletableFuture completion = new CompletableFuture<>(); + future.whenComplete((result, throwable) -> { + Assert.assertTrue(throwable != null); + Assert.assertTrue(throwable.getMessage().contains("UnknownKvStateLocation")); + completion.complete(null); + }); + + completion.join(); --- End diff -- If the test fails, this will block indefinitely: a failing assertion throws inside the callback before `completion.complete(null)` is reached. ---
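The blocking behaviour can be reproduced in isolation. The sketch below is illustrative only: `BlockedCompletionDemo` is a hypothetical class, the `AssertionError` stands in for a failing `Assert.assertTrue`, and a bounded `get` replaces `join()` so that the demo itself terminates.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BlockedCompletionDemo {

    /** Returns true if the separate completion future is never completed. */
    static boolean completionTimesOut() {
        // The query unexpectedly succeeds, so the check in the callback fails.
        CompletableFuture<String> query = CompletableFuture.completedFuture("unexpected success");
        CompletableFuture<Void> completion = new CompletableFuture<>();

        query.whenComplete((result, throwable) -> {
            if (throwable == null) {
                // Stand-in for a failing Assert.assertTrue(throwable != null).
                throw new AssertionError("expected the query to fail");
            }
            completion.complete(null); // not reached when the check above throws
        });

        try {
            // With join() this would wait forever; a bounded get() makes the hang visible.
            completion.get(200, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (ExecutionException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("completion never finished: " + completionTimesOut());
    }
}
```

The error thrown in the callback ends up in the future returned by `whenComplete`, which nobody inspects here, so the test thread has no way to learn that the check failed.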
[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5021#discussion_r151179388 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java --- @@ -262,7 +262,7 @@ public void run() { try { stats.reportFailedRequest(); - final String errMsg = "Failed request " + requestId + ". Caused by: " + ExceptionUtils.stringifyException(t); + final String errMsg = "Failed request " + requestId + ".\nCaused by: " + ExceptionUtils.stringifyException(t); --- End diff -- This should be `System.lineSeparator()` rather than a hard-coded `\n`. ---
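For illustration, the message could be assembled along these lines. This is a sketch only: `ErrorMessages` and `failedRequestMessage` are hypothetical names, and the already stringified cause is passed in as a plain `String` instead of calling `ExceptionUtils.stringifyException(t)`.

```java
final class ErrorMessages {

    /** Builds the error text using the platform line separator instead of "\n". */
    static String failedRequestMessage(long requestId, String stringifiedCause) {
        return "Failed request " + requestId + "." + System.lineSeparator()
                + "Caused by: " + stringifiedCause;
    }

    public static void main(String[] args) {
        System.out.println(failedRequestMessage(42L, "java.lang.IllegalStateException: example"));
    }
}
```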
[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5021#discussion_r151162640 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java --- @@ -262,7 +262,7 @@ public void run() { try { stats.reportFailedRequest(); - final String errMsg = "Failed request " + requestId + ". Caused by: " + ExceptionUtils.stringifyException(t); + final String errMsg = "Failed request " + requestId + ".\nCaused by: " + ExceptionUtils.stringifyException(t); --- End diff -- I put it there because it looked actually nicer. But I suppose this is a matter of personal taste. ---
[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5021#discussion_r151160873 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java --- @@ -262,7 +262,7 @@ public void run() { try { stats.reportFailedRequest(); - final String errMsg = "Failed request " + requestId + ". Caused by: " + ExceptionUtils.stringifyException(t); + final String errMsg = "Failed request " + requestId + ".\nCaused by: " + ExceptionUtils.stringifyException(t); --- End diff -- Why is there a newline? I think this can look strange in a log. ---
[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...
GitHub user kl0u opened a pull request: https://github.com/apache/flink/pull/5021

[FLINK-8063][QS] QS client does not retry when an UnknownKvStateLocation.

## What is the purpose of the change

Fix a bug in the QS client. Previously, if the client submitted a query with an invalid queryable state name, it would retry the query indefinitely. This change fixes that.

## Brief change log

The change is in the `KvStateClientProxyHandler`, where we change the conditions under which a query is automatically retried.

## Verifying this change

Adds the test `AbstractQueryableStateTestBase.testWrongQueryableStateName()`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink qs-8063-client-blocking

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5021.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5021

commit d4f4a17bf6af98fdf4e2f362e9b20d1998e74110
Author: kkloudas
Date: 2017-11-14T14:05:45Z

[FLINK-8063][QS] QS client does not retry when an UnknownKvStateLocation is thrown.

---
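A rough sketch of the kind of retry condition the change log refers to. It assumes, based on the review discussion, that an unknown state ID reported by a server is still retried while an unknown state location (for example a wrong queryable state name) now fails the query immediately. `RetryDecisionSketch`, its nested exception classes and `shouldRetry` are hypothetical stand-ins, not the classes or the exact condition in `KvStateClientProxyHandler`, whose full condition is cut off in the quoted diff.

```java
import java.util.concurrent.CompletionException;

final class RetryDecisionSketch {

    /** Hypothetical stand-in for an "unknown state ID" failure reported by a server. */
    static final class UnknownStateIdException extends Exception {
        UnknownStateIdException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for an "unknown state location" failure, e.g. a wrong state name. */
    static final class UnknownStateLocationException extends Exception {
        UnknownStateLocationException(String message) {
            super(message);
        }
    }

    /**
     * Decides whether a failed attempt is retried. Only the cause is inspected,
     * mirroring the throwable.getCause() check visible in the diff.
     */
    static boolean shouldRetry(Throwable throwable) {
        Throwable cause = throwable.getCause();
        // Assumption: a stale state ID may be fixed by looking the location up again.
        // An unknown location is treated as final, so the query is not retried forever.
        return cause instanceof UnknownStateIdException;
    }

    public static void main(String[] args) {
        System.out.println(shouldRetry(new CompletionException(new UnknownStateIdException("stale id"))));
        System.out.println(shouldRetry(new CompletionException(new UnknownStateLocationException("wrong name"))));
    }
}
```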