[jira] [Updated] (FLINK-32626) Distinguish non-existent job from non-existent savepoint in Get Savepoint REST API

2023-07-18 Thread Austin Cawley-Edwards (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Austin Cawley-Edwards updated FLINK-32626:
--
Description: 
The current `GET /jobs/:jobid/savepoints/:triggerid` API endpoint [1], when 
given *either* a Job ID or a Trigger ID that does not exist, will always 
respond with an exception that indicates {*}the Savepoint doesn't exist{*}, 
like:
{code:java}
{"errors":["org.apache.flink.runtime.rest.handler.RestHandlerException: There is no savepoint operation with triggerId=TRIGGER ID for job JOB ID.
	at org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$SavepointStatusHandler.maybeCreateNotFoundError(SavepointHandlers.java:325)
	at org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$SavepointStatusHandler.lambda$handleRequest$1(SavepointHandlers.java:308)
	at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
	at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
	at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:260)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
	at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
	at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1275)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
	at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
	at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:45)
	at akka.dispatch.OnComplete.internal(Future.scala:299)
	at akka.dispatch.OnComplete.internal(Future.scala:297)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:622)
	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:25)
	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
	at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
	at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
	at ...
{code}

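Until the endpoint distinguishes the two cases, a client has to disambiguate on its own. A minimal sketch of that workaround (assumptions: a default REST address, and that `GET /jobs/:jobid` returns 404 for an unknown job; the IDs are placeholders):

{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SavepointStatusCheck {

    public static void main(String[] args) throws Exception {
        String base = "http://localhost:8081";
        String jobId = "JOB_ID";         // placeholder
        String triggerId = "TRIGGER_ID"; // placeholder
        HttpClient client = HttpClient.newHttpClient();

        // First check whether the job itself exists at all.
        HttpResponse<String> job = client.send(
                HttpRequest.newBuilder(URI.create(base + "/jobs/" + jobId)).GET().build(),
                HttpResponse.BodyHandlers.ofString());
        if (job.statusCode() == 404) {
            System.out.println("job " + jobId + " does not exist");
            return;
        }

        // Only now can a 404 here be read as "no such savepoint operation".
        HttpResponse<String> savepoint = client.send(
                HttpRequest.newBuilder(
                                URI.create(base + "/jobs/" + jobId + "/savepoints/" + triggerId))
                        .GET().build(),
                HttpResponse.BodyHandlers.ofString());
        System.out.println(savepoint.statusCode() + ": " + savepoint.body());
    }
}
{code}
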
[jira] [Updated] (FLINK-32626) Distinguish non-existent job from non-existent savepoint in Get Savepoint REST API

2023-07-18 Thread Austin Cawley-Edwards (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Austin Cawley-Edwards updated FLINK-32626:
--
Summary: Distinguish non-existent job from non-existent savepoint in Get 
Savepoint REST API  (was: Get Savepoint REST API doesn't distinguish 
non-existent job from non-existent savepoint )

> Distinguish non-existent job from non-existent savepoint in Get Savepoint 
> REST API
> --
>
> Key: FLINK-32626
> URL: https://issues.apache.org/jira/browse/FLINK-32626
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Affects Versions: 1.17.1
>Reporter: Austin Cawley-Edwards
>Priority: Not a Priority
>
> The current `GET /jobs/:jobid/savepoints/:triggerid` API endpoint [1], when 
> given *either* a Job ID or a Trigger ID that does not exist, will respond 
> with an exception that indicates the Savepoint doesn't exist, like:
> {code:java}
> {"errors":["org.apache.flink.runtime.rest.handler.RestHandlerException: There is no savepoint operation with triggerId=TRIGGER ID for job JOB ID. ...
> {code}

[jira] [Updated] (FLINK-32626) Distinguish non-existent job from non-existent savepoint in Get Savepoint REST API

2023-07-18 Thread Austin Cawley-Edwards (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Austin Cawley-Edwards updated FLINK-32626:
--
Description: 
The current `GET /jobs/:jobid/savepoints/:triggerid` API endpoint [1], when 
given *either* a Job ID or a Trigger ID that does not exist, will respond 
with an exception that indicates the Savepoint doesn't exist, like:
{code:java}
{"errors":["org.apache.flink.runtime.rest.handler.RestHandlerException: There is no savepoint operation with triggerId=TRIGGER ID for job JOB ID. ...
{code}

[jira] [Created] (FLINK-32626) Get Savepoint REST API doesn't distinguish non-existent job from non-existent savepoint

2023-07-18 Thread Austin Cawley-Edwards (Jira)
Austin Cawley-Edwards created FLINK-32626:
-

 Summary: Get Savepoint REST API doesn't distinguish non-existent 
job from non-existent savepoint 
 Key: FLINK-32626
 URL: https://issues.apache.org/jira/browse/FLINK-32626
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / REST
Affects Versions: 1.17.1
Reporter: Austin Cawley-Edwards


The current `GET /jobs/:jobid/savepoints/:triggerid` API endpoint [1], when 
given *either* a Job ID or a Trigger ID that does not exist, will respond 
with an exception that indicates the Savepoint doesn't exist, like:
{code:java}
{"errors":["org.apache.flink.runtime.rest.handler.RestHandlerException: There is no savepoint operation with triggerId=TRIGGER ID for job JOB ID. ...
{code}

[jira] [Updated] (FLINK-10212) REST API for listing all the available save points

2023-07-18 Thread Austin Cawley-Edwards (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-10212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Austin Cawley-Edwards updated FLINK-10212:
--
Description: 
*Background*

I'm one of the authors of the open-source Flink job deployer 
(https://github.com/ing-bank/flink-deployer). Recently, I rewrote our 
implementation to use the Flink REST API instead of the native CLI. 

In our use case, we store the job savepoints in a Kubernetes persistent volume. 
For our deployer, we mount the persistent volume to our deployer container so 
that we can find and use the savepoints. 

In the rewrite to the REST API, I saw that the API for monitoring savepoint 
creation returns the complete path to the created savepoint, and we can use 
it in the job deployer to start the new job from the latest savepoint.

However, we also allow users to deploy a job with recovered state by 
specifying only the directory the savepoints are stored in. In this scenario, 
we look for the latest savepoint created for the job ourselves inside the 
given directory. To find this path, we still rely on the mounted volume and 
list the directory contents to discover savepoints.

*Feature*

I was thinking that it might be a good addition if the native Flink REST API 
offered the ability to list savepoints. Since the API doesn't inherently know 
where savepoints are stored, it could take a path as one of the arguments. It 
could even allow the user to provide a job ID as an argument so that the API 
could search the specified directory for savepoints belonging to that job.

As the API would require the path as an argument, and providing a path 
containing forward slashes in the URL isn't ideal, I'm eager to discuss what a 
proper solution would look like.

A POST request to /jobs/:jobid/savepoints with the path as a body parameter 
would make sense if the API were to list all savepoints in a specific path, 
but this request is already used for creating new savepoints.

An alternative could be a POST to /savepoints with the path and job ID in the 
request body.

A POST request to retrieve data is obviously not the most straightforward 
approach, but in my opinion it is still preferable to a GET to, for example, 
/jobs/:jobid/savepoints/:targetDirectory.

I'm willing to help out on this one by submitting a pull request.

Looking forward to your thoughts! 
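
To make the discussion concrete, here is a sketch of what such a listing call could look like from a client. Everything here is a proposal: the /savepoints endpoint, the field names, and the response shape are assumptions, not an existing API.

{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ListSavepoints {

    public static void main(String[] args) throws Exception {
        // Proposed request: POST /savepoints with the directory (and
        // optionally a job ID) in the body, avoiding slashes in the URL.
        String body = "{\"target-directory\": \"/mnt/savepoints\", \"job-id\": \"JOB_ID\"}";

        HttpResponse<String> response = HttpClient.newHttpClient().send(
                HttpRequest.newBuilder(URI.create("http://localhost:8081/savepoints"))
                        .header("Content-Type", "application/json")
                        .POST(HttpRequest.BodyPublishers.ofString(body))
                        .build(),
                HttpResponse.BodyHandlers.ofString());

        // A response could list the discovered savepoints, e.g.
        // {"savepoints": [{"path": "/mnt/savepoints/savepoint-20a2f8-abc123"}]}
        System.out.println(response.body());
    }
}
{code}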



> REST API for listing all the available save points
> --
>
> Key: FLINK-10212
> URL: https://issues.apache.org/jira/browse/FLINK-10212
> Project: Flink
>  Issue Type: New 

[jira] [Updated] (FLINK-30445) Add Flink Web3 Connector

2022-12-16 Thread Austin Cawley-Edwards (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Austin Cawley-Edwards updated FLINK-30445:
--
Description: 
Web3 is very hot right now. If you search GitHub for open-source blockchain 
explorers, the most-starred project is blockscout 
([https://github.com/blockscout/blockscout]), which uses Elixir as a parallel 
engine to sync blocks from a blockchain node into a file (CSV format). I think 
Flink is the best solution for this ingestion. Reasons:

(1) A blockchain connector needs to support different chains, including 
Ethereum, Bitcoin, Solana, etc., through JSON-RPC.

(2) Like EtherScan, we need to fetch the latest block into storage for an 
index to search.

(3) As a supplement to (2), we need a connector to fully sync all blocks from 
a blockchain node. I think Flink's stream/batch alignment feature suits this 
scenario.

(4) According to FLIP-27, we could use the block number as the SourceSplit to 
read (sketched below). It is very natural.

(5) The Flink community could use the web3 topic to gain PR visibility in the 
web3 ecosystem.
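
As a sketch of point (4): under FLIP-27, a split only needs a stable ID, so a contiguous block range maps onto it naturally. The class and field names below are hypothetical:

{code:java}
import org.apache.flink.api.connector.source.SourceSplit;

/** Hypothetical split: a contiguous, inclusive range of block numbers. */
public class BlockRangeSplit implements SourceSplit {

    private final long startBlock;
    private final long endBlock;

    public BlockRangeSplit(long startBlock, long endBlock) {
        this.startBlock = startBlock;
        this.endBlock = endBlock;
    }

    @Override
    public String splitId() {
        // A stable ID lets the enumerator track assignment and re-add
        // the split after a failure.
        return startBlock + "-" + endBlock;
    }

    public long getStartBlock() { return startBlock; }
    public long getEndBlock() { return endBlock; }
}
{code}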



> Add Flink Web3 Connector
> 
>
> Key: FLINK-30445
> URL: https://issues.apache.org/jira/browse/FLINK-30445
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / ORC
>Reporter: Junyao Huang
>Priority: Major
>
> Web3 is very hot right now. If you search GitHub for open-source blockchain 
> explorers, the most-starred project is blockscout 
> ([https://github.com/blockscout/blockscout]), which uses Elixir as a parallel 
> engine to sync blocks from a blockchain node into a file (CSV format). I 
> think Flink is the best solution for this ingestion. Reasons:
> (1) A blockchain connector needs to support different chains, including 
> Ethereum, Bitcoin, Solana, etc., through JSON-RPC.
> (2) Like EtherScan, we need to fetch the latest block into storage for an 
> index to search.
> (3) As a supplement to (2), we need a connector to fully sync all blocks 
> from a blockchain node. I think Flink's stream/batch alignment feature suits 
> this scenario.
> (4) According to FLIP-27, we could use the block number as the SourceSplit 
> to read. It is very natural.
> (5) The Flink community could use the web3 topic to gain PR visibility in 
> the web3 ecosystem.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29863) Properly handle NaN/Infinity in OpenAPI spec

2022-12-13 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646786#comment-17646786
 ] 

Austin Cawley-Edwards commented on FLINK-29863:
---

Because we're using Jackson to serialize and they support these number formats? 
If that's the case ( :( ), they seem to have been talking about this for years 
with no real progress: 
[https://github.com/FasterXML/jackson-databind/issues/911]

There are a couple of ideas in that issue for ways to correctly serialize NaN 
and infinities as `null`, but I can see that it would be hard to write test 
cases to enforce this throughout Flink's REST API.
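
For illustration, here is one of those ideas sketched out. This is a hedged example of the approach, not something Flink ships, and the module would still have to be registered on every ObjectMapper the REST API uses:

{code:java}
import java.io.IOException;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;

/** Writes NaN/Infinity as JSON null so the output stays spec-valid. */
public class NonFiniteAsNullSerializer extends JsonSerializer<Double> {

    @Override
    public void serialize(Double value, JsonGenerator gen, SerializerProvider serializers)
            throws IOException {
        if (value.isNaN() || value.isInfinite()) {
            gen.writeNull();
        } else {
            gen.writeNumber(value.doubleValue());
        }
    }
}

// Registration sketch:
//   ObjectMapper mapper = new ObjectMapper();
//   mapper.registerModule(
//       new SimpleModule().addSerializer(Double.class, new NonFiniteAsNullSerializer()));
{code}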

One thought might be to not try to strictly enforce it, but to treat all 
found occurrences as bugs. It would be nice if the JSON could be valid, as 
working with multi-type returns, even when documented in OpenAPI, is a bit 
painful. Wdyt?

> Properly handle NaN/Infinity in OpenAPI spec
> 
>
> Key: FLINK-29863
> URL: https://issues.apache.org/jira/browse/FLINK-29863
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Runtime / REST
>Affects Versions: 1.15.0
>Reporter: Chesnay Schepler
>Priority: Major
> Fix For: 1.17.0
>
>
> Our OpenAPI spec maps all float/double fields to float64, but we at times 
> also return NaN/infinity which can't be represented as such since the JSON 
> spec doesn't support it.
> One alternative could be to document it as an either type, returning either a 
> float64 or a string.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29863) Properly handle NaN/Infinity in OpenAPI spec

2022-12-13 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646692#comment-17646692
 ] 

Austin Cawley-Edwards commented on FLINK-29863:
---

What would be the issue with fixing the API to not return NaN/Infinity instead?

 

> Numeric values that cannot be represented as sequences of digits (such as 
> Infinity and NaN) are not permitted.

 

https://www.ecma-international.org/publications-and-standards/standards/ecma-404/
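
For reference, a quick demonstration of the mismatch (the output shown assumes Jackson's default settings, which quote non-finite numbers as strings):

{code:java}
import com.fasterxml.jackson.databind.ObjectMapper;

public class NonFiniteDemo {

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // With default settings Jackson quotes non-finite doubles, so the
        // field comes out as a string rather than a number:
        //   {"value":"NaN"}
        System.out.println(mapper.writeValueAsString(
                java.util.Map.of("value", Double.NaN)));
    }
}
{code}

Either way, the field no longer matches a plain float64 in the OpenAPI spec.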

> Properly handle NaN/Infinity in OpenAPI spec
> 
>
> Key: FLINK-29863
> URL: https://issues.apache.org/jira/browse/FLINK-29863
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Runtime / REST
>Affects Versions: 1.15.0
>Reporter: Chesnay Schepler
>Priority: Major
> Fix For: 1.17.0
>
>
> Our OpenAPI spec maps all float/double fields to float64, but we at times 
> also return NaN/infinity which can't be represented as such since the JSON 
> spec doesn't support it.
> One alternative could be to document it as an either type, returning either a 
> float64 or a string.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-26335) prometheus/metrics - java.lang.OutOfMemoryError: Java heap space

2022-09-27 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17610148#comment-17610148
 ] 

Austin Cawley-Edwards commented on FLINK-26335:
---

Hey [~mlnogueira], thanks for the report. Do you know roughly how many 
metrics your job was trying to export (# of labels, # of custom metrics, total 
size (hundreds of MBs? GBs?)) when this happened? Were you able to resolve it 
by giving the job more resources?

> prometheus/metrics - java.lang.OutOfMemoryError: Java heap space
> 
>
> Key: FLINK-26335
> URL: https://issues.apache.org/jira/browse/FLINK-26335
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics
>Affects Versions: 1.14.3
>Reporter: Marcelo Nogueira
>Priority: Major
>
>  
> {code:java}
> 2022-02-22 14:39:33.782 INFO 
> [org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterHandler]
>  - Committing the state for checkpoint 4
> 2022-02-22 14:39:47.367 ERROR 
> [org.apache.flink.runtime.util.ClusterUncaughtExceptionHandler] - WARNING: 
> Thread 'prometheus-http-1-6' produced an uncaught exception. If you want to 
> fail on uncaught exceptions, then configure 
> cluster.uncaught-exception-handling accordingly
> java.lang.OutOfMemoryError: Java heap space
>     at java.util.Arrays.copyOf(Unknown Source) ~[?:?]
>     at java.io.ByteArrayOutputStream.grow(Unknown Source) ~[?:?]
>     at java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source) ~[?:?]
>     at java.io.ByteArrayOutputStream.write(Unknown Source) ~[?:?]
>     at sun.nio.cs.StreamEncoder.writeBytes(Unknown Source) ~[?:?]
>     at sun.nio.cs.StreamEncoder.implWrite(Unknown Source) ~[?:?]
>     at sun.nio.cs.StreamEncoder.implWrite(Unknown Source) ~[?:?]
>     at sun.nio.cs.StreamEncoder.write(Unknown Source) ~[?:?]
>     at sun.nio.cs.StreamEncoder.write(Unknown Source) ~[?:?]
>     at java.io.OutputStreamWriter.write(Unknown Source) ~[?:?]
>     at java.io.Writer.append(Unknown Source) ~[?:?]
>     at 
> io.prometheus.client.exporter.common.TextFormat.writeEscapedLabelValue(TextFormat.java:88)
>  ~[flink-metrics-prometheus-1.14.3.jar:1.14.3]
>     at 
> io.prometheus.client.exporter.common.TextFormat.write004(TextFormat.java:42) 
> ~[flink-metrics-prometheus-1.14.3.jar:1.14.3]
>     at 
> io.prometheus.client.exporter.HTTPServer$HTTPMetricHandler.handle(HTTPServer.java:60)
>  ~[flink-metrics-prometheus-1.14.3.jar:1.14.3]
>     at com.sun.net.httpserver.Filter$Chain.doFilter(Unknown Source) 
> ~[jdk.httpserver:?]
>     at sun.net.httpserver.AuthFilter.doFilter(Unknown Source) 
> ~[jdk.httpserver:?]
>     at com.sun.net.httpserver.Filter$Chain.doFilter(Unknown Source) 
> ~[jdk.httpserver:?]
>     at sun.net.httpserver.ServerImpl$Exchange$LinkHandler.handle(Unknown 
> Source) ~[jdk.httpserver:?]
>     at com.sun.net.httpserver.Filter$Chain.doFilter(Unknown Source) 
> ~[jdk.httpserver:?]
>     at sun.net.httpserver.ServerImpl$Exchange.run(Unknown Source) 
> ~[jdk.httpserver:?]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
> ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
> ~[?:?]
>     at java.lang.Thread.run(Unknown Source) [?:?]
> 2022-02-22 14:40:32.668 INFO 
> [org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer] - Flushing 
> new partitions
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-27356) Incorrect Number of Methods Listed for SplitReader

2022-04-22 Thread Austin Cawley-Edwards (Jira)
Austin Cawley-Edwards created FLINK-27356:
-

 Summary: Incorrect Number of Methods Listed for SplitReader
 Key: FLINK-27356
 URL: https://issues.apache.org/jira/browse/FLINK-27356
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.14.0, 1.15.0
Reporter: Austin Cawley-Edwards


The docs state that 
[`SplitReader`|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java]
 only has three methods, but it has four: the `close()` method is missing from 
the docs.
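
For reference, this is roughly the shape of the interface as of 1.14/1.15, paraphrased from the linked source with comments added:

{code:java}
import java.io.IOException;

import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;

public interface SplitReader<E, SplitT extends SourceSplit> {

    /** 1. Blocking fetch of the next batch of records. */
    RecordsWithSplitIds<E> fetch() throws IOException;

    /** 2. Handle changes to the assigned splits (e.g. new splits added). */
    void handleSplitsChanges(SplitsChange<SplitT> splitsChanges);

    /** 3. Wake up a fetch() that is blocked. */
    void wakeUp();

    /** 4. The method the docs omit: release all resources. */
    void close() throws Exception;
}
{code}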



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-24208) Allow idempotent savepoint triggering

2021-09-21 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17418252#comment-17418252
 ] 

Austin Cawley-Edwards commented on FLINK-24208:
---

Great, we're on the same page. Thanks!

> Allow idempotent savepoint triggering
> -
>
> Key: FLINK-24208
> URL: https://issues.apache.org/jira/browse/FLINK-24208
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / REST
>Reporter: Robert Metzger
>Priority: Major
>
> As a user of Flink, I want to be able to trigger a savepoint from an external 
> system in a way that I can detect if I have requested this savepoint already.
> By passing a custom ID to the savepoint request, I can check (in case of an 
> error of the original request, or the external system) if the request has 
> been made already.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-24208) Allow idempotent savepoint triggering

2021-09-20 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17417718#comment-17417718
 ] 

Austin Cawley-Edwards edited comment on FLINK-24208 at 9/20/21, 5:48 PM:
-

[~chesnay], definitely on the same page that the user provides a specific ID.

Maybe I did not understand the sketch, but from the current description I 
would assume that sending the same `triggerId`, when one already exists, would 
result in an error (like 409 Conflict), so it can be detected as a duplicate 
on the user side.

I think what we rather want to say (and what I thought the sketch implements) 
is that duplicate requests result in the same success response (i.e., 200 w/ a 
`TriggerResponse`). It does not matter to the user that another request with 
the same `triggerId` has already been sent.

 

I would suggest updating the description to:

 
 As a user of Flink, I want to be able to trigger a savepoint from an external 
system in a way that is safe to retry so I can guard against ephemeral errors, 
like network failures, and simplify my upgrade routines.

By passing a custom ID to the savepoint request, I can let Flink deduplicate 
the requests and not worry about storing the result or about sending the 
request again.
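
A client-side sketch of what this enables (hedged: the `triggerId` body field is an assumption based on the sketch under discussion, not a released API):

{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.UUID;

public class IdempotentSavepointTrigger {

    public static void main(String[] args) throws Exception {
        // Generate once, persist, and reuse on every retry.
        String triggerId = UUID.randomUUID().toString();
        String body = "{\"triggerId\": \"" + triggerId + "\", "
                + "\"target-directory\": \"s3://bucket/savepoints\"}";

        HttpRequest request = HttpRequest.newBuilder(
                        URI.create("http://localhost:8081/jobs/JOB_ID/savepoints"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        // Safe to retry on network errors: a duplicate request with the
        // same triggerId would get the same success response back.
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}
{code}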


was (Author: austince):
[~chesnay], definitely on the same page that the user provides a specific ID.

Maybe I did not understand the sketch, but from the current description I would 
assume sending the same `triggerId` if one is already exists would result in an 
error (like 409 Conflict), so it can be detected as a duplicate on the user 
side.

I think what we rather want to say (and what I thought the sketch implements) 
is that duplicate requests result in the same success response (i.e., 200 w/ a 
`TriggerResponse`). It does not matter to the user that another request with 
the same `triggerId` has already been sent.

 

I would suggest updating the description to:

 
 As a user of Flink, I want to be able to trigger a savepoint from an external 
system in a way that is safe to retry so I can guard against ephemeral errors, 
like network failures, and simplify my upgrade routines.

By passing a custom ID to the savepoint request, I can let Flink deduplicate 
the requests and not worry about storing the result or sending the request 
again.

> Allow idempotent savepoint triggering
> -
>
> Key: FLINK-24208
> URL: https://issues.apache.org/jira/browse/FLINK-24208
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / REST
>Reporter: Robert Metzger
>Priority: Major
>
> As a user of Flink, I want to be able to trigger a savepoint from an external 
> system in a way that I can detect if I have requested this savepoint already.
> By passing a custom ID to the savepoint request, I can check (in case of an 
> error of the original request, or the external system) if the request has 
> been made already.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24275) Allow idempotent job cancellation

2021-09-20 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17417720#comment-17417720
 ] 

Austin Cawley-Edwards commented on FLINK-24275:
---

I see two options:
 * Treat the endpoint imperatively ("Switch this Job to Cancelled")
 ** Fail on already cancelled (i.e. 409)
 * Treat the endpoint declaratively ("Ensure this Job is Cancelled")
 ** Return a success message (i.e., 200, or 202 as is currently returned)

 

You know better than I do which option fits the Flink API concepts, but I 
would lean towards the second to simplify how users handle it. If duplicate 
calls return an error, users have to classify that error as acceptable. I 
don't see any case where a 409 error would be handled differently than a 
success.
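
For comparison, a sketch of what a caller faces today versus under the declarative option (status codes taken from this issue; the address and job ID are placeholders):

{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class EnsureCancelled {

    public static void main(String[] args) throws Exception {
        HttpRequest cancel = HttpRequest.newBuilder(
                        URI.create("http://localhost:8081/jobs/JOB_ID?mode=cancel"))
                .method("PATCH", HttpRequest.BodyPublishers.noBody())
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(cancel, HttpResponse.BodyHandlers.ofString());

        int code = response.statusCode();
        // Declarative endpoint: 202 would always mean "the job is (now)
        // cancelled". Today, a 404 is ambiguous: unknown job, or already
        // cancelled? The caller cannot tell.
        if (code == 202) {
            System.out.println("Job is cancelled");
        } else if (code == 404) {
            System.out.println("Unknown job OR already cancelled -- ambiguous");
        }
    }
}
{code}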


> Allow idempotent job cancellation
> -
>
> Key: FLINK-24275
> URL: https://issues.apache.org/jira/browse/FLINK-24275
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: Austin Cawley-Edwards
>Priority: Major
>
> As a user of Flink, I want to be able to cancel a job from an external system 
> in a fault-tolerant way without guessing if the job has already been 
> cancelled.
>  
> Currently, the cancel endpoint (PATCH /jobs/:jobid?mode=cancel) will return a 
> 404 if the job is already cancelled. This makes it hard to detect if the job 
> truly doesn't exist, or if it is already in the desired state.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-24208) Allow idempotent savepoint triggering

2021-09-20 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17417718#comment-17417718
 ] 

Austin Cawley-Edwards edited comment on FLINK-24208 at 9/20/21, 4:24 PM:
-

[~chesnay], definitely on the same page that the user provides a specific ID.

Maybe I did not understand the sketch, but from the current description I would 
assume sending the same `triggerId` if one is already exists would result in an 
error (like 409 Conflict), so it can be detected as a duplicate on the user 
side.

I think what we rather want to say (and what I thought the sketch implements) 
is that duplicate requests result in the same success response (i.e., 200 w/ a 
`TriggerResponse`). It does not matter to the user that another request with 
the same `triggerId` has already been sent.

 

I would suggest updating the description to:

 
 As a user of Flink, I want to be able to trigger a savepoint from an external 
system in a way that is safe to retry so I can guard against ephemeral errors, 
like network failures, and simplify my upgrade routines.

By passing a custom ID to the savepoint request, I can let Flink deduplicate 
the requests and not worry about storing the result or sending the request 
again.


was (Author: austince):
[~chesnay], definitely on the same page that the user provides a specific ID.

Maybe I did not understand the sketch, but from the current description I would 
assume sending the same `triggerId` if one is already in progress would result 
in an error (like 409 Conflict) so it can be detected as a duplicate.

I think what we rather want to say (and what I thought the sketch implements) 
is that duplicate requests result in the same success response (i.e., 200 w/ a 
`TriggerResponse`). It does not matter to the user that another request with 
the same `triggerId` has already been sent. 

 

I would suggest updating the description to:

 
As a user of Flink, I want to be able to trigger a savepoint from an external 
system in a way that is safe to retry so I can guard against ephemeral errors, 
like network failures, and simplify my upgrade routines.

By passing a custom ID to the savepoint request, I can let Flink deduplicate 
the requests and not worry about storing the result or sending the request 
again.

> Allow idempotent savepoint triggering
> -
>
> Key: FLINK-24208
> URL: https://issues.apache.org/jira/browse/FLINK-24208
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / REST
>Reporter: Robert Metzger
>Priority: Major
>
> As a user of Flink, I want to be able to trigger a savepoint from an external 
> system in a way that I can detect if I have requested this savepoint already.
> By passing a custom ID to the savepoint request, I can check (in case of an 
> error of the original request, or the external system) if the request has 
> been made already.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24208) Allow idempotent savepoint triggering

2021-09-20 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17417718#comment-17417718
 ] 

Austin Cawley-Edwards commented on FLINK-24208:
---

[~chesnay], definitely on the same page that the user provides a specific ID.

Maybe I did not understand the sketch, but from the current description I would 
assume sending the same `triggerId` if one is already in progress would result 
in an error (like 409 Conflict) so it can be detected as a duplicate.

I think what we rather want to say (and what I thought the sketch implements) 
is that duplicate requests result in the same success response (i.e., 200 w/ a 
`TriggerResponse`). It does not matter to the user that another request with 
the same `triggerId` has already been sent. 

 

I would suggest updating the description to:

 
As a user of Flink, I want to be able to trigger a savepoint from an external 
system in a way that is safe to retry so I can guard against ephemeral errors, 
like network failures, and simplify my upgrade routines.

By passing a custom ID to the savepoint request, I can let Flink deduplicate 
the requests and not worry about storing the result or sending the request 
again.

> Allow idempotent savepoint triggering
> -
>
> Key: FLINK-24208
> URL: https://issues.apache.org/jira/browse/FLINK-24208
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / REST
>Reporter: Robert Metzger
>Priority: Major
>
> As a user of Flink, I want to be able to trigger a savepoint from an external 
> system in a way that I can detect if I have requested this savepoint already.
> By passing a custom ID to the savepoint request, I can check (in case of an 
> error of the original request, or the external system) if the request has 
> been made already.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24275) Allow idempotent job cancellation

2021-09-13 Thread Austin Cawley-Edwards (Jira)
Austin Cawley-Edwards created FLINK-24275:
-

 Summary: Allow idempotent job cancellation
 Key: FLINK-24275
 URL: https://issues.apache.org/jira/browse/FLINK-24275
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / REST
Reporter: Austin Cawley-Edwards


As a user of Flink, I want to be able to cancel a job from an external system 
in a fault-tolerant way without guessing if the job has already been cancelled.

 

Currently, the cancel endpoint (PATCH /jobs/:jobid?mode=cancel) will return a 
404 if the job is already cancelled. This makes it hard to detect if the job 
truly doesn't exist, or if it is already in the desired state.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24208) Allow idempotent savepoint triggering

2021-09-13 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17414559#comment-17414559
 ] 

Austin Cawley-Edwards commented on FLINK-24208:
---

The sketch looks great [~chesnay] ! We may want to clarify the description to 
say that this is really about safely requesting a savepoint from an external 
system – the user should not care (or know) if a savepoint with the trigger ID 
has already been requested (which seems to be how you've implemented it :)).

> Allow idempotent savepoint triggering
> -
>
> Key: FLINK-24208
> URL: https://issues.apache.org/jira/browse/FLINK-24208
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / REST
>Reporter: Robert Metzger
>Priority: Major
>
> As a user of Flink, I want to be able to trigger a savepoint from an external 
> system in a way that I can detect if I have requested this savepoint already.
> By passing a custom ID to the savepoint request, I can check (in case of an 
> error of the original request, or the external system) if the request has 
> been made already.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-24208) Allow idempotent savepoint triggering

2021-09-08 Thread Austin Cawley-Edwards (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Austin Cawley-Edwards updated FLINK-24208:
--
Component/s: Runtime / REST

> Allow idempotent savepoint triggering
> -
>
> Key: FLINK-24208
> URL: https://issues.apache.org/jira/browse/FLINK-24208
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / REST
>Reporter: Robert Metzger
>Priority: Major
>
> As a user of Flink, I want to be able to trigger a savepoint from an external 
> system in a way that I can detect if I have requested this savepoint already.
> By passing a custom ID to the savepoint request, I can check (in case of an 
> error of the original request, or the external system) if the request has 
> been made already.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-20244) RMQSource does not ACK duplicated messages

2021-06-22 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17367701#comment-17367701
 ] 

Austin Cawley-Edwards commented on FLINK-20244:
---

Thanks for the ping [~cmick] + follow-up on this ticket. The issues you've 
decomposed make sense to me. As for splitting them up, I think it depends on 
how big the changes are and whether they can fit into a single PR; multiple 
tickets + commits seem reasonable, though. I'll pass this off to 
[~fabian.paul] and [~arvid], as they're working more in this area and will 
know the procedures better than I do. Anyway, thanks for your continued 
contributions :)

> RMQSource does not ACK duplicated messages
> --
>
> Key: FLINK-20244
> URL: https://issues.apache.org/jira/browse/FLINK-20244
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / RabbitMQ
>Affects Versions: 1.11.2, 1.12.0
>Reporter: Thomas Eckestad
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> *Background*
> The following has been observed when using RMQSource with 
> exactly-once-guarantees.
> When a job is restarted from a checkpoint, and there were unacked messages 
> referenced by that checkpoint at the time of the restart, those messages will 
> be requeued by RMQ and resent to the restarted job. But they will not be 
> acked. Later, when the connection to RMQ is closed (the job either finishes 
> or is restarted), they will be requeued again.
> When looking at the source code, messages are ACK:ed by the RMQSource after a 
> checkpoint is complete 
> (MessageAcknowledgingSourceBase::notifyCheckpointComplete).
> Also, when looking at the source code in RMQSource::setMessageIdentifier() 
> (on the master branch; the ACK semantics do not seem to have changed since 
> 1.11.2), it is clear that if an RMQ message carries a correlation ID that has 
> already been handled, that message is skipped and not processed further. It 
> is also clear that skipped messages are not added to the sessionIds list of 
> messages that are targeted for ACK to RMQ.
> I believe all successfully consumed RMQ messages should be ACK:ed; it is 
> irrelevant whether the message is ignored or processed by Flink. RMQ needs to 
> know that the consumer considers the message handled OK.
> The following code is from RMQSource::setMessageIdentifier(). Note the 
> return before sessionIds.add():
> {code:java}
> ...
> if (!addId(correlationId)) {
>     // we have already processed this message
>     return false;
> }
> sessionIds.add(deliveryTag);
> ...
> {code}
> Directly related to the above, I also noticed that RMQ connections are 
> leaked at internal job restart. From the Flink log (this stack trace is from 
> 1.11.2):
> 2020-11-18 10:08:25,118 ERROR 
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - Error during 
> disposal of stream operator.
>  com.rabbitmq.client.AlreadyClosedException: connection is already closed due 
> to connection error; protocol method: 
> #method(reply-code=320, reply-text=CONNECTION_FORCED - 
> Closed via management plugin, class-id=0, method-id=0)
>  at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:228) 
> ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>  at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:303) 
> ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>  at com.rabbitmq.client.impl.ChannelN.basicCancel(ChannelN.java:1294) 
> ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>  at 
> com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicCancel(AutorecoveringChannel.java:482)
>  
> ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>  at 
> org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close(RMQSource.java:192)
>  
> ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
> AlreadyClosedException is not handled by RMQSource::close(). This results in 
> an RMQ connection thread somewhere being left behind. I triggered three 
> restarts of the job in a row and noticed one new connection added to the pile 
> of connections for each restart. I triggered the restart by killing the 
> active connection to RMQ using the RMQ admin GUI (management plugin, see 
> above exception details).
> I also tried to kill one of the leaked connections. But a new one is 
> instantly created when doing so. The traceback when doing this (1.11.2):
> 2020-11-18 10:27:51,715 ERROR 
> com.rabbitmq.client.impl.ForgivingExceptionHandler [] - An unexpected 
> connection driver error occured
>  java.lang.NoClassDefFoundError: 
> com/rabbitmq/client/AMQP$Connection$CloseOk$Builder
>  at 
> 

[jira] [Comment Edited] (FLINK-22698) RabbitMQ source does not stop unless message arrives in queue

2021-05-26 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351899#comment-17351899
 ] 

Austin Cawley-Edwards edited comment on FLINK-22698 at 5/26/21, 4:23 PM:
-

[~nicholasjiang] – yes, I agree w/ Michał that this is something that should be 
considered w/ the FLIP-27 source, if it has not already been.

[~cmick] Thanks for the research into other sources, the approach still sounds 
good to me, including the user-facing configuration option. I'm not sure if 
there is precedent for it, but we could potentially introduce the fix in 1.14 
with a reasonable timeout default (like the suggested 30s), and use the 0s 
default when backporting to 1.12 + 1.13 to maintain functionality. What do you 
think of that? Would it be more confusing to users?


was (Author: austince):
[~nicholasjiang] – yes, I agree w/ Michał that this is something that should be 
considered w/ the FLIP-27 source, if it has not already been.

[~cmick] Thanks for the research into other sources, the approach still sounds 
good to me as well as the user-facing configuration option. I'm not sure if 
there is precedent for it, but we could potentially introduce the fix in 1.14 
with a reasonable timeout default (like the suggested 30s), and use the 0s 
default when backporting to 1.12 + 1.13 to maintain functionality. What do you 
think of that? Would it be more confusing to users?

> RabbitMQ source does not stop unless message arrives in queue
> -
>
> Key: FLINK-22698
> URL: https://issues.apache.org/jira/browse/FLINK-22698
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / RabbitMQ
>Affects Versions: 1.12.0
>Reporter: Austin Cawley-Edwards
>Assignee: Michał Ciesielczyk
>Priority: Major
> Attachments: taskmanager_thread_dump.json
>
>
> In a streaming job with multiple RMQSources, a stop-with-savepoint request 
> has unexpected behavior. Regular checkpoints and savepoints complete 
> successfully; it is only the stop-with-savepoint request where this behavior 
> is seen.
>  
> *Expected Behavior:*
> The stop-with-savepoint request stops the job with a FINISHED state.
>  
> *Actual Behavior:*
> The stop-with-savepoint request either times out or hangs indefinitely unless 
> a message arrives in all the queues that the job consumes from after the 
> stop-with-savepoint request is made.
>  
> *Current workaround:*
> Send a sentinel value to each of the queues consumed by the job that the 
> deserialization schema checks in its isEndOfStream method. This is cumbersome 
> and makes it difficult to do stateful upgrades, as coordination with another 
> system is now necessary. 
>  
>  
> The TaskManager thread dump is attached.
>  
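The sentinel workaround described above can be sketched as a 
DeserializationSchema, assuming String payloads; the sentinel value and class 
name are placeholders:

{code:java}
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

import java.nio.charset.StandardCharsets;

public class SentinelStringSchema implements DeserializationSchema<String> {
    private static final String SENTINEL = "__STOP__"; // placeholder value

    @Override
    public String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // Only evaluated when a message actually arrives, which is why
        // stop-with-savepoint hangs on idle queues.
        return SENTINEL.equals(nextElement);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}
{code}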



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22698) RabbitMQ source does not stop unless message arrives in queue

2021-05-26 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351899#comment-17351899
 ] 

Austin Cawley-Edwards commented on FLINK-22698:
---

[~nicholasjiang] – yes, I agree w/ Michał that this is something that should be 
considered w/ the FLIP-27 source, if it has not already been.

[~cmick] Thanks for the research into other sources, the approach still sounds 
good to me as well as the user-facing configuration option. I'm not sure if 
there is precedent for it, but we could potentially introduce the fix in 1.14 
with a reasonable timeout default (like the suggested 30s), and use the 0s 
default when backporting to 1.12 + 1.13 to maintain functionality. What do you 
think of that? Would it be more confusing to users?

> RabbitMQ source does not stop unless message arrives in queue
> -
>
> Key: FLINK-22698
> URL: https://issues.apache.org/jira/browse/FLINK-22698
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / RabbitMQ
>Affects Versions: 1.12.0
>Reporter: Austin Cawley-Edwards
>Assignee: Michał Ciesielczyk
>Priority: Major
> Attachments: taskmanager_thread_dump.json
>
>
> In a streaming job with multiple RMQSources, a stop-with-savepoint request 
> has unexpected behavior. Regular checkpoints and savepoints complete 
> successfully, it is only the stop-with-savepoint request where this behavior 
> is seen.
>  
> *Expected Behavior:*
> The stop-with-savepoint request stops the job with a FINISHED state.
>  
> *Actual Behavior:*
> The stop-with-savepoint request either times out or hangs indefinitely unless 
> a message arrives in all the queues that the job consumes from after the 
> stop-with-savepoint request is made.
>  
> *Current workaround:*
> Send a sentinel value to each of the queues consumed by the job that the 
> deserialization schema checks in its isEndOfStream method. This is cumbersome 
> and makes it difficult to do stateful upgrades, as coordination with another 
> system is now necessary. 
>  
>  
> The TaskManager thread dump is attached.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22698) RabbitMQ source does not stop unless message arrives in queue

2021-05-21 Thread Austin Cawley-Edwards (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Austin Cawley-Edwards reassigned FLINK-22698:
-

Assignee: Michał Ciesielczyk

> RabbitMQ source does not stop unless message arrives in queue
> -
>
> Key: FLINK-22698
> URL: https://issues.apache.org/jira/browse/FLINK-22698
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / RabbitMQ
>Affects Versions: 1.12.0
>Reporter: Austin Cawley-Edwards
>Assignee: Michał Ciesielczyk
>Priority: Major
> Attachments: taskmanager_thread_dump.json
>
>
> In a streaming job with multiple RMQSources, a stop-with-savepoint request 
> has unexpected behavior. Regular checkpoints and savepoints complete 
> successfully, it is only the stop-with-savepoint request where this behavior 
> is seen.
>  
> *Expected Behavior:*
> The stop-with-savepoint request stops the job with a FINISHED state.
>  
> *Actual Behavior:*
> The stop-with-savepoint request either times out or hangs indefinitely unless 
> a message arrives in all the queues that the job consumes from after the 
> stop-with-savepoint request is made.
>  
> *Current workaround:*
> Send a sentinel value to each of the queues consumed by the job that the 
> deserialization schema checks in its isEndOfStream method. This is cumbersome 
> and makes it difficult to do stateful upgrades, as coordination with another 
> system is now necessary. 
>  
>  
> The TaskManager thread dump is attached.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22698) RabbitMQ source does not stop unless message arrives in queue

2021-05-21 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17349300#comment-17349300
 ] 

Austin Cawley-Edwards commented on FLINK-22698:
---

Hi [~cmick] – that cause makes perfect sense to me. And the proposed solution 
also sounds good, just a couple of questions:
 * What happens when a delivery times out?
 * How do other sources handle this, e.g. Kafka? Do they wait for messages on 
a different thread from the one where the source's `run` is called?

 

Anyway, I'll assign it to you + thanks for looking into this! Also, I'm going 
to cc' [~fabian.paul], as he's been working with this connector (+ others) and 
its FLIP-27 upgrades. He might have some other ideas.
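On the threading question, the pattern under discussion can be sketched with 
invented names (this is not the actual Kafka or RabbitMQ source code): bound 
the blocking wait so cancellation or stop-with-savepoint is observed even when 
no message arrives.

{code:java}
import com.rabbitmq.client.Delivery;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative only: a run()/cancel() pair where the wait for the next
// delivery is bounded, so `running` is re-checked periodically.
public class BoundedWaitLoop {
    private volatile boolean running = true;
    private final BlockingQueue<Delivery> deliveries = new LinkedBlockingQueue<>();
    private final long timeoutMillis = 30_000L; // the suggested 30s default

    public void run(SourceContextLike ctx) throws InterruptedException {
        while (running) {
            // Wait at most timeoutMillis instead of blocking forever; null
            // means we timed out without a delivery.
            Delivery delivery = deliveries.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (delivery == null) {
                continue; // re-check `running` before waiting again
            }
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(new String(delivery.getBody()));
            }
        }
    }

    public void cancel() {
        running = false;
    }

    // Minimal stand-in for Flink's SourceFunction.SourceContext.
    interface SourceContextLike {
        Object getCheckpointLock();
        void collect(String value);
    }
}
{code}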

> RabbitMQ source does not stop unless message arrives in queue
> -
>
> Key: FLINK-22698
> URL: https://issues.apache.org/jira/browse/FLINK-22698
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / RabbitMQ
>Affects Versions: 1.12.0
>Reporter: Austin Cawley-Edwards
>Priority: Major
> Attachments: taskmanager_thread_dump.json
>
>
> In a streaming job with multiple RMQSources, a stop-with-savepoint request 
> has unexpected behavior. Regular checkpoints and savepoints complete 
> successfully, it is only the stop-with-savepoint request where this behavior 
> is seen.
>  
> *Expected Behavior:*
> The stop-with-savepoint request stops the job with a FINISHED state.
>  
> *Actual Behavior:*
> The stop-with-savepoint request either times out or hangs indefinitely unless 
> a message arrives in all the queues that the job consumes from after the 
> stop-with-savepoint request is made.
>  
> *Current workaround:*
> Send a sentinel value to each of the queues consumed by the job that the 
> deserialization schema checks in its isEndOfStream method. This is cumbersome 
> and makes it difficult to do stateful upgrades, as coordination with another 
> system is now necessary. 
>  
>  
> The TaskManager thread dump is attached.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-22698) RabbitMQ source does not stop unless message arrives in queue

2021-05-19 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17347771#comment-17347771
 ] 

Austin Cawley-Edwards edited comment on FLINK-22698 at 5/19/21, 4:48 PM:
-

Hey [~nicholasjiang] – I think that is correct. I'm coming from [this ML 
thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/RabbitMQ-source-does-not-stop-unless-message-arrives-in-queue-td43705.html]
 and am trying to get some more information from the OP.


was (Author: austince):
Hey [~nicholasjiang] – I think that is correct. I'm coming from [this ML 
thread|[http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/RabbitMQ-source-does-not-stop-unless-message-arrives-in-queue-td43705.html|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/RabbitMQ-source-does-not-stop-unless-message-arrives-in-queue-td43705.html],]]
 and am trying to get some more information from the OP.

> RabbitMQ source does not stop unless message arrives in queue
> -
>
> Key: FLINK-22698
> URL: https://issues.apache.org/jira/browse/FLINK-22698
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / RabbitMQ
>Affects Versions: 1.12.0
>Reporter: Austin Cawley-Edwards
>Priority: Major
> Attachments: taskmanager_thread_dump.json
>
>
> In a streaming job with multiple RMQSources, a stop-with-savepoint request 
> has unexpected behavior. Regular checkpoints and savepoints complete 
> successfully, it is only the stop-with-savepoint request where this behavior 
> is seen.
>  
> *Expected Behavior:*
> The stop-with-savepoint request stops the job with a FINISHED state.
>  
> *Actual Behavior:*
> The stop-with-savepoint request either times out or hangs indefinitely unless 
> a message arrives in all the queues that the job consumes from after the 
> stop-with-savepoint request is made.
>  
> *Current workaround:*
> Send a sentinel value to each of the queues consumed by the job that the 
> deserialization schema checks in its isEndOfStream method. This is cumbersome 
> and makes it difficult to do stateful upgrades, as coordination with another 
> system is now necessary. 
>  
>  
> The TaskManager thread dump is attached.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22698) RabbitMQ source does not stop unless message arrives in queue

2021-05-19 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17347771#comment-17347771
 ] 

Austin Cawley-Edwards commented on FLINK-22698:
---

Hey [~nicholasjiang] – I think that is correct. I'm coming from [this ML 
thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/RabbitMQ-source-does-not-stop-unless-message-arrives-in-queue-td43705.html]
 and am trying to get some more information from the OP.

> RabbitMQ source does not stop unless message arrives in queue
> -
>
> Key: FLINK-22698
> URL: https://issues.apache.org/jira/browse/FLINK-22698
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / RabbitMQ
>Affects Versions: 1.12.0
>Reporter: Austin Cawley-Edwards
>Priority: Major
> Attachments: taskmanager_thread_dump.json
>
>
> In a streaming job with multiple RMQSources, a stop-with-savepoint request 
> has unexpected behavior. Regular checkpoints and savepoints complete 
> successfully, it is only the stop-with-savepoint request where this behavior 
> is seen.
>  
> *Expected Behavior:*
> The stop-with-savepoint request stops the job with a FINISHED state.
>  
> *Actual Behavior:*
> The stop-with-savepoint request either times out or hangs indefinitely unless 
> a message arrives in all the queues that the job consumes from after the 
> stop-with-savepoint request is made.
>  
> *Current workaround:*
> Send a sentinel value to each of the queues consumed by the job that the 
> deserialization schema checks in its isEndOfStream method. This is cumbersome 
> and makes it difficult to do stateful upgrades, as coordination with another 
> system is now necessary. 
>  
>  
> The TaskManager thread dump is attached.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22698) RabbitMQ source does not stop unless message arrives in queue

2021-05-18 Thread Austin Cawley-Edwards (Jira)
Austin Cawley-Edwards created FLINK-22698:
-

 Summary: RabbitMQ source does not stop unless message arrives in 
queue
 Key: FLINK-22698
 URL: https://issues.apache.org/jira/browse/FLINK-22698
 Project: Flink
  Issue Type: Bug
  Components: Connectors / RabbitMQ
Affects Versions: 1.12.0
Reporter: Austin Cawley-Edwards
 Attachments: taskmanager_thread_dump.json

In a streaming job with multiple RMQSources, a stop-with-savepoint request has 
unexpected behavior. Regular checkpoints and savepoints complete successfully, 
it is only the stop-with-savepoint request where this behavior is seen.
 
*Expected Behavior:*
The stop-with-savepoint request stops the job with a FINISHED state.
 
*Actual Behavior:*
The stop-with-savepoint request either times out or hangs indefinitely unless a 
message arrives in all the queues that the job consumes from after the 
stop-with-savepoint request is made.
 
*Current workaround:*
Send a sentinel value to each of the queues consumed by the job that the 
deserialization schema checks in its isEndOfStream method. This is cumbersome 
and makes it difficult to do stateful upgrades, as coordination with another 
system is now necessary. 
 
 
The TaskManager thread dump is attached.
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22441) In Flink v1.11.3 contains netty(version:3.10.6) netty(version:4.1.60) . There are many vulnerabilities, like CVE-2021-21409 etc. please confirm these version and fix.

2021-04-26 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332718#comment-17332718
 ] 

Austin Cawley-Edwards commented on FLINK-22441:
---

Adding the "drop Scala 2.11 support" ticket as a blocker, as Konstantin 
mentioned the current Akka lib is the root of this issue, and Akka cannot be 
upgraded until Scala 2.11 is dropped. If there's a more accurate ticket for the 
Akka upgrade, we can update the blocker.

> In Flink v1.11.3 contains netty(version:3.10.6) netty(version:4.1.60) . There 
> are many vulnerabilities, like CVE-2021-21409 etc. please confirm these 
> version and fix. thx
> --
>
> Key: FLINK-22441
> URL: https://issues.apache.org/jira/browse/FLINK-22441
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.11.3, 1.12.2, 1.13.0
>Reporter: 张健
>Priority: Major
>
> In Flink v1.11.3 contains netty(version:3.10.6) netty(version:4.1.60) . There 
> are many vulnerabilities, like CVE-2021-21409 CVE-2021-21295 etc. please 
> confirm these version and fix. thx



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19711) Parsing of invalid CSV fails even with ignoreParseErrors is enabled

2021-04-18 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17324629#comment-17324629
 ] 

Austin Cawley-Edwards commented on FLINK-19711:
---

I had difficulty reproducing this in tests, though I believe it is still an 
issue. I'm not actively working on this, so if it does affect anyone and they 
want to work on it, by all means ask and I will reassign it.
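For reference, the option in question as it would be enabled in a table 
definition; the schema, path, and table name below are placeholders:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IgnoreParseErrorsExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().build());
        tEnv.executeSql(
                "CREATE TABLE csv_source (" +
                "  id INT," +
                "  name STRING" +
                ") WITH (" +
                "  'connector' = 'filesystem'," +
                "  'path' = '/tmp/input'," +              // placeholder path
                "  'format' = 'csv'," +
                "  'csv.ignore-parse-errors' = 'true'" +  // skip malformed rows instead of failing
                ")");
    }
}
{code}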

> Parsing of invalid CSV fails even with ignoreParseErrors is enabled
> ---
>
> Key: FLINK-19711
> URL: https://issues.apache.org/jira/browse/FLINK-19711
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.10.2, 1.11.2, 1.12.0
>Reporter: Roman Khachatryan
>Assignee: Austin Cawley-Edwards
>Priority: Major
>  Labels: stale-assigned
>
> Reported in 
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Un-ignored-Parsing-Exceptions-in-the-CsvFormat-td38781.html]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-21538) Elasticsearch6DynamicSinkITCase.testWritingDocuments fails when submitting job

2021-04-18 Thread Austin Cawley-Edwards (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Austin Cawley-Edwards reassigned FLINK-21538:
-

Assignee: (was: Austin Cawley-Edwards)

> Elasticsearch6DynamicSinkITCase.testWritingDocuments fails when submitting job
> --
>
> Key: FLINK-21538
> URL: https://issues.apache.org/jira/browse/FLINK-21538
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ElasticSearch, Runtime / Coordination
>Affects Versions: 1.12.1, 1.13.0
>Reporter: Dawid Wysakowicz
>Priority: Major
>  Labels: stale-assigned, test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=13868=logs=3d12d40f-c62d-5ec4-6acc-0efe94cc3e89=5d6e4255-0ea8-5e2a-f52c-c881b7872361
> {code}
> 2021-02-27T00:16:06.9493539Z 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 2021-02-27T00:16:06.9494494Z  at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> 2021-02-27T00:16:06.9495733Z  at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
> 2021-02-27T00:16:06.9496596Z  at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> 2021-02-27T00:16:06.9497354Z  at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
> 2021-02-27T00:16:06.9525795Z  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2021-02-27T00:16:06.9526744Z  at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> 2021-02-27T00:16:06.9527784Z  at 
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
> 2021-02-27T00:16:06.9528552Z  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> 2021-02-27T00:16:06.9529271Z  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> 2021-02-27T00:16:06.9530013Z  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2021-02-27T00:16:06.9530482Z  at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> 2021-02-27T00:16:06.9531068Z  at 
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
> 2021-02-27T00:16:06.9531544Z  at 
> akka.dispatch.OnComplete.internal(Future.scala:264)
> 2021-02-27T00:16:06.9531908Z  at 
> akka.dispatch.OnComplete.internal(Future.scala:261)
> 2021-02-27T00:16:06.9532449Z  at 
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
> 2021-02-27T00:16:06.9532860Z  at 
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
> 2021-02-27T00:16:06.9533245Z  at 
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> 2021-02-27T00:16:06.9533721Z  at 
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
> 2021-02-27T00:16:06.9534225Z  at 
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
> 2021-02-27T00:16:06.9534697Z  at 
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
> 2021-02-27T00:16:06.9535217Z  at 
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
> 2021-02-27T00:16:06.9535718Z  at 
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
> 2021-02-27T00:16:06.9536127Z  at 
> akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
> 2021-02-27T00:16:06.9536861Z  at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
> 2021-02-27T00:16:06.9537394Z  at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
> 2021-02-27T00:16:06.9537916Z  at 
> scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
> 2021-02-27T00:16:06.9605804Z  at 
> scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
> 2021-02-27T00:16:06.9606794Z  at 
> scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
> 2021-02-27T00:16:06.9607642Z  at 
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> 2021-02-27T00:16:06.9608419Z  at 
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> 2021-02-27T00:16:06.9609252Z  at 
> akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
> 2021-02-27T00:16:06.9610024Z  at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
> 2021-02-27T00:16:06.9613676Z  at 
> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
> 2021-02-27T00:16:06.9615526Z  at 
> 

[jira] [Closed] (FLINK-22165) How to set rabbitmq correlationId when using rabbitmq sink in dataStreamEnv

2021-04-08 Thread Austin Cawley-Edwards (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Austin Cawley-Edwards closed FLINK-22165.
-
Resolution: Invalid

> How to set rabbitmq correlationId when using rabbitmq sink in dataStreamEnv
> ---
>
> Key: FLINK-22165
> URL: https://issues.apache.org/jira/browse/FLINK-22165
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Affects Versions: 1.12.2
> Environment: Flink 1.12.2
> rabbitmq 3.8.4
>Reporter: Spongebob
>Priority: Minor
>
> The Flink RabbitMQ module provides source and sink functions for RabbitMQ. 
> We can use the correlationId to deduplicate records across checkpoints, so 
> can we set a correlationId for each message sunk into RabbitMQ?
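A hedged sketch of how a per-message correlationId appears to be settable 
through RMQSinkPublishOptions, which the sink accepts alongside the 
serialization schema; the exchange and routing-key values are placeholders:

{code:java}
import com.rabbitmq.client.AMQP;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSinkPublishOptions;

public class CorrelationIdOptions implements RMQSinkPublishOptions<String> {
    @Override
    public String computeRoutingKey(String message) {
        return "my-queue"; // placeholder
    }

    @Override
    public AMQP.BasicProperties computeProperties(String message) {
        return new AMQP.BasicProperties.Builder()
                // Derive a correlationId per message; the derivation here is
                // just an example.
                .correlationId(Integer.toString(message.hashCode()))
                .build();
    }

    @Override
    public String computeExchange(String message) {
        return ""; // default exchange (placeholder)
    }
}
{code}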



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22165) How to set rabbitmq correlationId when using rabbitmq sink in dataStreamEnv

2021-04-08 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17317576#comment-17317576
 ] 

Austin Cawley-Edwards commented on FLINK-22165:
---

Hi there! Please send this as a question to u...@flink.apache.org and I'm happy 
to help :)

> How to set rabbitmq correlationId when using rabbitmq sink in dataStreamEnv
> ---
>
> Key: FLINK-22165
> URL: https://issues.apache.org/jira/browse/FLINK-22165
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Affects Versions: 1.12.2
> Environment: Flink 1.12.2
> rabbitmq 3.8.4
>Reporter: Spongebob
>Priority: Minor
>
> The Flink RabbitMQ module provides source and sink functions for RabbitMQ. 
> We can use the correlationId to deduplicate records across checkpoints, so 
> can we set a correlationId for each message sunk into RabbitMQ?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17859) [flink-s3-fs-presto] Upgrade the dependent presto-hive jar to latest version

2021-04-06 Thread Austin Cawley-Edwards (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Austin Cawley-Edwards updated FLINK-17859:
--
Fix Version/s: 1.8.0

> [flink-s3-fs-presto] Upgrade the dependent presto-hive jar to latest version
> 
>
> Key: FLINK-17859
> URL: https://issues.apache.org/jira/browse/FLINK-17859
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / FileSystem, Connectors / Hive
>Affects Versions: 1.7.2
>Reporter: sam lin
>Priority: Major
> Fix For: 1.8.0
>
>
> The current version of presto-hive is 0.187: 
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-s3-fs-presto/pom.xml#L36
> The latest version is 0.234: https://github.com/prestodb/presto/releases
> There are some nice features we want to use after 0.187. One of them is the 
> CredentialProviderChain support when using the AWS S3 client, added in this 
> PR: https://github.com/prestodb/presto/pull/13858
> Do you have any concerns about upgrading `presto-hive` to the latest 
> version? Could you please upgrade it in the latest release? Thanks.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17859) [flink-s3-fs-presto] Upgrade the dependent presto-hive jar to latest version

2021-04-06 Thread Austin Cawley-Edwards (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Austin Cawley-Edwards updated FLINK-17859:
--
Affects Version/s: 1.7.2

> [flink-s3-fs-presto] Upgrade the dependent presto-hive jar to latest version
> 
>
> Key: FLINK-17859
> URL: https://issues.apache.org/jira/browse/FLINK-17859
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / FileSystem, Connectors / Hive
>Affects Versions: 1.7.2
>Reporter: sam lin
>Priority: Major
>
> The current version of presto-hive is 0.187: 
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-s3-fs-presto/pom.xml#L36
> The latest version is 0.234: https://github.com/prestodb/presto/releases
> There are some nice features we want to use after 0.187. One of them is the 
> CredentialProviderChain support when using the AWS S3 client, added in this 
> PR: https://github.com/prestodb/presto/pull/13858
> Do you have any concerns about upgrading `presto-hive` to the latest 
> version? Could you please upgrade it in the latest release? Thanks.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17859) [flink-s3-fs-presto] Upgrade the dependent presto-hive jar to latest version

2021-04-06 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17315886#comment-17315886
 ] 

Austin Cawley-Edwards commented on FLINK-17859:
---

This has been updated as of Flink 1.8: 
[https://github.com/apache/flink/blob/release-1.8/flink-filesystems/flink-s3-fs-presto/pom.xml#L36|https://github.com/apache/flink/blob/9ff3b0ee294222007c6a6af57541c48854f6645a/flink-filesystems/flink-s3-fs-presto/pom.xml#L36]

 

Ticket should be closed.

> [flink-s3-fs-presto] Upgrade the dependent presto-hive jar to latest version
> 
>
> Key: FLINK-17859
> URL: https://issues.apache.org/jira/browse/FLINK-17859
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / FileSystem, Connectors / Hive
>Reporter: sam lin
>Priority: Major
>
> The current version of presto-hive is 0.187: 
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-s3-fs-presto/pom.xml#L36
> The latest version is 0.234: https://github.com/prestodb/presto/releases
> There are some nice features we want to use after 0.187. One of them is the 
> CredentialProviderChain support when using the AWS S3 client, added in this 
> PR: https://github.com/prestodb/presto/pull/13858
> Do you have any concerns about upgrading `presto-hive` to the latest 
> version? Could you please upgrade it in the latest release? Thanks.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22084) RescalingITCase fails with adaptive scheduler

2021-04-06 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17315716#comment-17315716
 ] 

Austin Cawley-Edwards commented on FLINK-22084:
---

That is unfortunate, and yes, I agree that it seems like it should not be 
supported. I guess this was supported before by allowing an autoconfigured max 
parallelism to be overridden by the state restore.

I believe this is broken because we first compute the 
"initialParallelismStore", which applies defaults and records whether each max 
parallelism was autoconfigured, and then use that store to create an 
"adjusted" parallelism store when creating the execution graph *without 
transferring the autoconfiguration details*, thus hiding whether a vertex's 
max parallelism can be rescaled by the state.
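Schematically, with invented names (not Flink's actual classes), the suspected 
bug looks like this:

{code:java}
import java.util.HashMap;
import java.util.Map;

// Illustration only: class and method names are invented.
public class ParallelismStoreSketch {
    static class Info {
        final int maxParallelism;
        final boolean autoConfigured; // may state restore override this?

        Info(int maxParallelism, boolean autoConfigured) {
            this.maxParallelism = maxParallelism;
            this.autoConfigured = autoConfigured;
        }
    }

    static Map<String, Info> adjust(Map<String, Info> initialStore) {
        Map<String, Info> adjusted = new HashMap<>();
        initialStore.forEach((vertexId, info) ->
                // The suspected bug: autoConfigured is not carried over, so
                // every vertex looks explicitly configured and its max
                // parallelism can no longer be rescaled by the state.
                adjusted.put(vertexId, new Info(info.maxParallelism, false)));
        return adjusted;
    }
}
{code}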

> RescalingITCase fails with adaptive scheduler
> -
>
> Key: FLINK-22084
> URL: https://issues.apache.org/jira/browse/FLINK-22084
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.13.0
>Reporter: Dawid Wysakowicz
>Assignee: Austin Cawley-Edwards
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.13.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=15934=logs=8fd9202e-fd17-5b26-353c-ac1ff76c8f28=a0a633b8-47ef-5c5a-2806-3c13b9e48228=4472
> {code}
> 2021-03-31T22:16:07.8416407Z [ERROR] 
> testSavepointRescalingOutKeyedStateDerivedMaxParallelism[backend = 
> rocksdb](org.apache.flink.test.checkpointing.RescalingITCase)  Time elapsed: 
> 9.945 s  <<< ERROR!
> 2021-03-31T22:16:07.8417534Z 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 2021-03-31T22:16:07.8418516Z  at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> 2021-03-31T22:16:07.8419281Z  at 
> org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult(TestUtils.java:63)
> 2021-03-31T22:16:07.8420142Z  at 
> org.apache.flink.test.checkpointing.RescalingITCase.testSavepointRescalingKeyedState(RescalingITCase.java:251)
> 2021-03-31T22:16:07.8421173Z  at 
> org.apache.flink.test.checkpointing.RescalingITCase.testSavepointRescalingOutKeyedStateDerivedMaxParallelism(RescalingITCase.java:168)
> 2021-03-31T22:16:07.8421985Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2021-03-31T22:16:07.8422651Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2021-03-31T22:16:07.8423649Z  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2021-03-31T22:16:07.8424231Z  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2021-03-31T22:16:07.8424657Z  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2021-03-31T22:16:07.8425147Z  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2021-03-31T22:16:07.8425609Z  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2021-03-31T22:16:07.8426183Z  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2021-03-31T22:16:07.8569060Z  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2021-03-31T22:16:07.8569781Z  at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> 2021-03-31T22:16:07.8570451Z  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> 2021-03-31T22:16:07.8571040Z  at 
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 2021-03-31T22:16:07.8571604Z  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 2021-03-31T22:16:07.8572303Z  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 2021-03-31T22:16:07.8573259Z  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 2021-03-31T22:16:07.8573975Z  at 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2021-03-31T22:16:07.8574660Z  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2021-03-31T22:16:07.8575359Z  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2021-03-31T22:16:07.8576037Z  at 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 2021-03-31T22:16:07.8576728Z  at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 2021-03-31T22:16:07.8577588Z  at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> 2021-03-31T22:16:07.8578181Z  at 
> org.junit.runners.Suite.runChild(Suite.java:128)
> 2021-03-31T22:16:07.8578771Z  at 
> org.junit.runners.Suite.runChild(Suite.java:27)
> 2021-03-31T22:16:07.8579402Z  at 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 

[jira] [Commented] (FLINK-22084) RescalingITCase fails with adaptive scheduler

2021-04-06 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17315603#comment-17315603
 ] 

Austin Cawley-Edwards commented on FLINK-22084:
---

[~trohrmann] very likely – I'll take this.

> RescalingITCase fails with adaptive scheduler
> -
>
> Key: FLINK-22084
> URL: https://issues.apache.org/jira/browse/FLINK-22084
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.13.0
>Reporter: Dawid Wysakowicz
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.13.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=15934=logs=8fd9202e-fd17-5b26-353c-ac1ff76c8f28=a0a633b8-47ef-5c5a-2806-3c13b9e48228=4472
> {code}
> 2021-03-31T22:16:07.8416407Z [ERROR] 
> testSavepointRescalingOutKeyedStateDerivedMaxParallelism[backend = 
> rocksdb](org.apache.flink.test.checkpointing.RescalingITCase)  Time elapsed: 
> 9.945 s  <<< ERROR!
> 2021-03-31T22:16:07.8417534Z 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 2021-03-31T22:16:07.8418516Z  at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> 2021-03-31T22:16:07.8419281Z  at 
> org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult(TestUtils.java:63)
> 2021-03-31T22:16:07.8420142Z  at 
> org.apache.flink.test.checkpointing.RescalingITCase.testSavepointRescalingKeyedState(RescalingITCase.java:251)
> 2021-03-31T22:16:07.8421173Z  at 
> org.apache.flink.test.checkpointing.RescalingITCase.testSavepointRescalingOutKeyedStateDerivedMaxParallelism(RescalingITCase.java:168)
> 2021-03-31T22:16:07.8421985Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2021-03-31T22:16:07.8422651Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2021-03-31T22:16:07.8423649Z  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2021-03-31T22:16:07.8424231Z  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2021-03-31T22:16:07.8424657Z  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2021-03-31T22:16:07.8425147Z  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2021-03-31T22:16:07.8425609Z  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2021-03-31T22:16:07.8426183Z  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2021-03-31T22:16:07.8569060Z  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2021-03-31T22:16:07.8569781Z  at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> 2021-03-31T22:16:07.8570451Z  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> 2021-03-31T22:16:07.8571040Z  at 
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 2021-03-31T22:16:07.8571604Z  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 2021-03-31T22:16:07.8572303Z  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 2021-03-31T22:16:07.8573259Z  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 2021-03-31T22:16:07.8573975Z  at 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2021-03-31T22:16:07.8574660Z  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2021-03-31T22:16:07.8575359Z  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2021-03-31T22:16:07.8576037Z  at 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 2021-03-31T22:16:07.8576728Z  at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 2021-03-31T22:16:07.8577588Z  at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> 2021-03-31T22:16:07.8578181Z  at 
> org.junit.runners.Suite.runChild(Suite.java:128)
> 2021-03-31T22:16:07.8578771Z  at 
> org.junit.runners.Suite.runChild(Suite.java:27)
> 2021-03-31T22:16:07.8579402Z  at 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2021-03-31T22:16:07.8580061Z  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2021-03-31T22:16:07.8580774Z  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2021-03-31T22:16:07.8581480Z  at 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 2021-03-31T22:16:07.8582148Z  at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 2021-03-31T22:16:07.8582896Z  at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 2021-03-31T22:16:07.8583762Z  at 
> 

[jira] [Assigned] (FLINK-22084) RescalingITCase fails with adaptive scheduler

2021-04-06 Thread Austin Cawley-Edwards (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Austin Cawley-Edwards reassigned FLINK-22084:
-

Assignee: Austin Cawley-Edwards

> RescalingITCase fails with adaptive scheduler
> -
>
> Key: FLINK-22084
> URL: https://issues.apache.org/jira/browse/FLINK-22084
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.13.0
>Reporter: Dawid Wysakowicz
>Assignee: Austin Cawley-Edwards
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.13.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=15934=logs=8fd9202e-fd17-5b26-353c-ac1ff76c8f28=a0a633b8-47ef-5c5a-2806-3c13b9e48228=4472
> {code}
> 2021-03-31T22:16:07.8416407Z [ERROR] 
> testSavepointRescalingOutKeyedStateDerivedMaxParallelism[backend = 
> rocksdb](org.apache.flink.test.checkpointing.RescalingITCase)  Time elapsed: 
> 9.945 s  <<< ERROR!
> 2021-03-31T22:16:07.8417534Z 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 2021-03-31T22:16:07.8418516Z  at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> 2021-03-31T22:16:07.8419281Z  at 
> org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult(TestUtils.java:63)
> 2021-03-31T22:16:07.8420142Z  at 
> org.apache.flink.test.checkpointing.RescalingITCase.testSavepointRescalingKeyedState(RescalingITCase.java:251)
> 2021-03-31T22:16:07.8421173Z  at 
> org.apache.flink.test.checkpointing.RescalingITCase.testSavepointRescalingOutKeyedStateDerivedMaxParallelism(RescalingITCase.java:168)
> 2021-03-31T22:16:07.8421985Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2021-03-31T22:16:07.8422651Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2021-03-31T22:16:07.8423649Z  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2021-03-31T22:16:07.8424231Z  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2021-03-31T22:16:07.8424657Z  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2021-03-31T22:16:07.8425147Z  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2021-03-31T22:16:07.8425609Z  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2021-03-31T22:16:07.8426183Z  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2021-03-31T22:16:07.8569060Z  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2021-03-31T22:16:07.8569781Z  at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> 2021-03-31T22:16:07.8570451Z  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> 2021-03-31T22:16:07.8571040Z  at 
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 2021-03-31T22:16:07.8571604Z  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 2021-03-31T22:16:07.8572303Z  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 2021-03-31T22:16:07.8573259Z  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 2021-03-31T22:16:07.8573975Z  at 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2021-03-31T22:16:07.8574660Z  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2021-03-31T22:16:07.8575359Z  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2021-03-31T22:16:07.8576037Z  at 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 2021-03-31T22:16:07.8576728Z  at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 2021-03-31T22:16:07.8577588Z  at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> 2021-03-31T22:16:07.8578181Z  at 
> org.junit.runners.Suite.runChild(Suite.java:128)
> 2021-03-31T22:16:07.8578771Z  at 
> org.junit.runners.Suite.runChild(Suite.java:27)
> 2021-03-31T22:16:07.8579402Z  at 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2021-03-31T22:16:07.8580061Z  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2021-03-31T22:16:07.8580774Z  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2021-03-31T22:16:07.8581480Z  at 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 2021-03-31T22:16:07.8582148Z  at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 2021-03-31T22:16:07.8582896Z  at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 2021-03-31T22:16:07.8583762Z  at 
> 

[jira] [Commented] (FLINK-21844) Do not auto-configure maxParallelism when setting "scheduler-mode: reactive"

2021-03-19 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17305213#comment-17305213
 ] 

Austin Cawley-Edwards commented on FLINK-21844:
---

Does this mean a user must configure `maxParallelism` for all operators in 
reactive mode, otherwise Flink will produce an exception when building the 
`ExecutionGraph`?
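If so, pinning it explicitly would look roughly like this in the DataStream 
API; the values are placeholders:

{code:java}
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MaxParallelismExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setMaxParallelism(128); // job-wide default

        env.fromElements(1, 2, 3)
                .map(x -> x * 2)
                .returns(Types.INT)      // help lambda type extraction
                .setMaxParallelism(128)  // per-operator override
                .print();

        env.execute("max-parallelism-example");
    }
}
{code}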

> Do not auto-configure maxParallelism when setting "scheduler-mode: reactive"
> 
>
> Key: FLINK-21844
> URL: https://issues.apache.org/jira/browse/FLINK-21844
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Konstantin Knauf
>Assignee: Austin Cawley-Edwards
>Priority: Major
> Fix For: 1.13.0
>
>
> I believe we should not automatically change the maxParallelism when the 
>  "scheduler-mode" is set to "reactive", because:
>  * it magically breaks savepoint compatibility, when you switch between 
> default and reactive scheduler mode
>  * the maximum parallelism is an orthogonal concern that in my opinion should 
> not be mixed with the scheduler mode. The reactive scheduler should respect 
> the maxParallelism, but it should not set/ change its default value.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-21844) Do not auto-configure maxParallelism when setting "scheduler-mode: reactive"

2021-03-19 Thread Austin Cawley-Edwards (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Austin Cawley-Edwards updated FLINK-21844:
--
Description: 
I believe we should not automatically change the maxParallelism when the 
 "scheduler-mode" is set to "reactive", because:
 * it magically breaks savepoint compatibility, when you switch between default 
and reactive scheduler mode
 * the maximum parallelism is an orthogonal concern that in my opinion should 
not be mixed with the scheduler mode. The reactive scheduler should respect the 
maxParallelism, but it should not set/ change its default value.

  was:
I believe we should not automatically change the maxParallelism when the 
"scheduler-mode" is set to "reactive", because:

* it magically breaks savepoint compatibility, when you switch between default 
and reactive scheduler mode
* the maximum parallelism is an orthogonal concern that in my opinion should 
not be mixed with the scheduler mode. The reactive scheduler should respect the 
maxParallelism, but it should not set change its default value.
 


> Do not auto-configure maxParallelism when setting "scheduler-mode: reactive"
> 
>
> Key: FLINK-21844
> URL: https://issues.apache.org/jira/browse/FLINK-21844
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Konstantin Knauf
>Assignee: Austin Cawley-Edwards
>Priority: Major
> Fix For: 1.13.0
>
>
> I believe we should not automatically change the maxParallelism when the 
>  "scheduler-mode" is set to "reactive", because:
>  * it magically breaks savepoint compatibility, when you switch between 
> default and reactive scheduler mode
>  * the maximum parallelism is an orthogonal concern that in my opinion should 
> not be mixed with the scheduler mode. The reactive scheduler should respect 
> the maxParallelism, but it should not set/ change its default value.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-21844) Do not auto-configure maxParallelism when setting "scheduler-mode: reactive"

2021-03-19 Thread Austin Cawley-Edwards (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Austin Cawley-Edwards reassigned FLINK-21844:
-

Assignee: Austin Cawley-Edwards

> Do not auto-configure maxParallelism when setting "scheduler-mode: reactive"
> 
>
> Key: FLINK-21844
> URL: https://issues.apache.org/jira/browse/FLINK-21844
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Konstantin Knauf
>Assignee: Austin Cawley-Edwards
>Priority: Major
> Fix For: 1.13.0
>
>
> I believe we should not automatically change the maxParallelism when the 
> "scheduler-mode" is set to "reactive", because:
> * it magically breaks savepoint compatibility, when you switch between 
> default and reactive scheduler mode
> * the maximum parallelism is an orthogonal concern that in my opinion should 
> not be mixed with the scheduler mode. The reactive scheduler should respect 
> the maxParallelism, but it should not set change its default value.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-21538) Elasticsearch6DynamicSinkITCase.testWritingDocuments fails when submitting job

2021-03-15 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17300636#comment-17300636
 ] 

Austin Cawley-Edwards edited comment on FLINK-21538 at 3/15/21, 4:13 PM:
-

Test debug build here: 
[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=14715=results]


was (Author: austince):
Test debug build here: 
https://dev.azure.com/austincawley0684/flink/_build/results?buildId=3=results

> Elasticsearch6DynamicSinkITCase.testWritingDocuments fails when submitting job
> --
>
> Key: FLINK-21538
> URL: https://issues.apache.org/jira/browse/FLINK-21538
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ElasticSearch, Runtime / Coordination
>Affects Versions: 1.12.1
>Reporter: Dawid Wysakowicz
>Assignee: Austin Cawley-Edwards
>Priority: Major
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=13868=logs=3d12d40f-c62d-5ec4-6acc-0efe94cc3e89=5d6e4255-0ea8-5e2a-f52c-c881b7872361
> {code}
> 2021-02-27T00:16:06.9493539Z 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 2021-02-27T00:16:06.9494494Z  at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> 2021-02-27T00:16:06.9495733Z  at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
> 2021-02-27T00:16:06.9496596Z  at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> 2021-02-27T00:16:06.9497354Z  at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
> 2021-02-27T00:16:06.9525795Z  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2021-02-27T00:16:06.9526744Z  at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> 2021-02-27T00:16:06.9527784Z  at 
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
> 2021-02-27T00:16:06.9528552Z  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> 2021-02-27T00:16:06.9529271Z  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> 2021-02-27T00:16:06.9530013Z  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2021-02-27T00:16:06.9530482Z  at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> 2021-02-27T00:16:06.9531068Z  at 
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
> 2021-02-27T00:16:06.9531544Z  at 
> akka.dispatch.OnComplete.internal(Future.scala:264)
> 2021-02-27T00:16:06.9531908Z  at 
> akka.dispatch.OnComplete.internal(Future.scala:261)
> 2021-02-27T00:16:06.9532449Z  at 
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
> 2021-02-27T00:16:06.9532860Z  at 
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
> 2021-02-27T00:16:06.9533245Z  at 
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> 2021-02-27T00:16:06.9533721Z  at 
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
> 2021-02-27T00:16:06.9534225Z  at 
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
> 2021-02-27T00:16:06.9534697Z  at 
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
> 2021-02-27T00:16:06.9535217Z  at 
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
> 2021-02-27T00:16:06.9535718Z  at 
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
> 2021-02-27T00:16:06.9536127Z  at 
> akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
> 2021-02-27T00:16:06.9536861Z  at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
> 2021-02-27T00:16:06.9537394Z  at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
> 2021-02-27T00:16:06.9537916Z  at 
> scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
> 2021-02-27T00:16:06.9605804Z  at 
> scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
> 2021-02-27T00:16:06.9606794Z  at 
> scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
> 2021-02-27T00:16:06.9607642Z  at 
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> 2021-02-27T00:16:06.9608419Z  at 
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> 2021-02-27T00:16:06.9609252Z  at 
> 

[jira] [Assigned] (FLINK-21538) Elasticsearch6DynamicSinkITCase.testWritingDocuments fails when submitting job

2021-03-15 Thread Austin Cawley-Edwards (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Austin Cawley-Edwards reassigned FLINK-21538:
-

Assignee: Austin Cawley-Edwards

> Elasticsearch6DynamicSinkITCase.testWritingDocuments fails when submitting job
> --
>
> Key: FLINK-21538
> URL: https://issues.apache.org/jira/browse/FLINK-21538
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ElasticSearch, Runtime / Coordination
>Affects Versions: 1.12.1
>Reporter: Dawid Wysakowicz
>Assignee: Austin Cawley-Edwards
>Priority: Major
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=13868=logs=3d12d40f-c62d-5ec4-6acc-0efe94cc3e89=5d6e4255-0ea8-5e2a-f52c-c881b7872361
> {code}
> 2021-02-27T00:16:06.9493539Z 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 2021-02-27T00:16:06.9494494Z  at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> 2021-02-27T00:16:06.9495733Z  at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
> 2021-02-27T00:16:06.9496596Z  at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> 2021-02-27T00:16:06.9497354Z  at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
> 2021-02-27T00:16:06.9525795Z  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2021-02-27T00:16:06.9526744Z  at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> 2021-02-27T00:16:06.9527784Z  at 
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
> 2021-02-27T00:16:06.9528552Z  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> 2021-02-27T00:16:06.9529271Z  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> 2021-02-27T00:16:06.9530013Z  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2021-02-27T00:16:06.9530482Z  at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> 2021-02-27T00:16:06.9531068Z  at 
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
> 2021-02-27T00:16:06.9531544Z  at 
> akka.dispatch.OnComplete.internal(Future.scala:264)
> 2021-02-27T00:16:06.9531908Z  at 
> akka.dispatch.OnComplete.internal(Future.scala:261)
> 2021-02-27T00:16:06.9532449Z  at 
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
> 2021-02-27T00:16:06.9532860Z  at 
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
> 2021-02-27T00:16:06.9533245Z  at 
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> 2021-02-27T00:16:06.9533721Z  at 
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
> 2021-02-27T00:16:06.9534225Z  at 
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
> 2021-02-27T00:16:06.9534697Z  at 
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
> 2021-02-27T00:16:06.9535217Z  at 
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
> 2021-02-27T00:16:06.9535718Z  at 
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
> 2021-02-27T00:16:06.9536127Z  at 
> akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
> 2021-02-27T00:16:06.9536861Z  at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
> 2021-02-27T00:16:06.9537394Z  at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
> 2021-02-27T00:16:06.9537916Z  at 
> scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
> 2021-02-27T00:16:06.9605804Z  at 
> scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
> 2021-02-27T00:16:06.9606794Z  at 
> scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
> 2021-02-27T00:16:06.9607642Z  at 
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> 2021-02-27T00:16:06.9608419Z  at 
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> 2021-02-27T00:16:06.9609252Z  at 
> akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
> 2021-02-27T00:16:06.9610024Z  at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
> 2021-02-27T00:16:06.9613676Z  at 
> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
> 2021-02-27T00:16:06.9615526Z  at 
> 

[jira] [Commented] (FLINK-21538) Elasticsearch6DynamicSinkITCase.testWritingDocuments fails when submitting job

2021-03-12 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17300636#comment-17300636
 ] 

Austin Cawley-Edwards commented on FLINK-21538:
---

Test debug build here: 
https://dev.azure.com/austincawley0684/flink/_build/results?buildId=3=results

> Elasticsearch6DynamicSinkITCase.testWritingDocuments fails when submitting job
> --
>
> Key: FLINK-21538
> URL: https://issues.apache.org/jira/browse/FLINK-21538
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ElasticSearch, Runtime / Coordination
>Affects Versions: 1.12.1
>Reporter: Dawid Wysakowicz
>Priority: Major
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=13868=logs=3d12d40f-c62d-5ec4-6acc-0efe94cc3e89=5d6e4255-0ea8-5e2a-f52c-c881b7872361
> {code}
> 2021-02-27T00:16:06.9493539Z 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 2021-02-27T00:16:06.9494494Z  at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> 2021-02-27T00:16:06.9495733Z  at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
> 2021-02-27T00:16:06.9496596Z  at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> 2021-02-27T00:16:06.9497354Z  at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
> 2021-02-27T00:16:06.9525795Z  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2021-02-27T00:16:06.9526744Z  at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> 2021-02-27T00:16:06.9527784Z  at 
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
> 2021-02-27T00:16:06.9528552Z  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> 2021-02-27T00:16:06.9529271Z  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> 2021-02-27T00:16:06.9530013Z  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 2021-02-27T00:16:06.9530482Z  at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> 2021-02-27T00:16:06.9531068Z  at 
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
> 2021-02-27T00:16:06.9531544Z  at 
> akka.dispatch.OnComplete.internal(Future.scala:264)
> 2021-02-27T00:16:06.9531908Z  at 
> akka.dispatch.OnComplete.internal(Future.scala:261)
> 2021-02-27T00:16:06.9532449Z  at 
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
> 2021-02-27T00:16:06.9532860Z  at 
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
> 2021-02-27T00:16:06.9533245Z  at 
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> 2021-02-27T00:16:06.9533721Z  at 
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
> 2021-02-27T00:16:06.9534225Z  at 
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
> 2021-02-27T00:16:06.9534697Z  at 
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
> 2021-02-27T00:16:06.9535217Z  at 
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
> 2021-02-27T00:16:06.9535718Z  at 
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
> 2021-02-27T00:16:06.9536127Z  at 
> akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
> 2021-02-27T00:16:06.9536861Z  at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
> 2021-02-27T00:16:06.9537394Z  at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
> 2021-02-27T00:16:06.9537916Z  at 
> scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
> 2021-02-27T00:16:06.9605804Z  at 
> scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
> 2021-02-27T00:16:06.9606794Z  at 
> scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
> 2021-02-27T00:16:06.9607642Z  at 
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> 2021-02-27T00:16:06.9608419Z  at 
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> 2021-02-27T00:16:06.9609252Z  at 
> akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
> 2021-02-27T00:16:06.9610024Z  at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
> 2021-02-27T00:16:06.9613676Z  at 
> 

[jira] [Commented] (FLINK-18755) RabbitMQ QoS Chinese Documentation

2020-07-30 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167847#comment-17167847
 ] 

Austin Cawley-Edwards commented on FLINK-18755:
---

Thank you so much! I don't have that ability but maybe [~dwysakowicz] or 
[~aljoscha] might be able to when they have time?

> RabbitMQ QoS Chinese Documentation
> --
>
> Key: FLINK-18755
> URL: https://issues.apache.org/jira/browse/FLINK-18755
> Project: Flink
>  Issue Type: Improvement
>  Components: chinese-translation, Connectors / RabbitMQ
>Affects Versions: 1.12.0
>Reporter: Austin Cawley-Edwards
>Priority: Major
>
> Please add documentation for the new QoS settings in the RabbitMQ connector. 
> The added English documentation can be found in the PR here: 
> [https://github.com/apache/flink/pull/12729/files#diff-6b432359b51642a8fad3050c4b73f47cR134-R167|https://github.com/apache/flink/pull/12729/files#diff-6b432359b51642a8fad3050c4b73f47cR134-R167.]
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18755) RabbitMQ QoS Chinese Documentation

2020-07-29 Thread Austin Cawley-Edwards (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Austin Cawley-Edwards updated FLINK-18755:
--
Description: 
Please add documentation for the new QoS settings in the RabbitMQ connector. 
The added English documentation can be found in the PR here: 
[https://github.com/apache/flink/pull/12729/files#diff-6b432359b51642a8fad3050c4b73f47cR134-R167|https://github.com/apache/flink/pull/12729/files#diff-6b432359b51642a8fad3050c4b73f47cR134-R167.]

 

 

  was:
Please add documentation for the new QoS settings in the RabbitMQ connector. 
The added English documentation can be found in the PR here: 
[https://github.com/apache/flink/pull/12729/files#diff-6b432359b51642a8fad3050c4b73f47cR134-R167.]

 

 


> RabbitMQ QoS Chinese Documentation
> --
>
> Key: FLINK-18755
> URL: https://issues.apache.org/jira/browse/FLINK-18755
> Project: Flink
>  Issue Type: Improvement
>  Components: chinese-translation, Connectors / RabbitMQ
>Affects Versions: 1.12.0
>Reporter: Austin Cawley-Edwards
>Priority: Major
>
> Please add documentation for the new QoS settings in the RabbitMQ connector. 
> The added English documentation can be found in the PR here: 
> [https://github.com/apache/flink/pull/12729/files#diff-6b432359b51642a8fad3050c4b73f47cR134-R167|https://github.com/apache/flink/pull/12729/files#diff-6b432359b51642a8fad3050c4b73f47cR134-R167.]
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18755) RabbitMQ QoS Chinese Documentation

2020-07-29 Thread Austin Cawley-Edwards (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Austin Cawley-Edwards updated FLINK-18755:
--
Issue Type: Improvement  (was: Bug)

> RabbitMQ QoS Chinese Documentation
> --
>
> Key: FLINK-18755
> URL: https://issues.apache.org/jira/browse/FLINK-18755
> Project: Flink
>  Issue Type: Improvement
>  Components: chinese-translation, Connectors / RabbitMQ
>Affects Versions: 1.12.0
>Reporter: Austin Cawley-Edwards
>Priority: Major
>
> Please add documentation for the new QoS settings in the RabbitMQ connector. 
> The added English documentation can be found in the PR here: 
> [https://github.com/apache/flink/pull/12729/files#diff-6b432359b51642a8fad3050c4b73f47cR134-R167.]
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18755) RabbitMQ QoS Chinese Documentation

2020-07-29 Thread Austin Cawley-Edwards (Jira)
Austin Cawley-Edwards created FLINK-18755:
-

 Summary: RabbitMQ QoS Chinese Documentation
 Key: FLINK-18755
 URL: https://issues.apache.org/jira/browse/FLINK-18755
 Project: Flink
  Issue Type: Bug
  Components: chinese-translation, Connectors / RabbitMQ
Affects Versions: 1.12.0
Reporter: Austin Cawley-Edwards


Please add documentation for the new QoS settings in the RabbitMQ connector. 
The added English documentation can be found in the PR here: 
[https://github.com/apache/flink/pull/12729/files#diff-6b432359b51642a8fad3050c4b73f47cR134-R167.]

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (FLINK-17559) Backpressure seems to be broken when not going through network

2020-07-29 Thread Austin Cawley-Edwards (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Austin Cawley-Edwards updated FLINK-17559:
--
Comment: was deleted

(was: I think we can close this as well [~dwysakowicz], if you don't mind.)

> Backpressure seems to be broken when not going through network
> --
>
> Key: FLINK-17559
> URL: https://issues.apache.org/jira/browse/FLINK-17559
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / RabbitMQ
>Affects Versions: 1.8.2
>Reporter: Luis
>Priority: Major
> Attachments: Screenshot from 2020-05-07 13-31-23.png
>
>
> Back pressure for Flink seems broken. Someone please correct me: from what I 
> understand, it only works between network transfers. If I have a source with 
> no thread sleep, then there is no back pressure and some operation will 
> accumulate data and crash. I even tried removing chaining with
> env.disableOperatorChaining()
>  and it works with parallelism set to 1, but with 3 or 4 it crashes. See 
> below. 
>  
> From this I can conclude that if I have any map function that produces more 
> output than is coming in, it will eventually crash if there is no network 
> dividing them to allow for backpressure. Is this correct?
>  
>  
> {code:java}
> java.lang.OutOfMemoryError: Java heap space
> 2020-05-07 18:27:37,942 ERROR 
> org.apache.flink.runtime.util.FatalExitExceptionHandler   - FATAL: Thread 
> 'flink-scheduler-1' produced an uncaught exception. Stopping the process...
> java.lang.OutOfMemoryError: Java heap space
> at akka.dispatch.AbstractNodeQueue.<init>(AbstractNodeQueue.java:32)
> at 
> akka.actor.LightArrayRevolverScheduler$TaskQueue.<init>(LightArrayRevolverScheduler.scala:305)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:270)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
> at java.lang.Thread.run(Thread.java:748)
> 2020-05-07 18:27:35,725 ERROR 
> org.apache.flink.runtime.util.FatalExitExceptionHandler   - FATAL: Thread 
> 'flink-metrics-8' produced an uncaught exception. Stopping the process...
> java.lang.OutOfMemoryError: Java heap space
> 2020-05-07 18:27:35,725 ERROR 
> com.rabbitmq.client.impl.ForgivingExceptionHandler- An unexpected 
> connection driver error occured
> java.lang.OutOfMemoryError: Java heap space
> at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:120)
> at 
> com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:164)
> at 
> com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:580)
> at java.lang.Thread.run(Thread.java:748)
> {code}
>  
> [https://stackoverflow.com/questions/61465789/how-do-i-create-a-flink-richparallelsourcefunction-with-backpressure]
>  
>  
>  It seems that I am supposed to guess how much my sink can handle and 
> throttle to that amount in my source generator. But that always puts my 
> system at risk of crashing. 
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17559) Backpressure seems to be broken when not going through network

2020-07-29 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167462#comment-17167462
 ] 

Austin Cawley-Edwards commented on FLINK-17559:
---

I think we can close this as well [~dwysakowicz], if you don't mind.

> Backpressure seems to be broken when not going through network
> --
>
> Key: FLINK-17559
> URL: https://issues.apache.org/jira/browse/FLINK-17559
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / RabbitMQ
>Affects Versions: 1.8.2
>Reporter: Luis
>Priority: Major
> Attachments: Screenshot from 2020-05-07 13-31-23.png
>
>
> Back pressure for Flink seems broken. Someone please correct me: from what I 
> understand, it only works between network transfers. If I have a source with 
> no thread sleep, then there is no back pressure and some operation will 
> accumulate data and crash. I even tried removing chaining with
> env.disableOperatorChaining()
>  and it works with parallelism set to 1, but with 3 or 4 it crashes. See 
> below. 
>  
> From this I can conclude that if I have any map function that produces more 
> output than is coming in, it will eventually crash if there is no network 
> dividing them to allow for backpressure. Is this correct?
>  
>  
> {code:java}
> java.lang.OutOfMemoryError: Java heap space
> 2020-05-07 18:27:37,942 ERROR 
> org.apache.flink.runtime.util.FatalExitExceptionHandler   - FATAL: Thread 
> 'flink-scheduler-1' produced an uncaught exception. Stopping the process...
> java.lang.OutOfMemoryError: Java heap space
> at akka.dispatch.AbstractNodeQueue.<init>(AbstractNodeQueue.java:32)
> at 
> akka.actor.LightArrayRevolverScheduler$TaskQueue.<init>(LightArrayRevolverScheduler.scala:305)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:270)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
> at java.lang.Thread.run(Thread.java:748)
> 2020-05-07 18:27:35,725 ERROR 
> org.apache.flink.runtime.util.FatalExitExceptionHandler   - FATAL: Thread 
> 'flink-metrics-8' produced an uncaught exception. Stopping the process...
> java.lang.OutOfMemoryError: Java heap space
> 2020-05-07 18:27:35,725 ERROR 
> com.rabbitmq.client.impl.ForgivingExceptionHandler- An unexpected 
> connection driver error occured
> java.lang.OutOfMemoryError: Java heap space
> at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:120)
> at 
> com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:164)
> at 
> com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:580)
> at java.lang.Thread.run(Thread.java:748)
> {code}
>  
> [https://stackoverflow.com/questions/61465789/how-do-i-create-a-flink-richparallelsourcefunction-with-backpressure]
>  
>  
>  It seems that I am supposed to guess how much my sink can handle and 
> throttle to that amount in my source generator. But that always puts my 
> system at risk of crashing. 
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-17529) Replace Deprecated RMQ QueueingConsumer

2020-07-21 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17162230#comment-17162230
 ] 

Austin Cawley-Edwards edited comment on FLINK-17529 at 7/21/20, 6:01 PM:
-

Making this ticket "Upgrade com.rabbitmq:amqp-client to 5.x" as there's a CVE 
for the 4.2.0 version we're using: 
[https://github.com/advisories/GHSA-w4g2-9hj6-5472]

 

Should be an easy upgrade after [https://github.com/apache/flink/pull/12729] is 
merged


was (Author: austince):
Perhaps we should make this ticket "Upgrade com.rabbitmq:amqp-client to 5.x" as 
there's a CVE for the 4.2.0 version we're using: 
[https://github.com/advisories/GHSA-w4g2-9hj6-5472]

 

Should be an easy upgrade after [https://github.com/apache/flink/pull/12729] is 
merged

> Replace Deprecated RMQ QueueingConsumer
> ---
>
> Key: FLINK-17529
> URL: https://issues.apache.org/jira/browse/FLINK-17529
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / RabbitMQ
>Affects Versions: 1.10.0
>Reporter: Austin Cawley-Edwards
>Priority: Minor
>
> The RMQ QueueingConsumer is used in the RMQSource to get a simple blocking 
> consumer. This has been deprecated in `com.rabbitmq:amqp-client` 4.2.0 and is 
> removed in 5.x. It should be replaced by a 
> `com.rabbitmq.client.DefaultConsumer`.
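
For reference, here is a minimal sketch (assumed code, not Flink's actual 
RMQSource internals) of how a `com.rabbitmq.client.DefaultConsumer` can recover 
the blocking behavior of the removed QueueingConsumer on amqp-client 5.x:

{code:java}
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.Envelope;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Buffers deliveries so callers can block on take(), mirroring the removed
// QueueingConsumer#nextDelivery().
public class BufferingConsumer extends DefaultConsumer {
    private final BlockingQueue<Delivery> deliveries = new LinkedBlockingQueue<>();

    public BufferingConsumer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) {
        deliveries.add(new Delivery(envelope, properties, body));
    }

    // Blocks until the broker pushes the next message.
    public Delivery nextDelivery() throws InterruptedException {
        return deliveries.take();
    }
}
{code}

Such a consumer would be registered with channel.basicConsume(queueName, false, 
consumer) and drained from the source's run loop.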



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17529) Upgrade com.rabbitmq:amqp-client to latest 5.x

2020-07-21 Thread Austin Cawley-Edwards (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Austin Cawley-Edwards updated FLINK-17529:
--
Summary: Upgrade com.rabbitmq:amqp-client to latest 5.x  (was: Upgrade 
com.rabbitmq:amqp-client to 5.x)

> Upgrade com.rabbitmq:amqp-client to latest 5.x
> --
>
> Key: FLINK-17529
> URL: https://issues.apache.org/jira/browse/FLINK-17529
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / RabbitMQ
>Affects Versions: 1.10.0
>Reporter: Austin Cawley-Edwards
>Priority: Major
>
> The RMQ QueueingConsumer is used in the RMQSource to get a simple blocking 
> consumer. This has been deprecated in `com.rabbitmq:amqp-client` 4.2.0 and is 
> removed in 5.x. It should be replaced by a 
> `com.rabbitmq.client.DefaultConsumer`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17529) Upgrade com.rabbitmq:amqp-client to 5.x

2020-07-21 Thread Austin Cawley-Edwards (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Austin Cawley-Edwards updated FLINK-17529:
--
Summary: Upgrade com.rabbitmq:amqp-client to 5.x  (was: Replace Deprecated 
RMQ QueueingConsumer)

> Upgrade com.rabbitmq:amqp-client to 5.x
> ---
>
> Key: FLINK-17529
> URL: https://issues.apache.org/jira/browse/FLINK-17529
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / RabbitMQ
>Affects Versions: 1.10.0
>Reporter: Austin Cawley-Edwards
>Priority: Minor
>
> The RMQ QueueingConsumer is used in the RMQSource to get a simple blocking 
> consumer. This has been deprecated in `com.rabbitmq:amqp-client` 4.2.0 and is 
> removed in 5.x. It should be replaced by a 
> `com.rabbitmq.client.DefaultConsumer`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17529) Replace Deprecated RMQ QueueingConsumer

2020-07-21 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17162230#comment-17162230
 ] 

Austin Cawley-Edwards commented on FLINK-17529:
---

Perhaps we should make this ticket "Upgrade com.rabbitmq:amqp-client to 5.x" as 
there's a CVE for the 4.2.0 version we're using: 
[https://github.com/advisories/GHSA-w4g2-9hj6-5472]

 

Should be an easy upgrade after [https://github.com/apache/flink/pull/12729] is 
merged

> Replace Deprecated RMQ QueueingConsumer
> ---
>
> Key: FLINK-17529
> URL: https://issues.apache.org/jira/browse/FLINK-17529
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / RabbitMQ
>Affects Versions: 1.10.0
>Reporter: Austin Cawley-Edwards
>Priority: Minor
>
> The RMQ QueueingConsumer is used in the RMQSource to get a simple blocking 
> consumer. This has been deprecated in `com.rabbitmq:amqp-client` 4.2.0 and is 
> removed in 5.x. It should be replaced by a 
> `com.rabbitmq.client.DefaultConsumer`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17529) Upgrade com.rabbitmq:amqp-client to 5.x

2020-07-21 Thread Austin Cawley-Edwards (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Austin Cawley-Edwards updated FLINK-17529:
--
Priority: Major  (was: Minor)

> Upgrade com.rabbitmq:amqp-client to 5.x
> ---
>
> Key: FLINK-17529
> URL: https://issues.apache.org/jira/browse/FLINK-17529
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / RabbitMQ
>Affects Versions: 1.10.0
>Reporter: Austin Cawley-Edwards
>Priority: Major
>
> The RMQ QueueingConsumer is used in the RMQSource to get a simple blocking 
> consumer. This has been deprecated in `com.rabbitmq:amqp-client` 4.2.0 and is 
> removed in 5.x. It should be replaced by a 
> `com.rabbitmq.client.DefaultConsumer`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-10195) RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly

2020-06-12 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17134241#comment-17134241
 ] 

Austin Cawley-Edwards commented on FLINK-10195:
---

[~aljoscha] that would be great, thank you. [~lukaj], I would love feedback 
from you when I finish a first cut if that sounds good to you.

> RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly
> -
>
> Key: FLINK-10195
> URL: https://issues.apache.org/jira/browse/FLINK-10195
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / RabbitMQ
>Affects Versions: 1.4.0, 1.5.0, 1.5.1, 1.6.0
>Reporter: Luka Jurukovski
>Assignee: Luka Jurukovski
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The connection between the RabbitMQ server and the client does not 
> appropriately back pressure when auto acking is disabled. This becomes very 
> problematic when a downstream process throttles the data processing to a rate 
> slower than RabbitMQ sends the data to the client.
> The difference in records ends up being stored in Flink's heap space, 
> which grows indefinitely (or technically to "Integer Max" Deliveries). 
> Looking at RabbitMQ's metrics, the number of unacked messages looks like a 
> steadily rising sawtooth shape.
> Upon further investigation, it looks like this is due to how the 
> QueueingConsumer works: messages are added to the BlockingQueue faster than 
> they are being removed and processed, resulting in the previously described 
> behavior.
> This may be intended behavior; however, this isn't explicitly obvious in the 
> documentation or any of the examples I have seen.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-10195) RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly

2020-06-04 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17125362#comment-17125362
 ] 

Austin Cawley-Edwards edited comment on FLINK-10195 at 6/4/20, 2:50 PM:


Yes, that is correct. I still think it is an improvement on what's here and 
allows the users to tune their job according to their needs, but I don't think 
the performance issue that you cite can be fixed with what Rabbit provides.

EDIT: we might be able to handle this by updating prefetch counts dynamically 
if the buffer has space and there are many unacked messages waiting to be acked 
on checkpoint, though I think that might be too much for an initial 
implementation.

 

If the user has high volume queues and needs checkpointing for EXACTLY_ONCE/ 
AT_LEAST_ONCE guarantees, they'll need to tune their buffer length and 
checkpointing interval. This could also be an opt-in/ opt-out change if there 
are cases that need it disabled, and we should definitely update the docs[1].
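
For illustration, the user-side wiring might look like the following sketch 
(host, credentials, queue name, and interval are placeholders; the buffer-length 
option discussed here is a proposal and does not exist yet):

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

public class RmqCheckpointedJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Acks are deferred until checkpoints complete, so the interval bounds
        // how long deliveries stay unacked (and buffered) in the source.
        env.enableCheckpointing(5_000, CheckpointingMode.EXACTLY_ONCE);

        RMQConnectionConfig cfg = new RMQConnectionConfig.Builder()
                .setHost("localhost")
                .setPort(5672)
                .setVirtualHost("/")
                .setUserName("guest")
                .setPassword("guest")
                .build();

        // usesCorrelationId = true is required for exactly-once deduplication.
        env.addSource(new RMQSource<>(cfg, "my-queue", true, new SimpleStringSchema()))
                .print();
        env.execute("rmq-checkpointed");
    }
}
{code}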

Our company still actively uses Rabbit – I'm happy to build off your PR and 
test it out in our jobs.

 

 

 

[1]: 
[https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/rabbitmq.html#rabbitmq-source]


was (Author: austince):
Yes, that is correct. I still think it is an improvement on what's here and 
allows the users to tune their job according to their needs, but I don't think 
the performance issue that you cite can be fixed with what Rabbit provides. 

EDIT: we might be able to handle this by updating prefetch counts dynamically 
if the buffer has space and there are many unacked messages waiting to be acked 
on checkpoint, though I think that might be too much for an initial 
implementation.

 

If the user has high volume queues and needs checkpointing for EXACTLY_ONCE/ 
AT_LEAST_ONCE guarantees, they'll need to tune their buffer length and 
checkpointing interval. This could also be an opt-in/ opt-out change if there 
are cases that need it disabled, and we should definitely update the docs.

Our company still actively uses Rabbit – I'm happy to build off your PR and 
test it out in our jobs.

 

 

 

[1]: 
[https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/rabbitmq.html#rabbitmq-source]

> RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly
> -
>
> Key: FLINK-10195
> URL: https://issues.apache.org/jira/browse/FLINK-10195
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / RabbitMQ
>Affects Versions: 1.4.0, 1.5.0, 1.5.1, 1.6.0
>Reporter: Luka Jurukovski
>Assignee: Luka Jurukovski
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The connection between the RabbitMQ server and the client does not 
> appropriately back pressure when auto acking is disabled. This becomes very 
> problematic when a downstream process throttles the data processing to a rate 
> slower than RabbitMQ sends the data to the client.
> The difference in records ends up being stored in Flink's heap space, 
> which grows indefinitely (or technically to "Integer Max" Deliveries). 
> Looking at RabbitMQ's metrics, the number of unacked messages looks like a 
> steadily rising sawtooth shape.
> Upon further investigation, it looks like this is due to how the 
> QueueingConsumer works: messages are added to the BlockingQueue faster than 
> they are being removed and processed, resulting in the previously described 
> behavior.
> This may be intended behavior; however, this isn't explicitly obvious in the 
> documentation or any of the examples I have seen.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-10195) RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly

2020-06-03 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17125362#comment-17125362
 ] 

Austin Cawley-Edwards edited comment on FLINK-10195 at 6/3/20, 11:15 PM:
-

Yes, that is correct. I still think it is an improvement on what's here and 
allows the users to tune their job according to their needs, but I don't think 
the performance issue that you cite can be fixed with what Rabbit provides. 

EDIT: we might be able to handle this by updating prefetch counts dynamically 
if the buffer has space and there are many unacked messages waiting to be acked 
on checkpoint, though I think that might be too much for an initial 
implementation.

 

If the user has high volume queues and needs checkpointing for EXACTLY_ONCE/ 
AT_LEAST_ONCE guarantees, they'll need to tune their buffer length and 
checkpointing interval. This could also be an opt-in/ opt-out change if there 
are cases that need it disabled, and we should definitely update the docs.

Our company still actively uses Rabbit – I'm happy to build off your PR and 
test it out in our jobs.

 

 

 

[1]: 
[https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/rabbitmq.html#rabbitmq-source]


was (Author: austince):
Yes, that is correct. I still think it is an improvement on what's here and 
allows the users to tune their job according to their needs, but I don't think 
the performance issue that you cite can be fixed with what Rabbit provides.

If the user has high volume queues and needs checkpointing for EXACTLY_ONCE/ 
AT_LEAST_ONCE guarantees, they'll need to tune their buffer length and 
checkpointing interval. This could also be an opt-in/ opt-out change if there 
are cases that need it disabled, and we should definitely update the docs.

Our company still actively uses Rabbit – I'm happy to build off your PR and 
test it out in our jobs.

 

 

 

[1]: 
[https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/rabbitmq.html#rabbitmq-source]

> RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly
> -
>
> Key: FLINK-10195
> URL: https://issues.apache.org/jira/browse/FLINK-10195
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / RabbitMQ
>Affects Versions: 1.4.0, 1.5.0, 1.5.1, 1.6.0
>Reporter: Luka Jurukovski
>Assignee: Luka Jurukovski
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The connection between the RabbitMQ server and the client does not 
> appropriately back pressure when auto acking is disabled. This becomes very 
> problematic when a downstream process throttles the data processing to a rate 
> slower than RabbitMQ sends the data to the client.
> The difference in records ends up being stored in Flink's heap space, 
> which grows indefinitely (or technically to "Integer Max" Deliveries). 
> Looking at RabbitMQ's metrics, the number of unacked messages looks like a 
> steadily rising sawtooth shape.
> Upon further investigation, it looks like this is due to how the 
> QueueingConsumer works: messages are added to the BlockingQueue faster than 
> they are being removed and processed, resulting in the previously described 
> behavior.
> This may be intended behavior; however, this isn't explicitly obvious in the 
> documentation or any of the examples I have seen.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-10195) RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly

2020-06-03 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17121227#comment-17121227
 ] 

Austin Cawley-Edwards edited comment on FLINK-10195 at 6/3/20, 10:29 PM:
-

Hey all, just bringing this back up as I'm trying to figure out if we could 
replace the consumer another way, but it's not looking like it. The only way 
I've found client-side flow control to work with RabbitMQ is through setting 
the prefetch count on the channel. I think that, in combination with the work 
done by Luka, we might have a decent solution where we could adjust the 
[channel's prefetch count|https://www.rabbitmq.com/confirms.html] based on the 
length of the blocking queue, which can be user-configurable.

 

This performs better than turning the consumer off/ on and is very simple to 
implement. I've put together a small prototype/ playground in Node here 
([https://github.com/austince/backpressured-consumer-prototype]), along with 
some potential issues. The simplest implementation would be a static prefetch 
count, but if we want to tune that and have it update depending on the space 
left in the buffer I think that's possible too.
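
For a rough idea, the adjustment itself only needs the stock amqp-client API; a 
minimal sketch (hypothetical helper, with the buffer standing in for the 
proposed user-configurable queue):

{code:java}
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.util.concurrent.BlockingQueue;

public final class PrefetchTuner {
    private PrefetchTuner() {}

    // Caps the window of unacked deliveries at whatever the buffer can still
    // hold, so the broker stops pushing before the buffer would overflow.
    public static void updatePrefetch(Channel channel, BlockingQueue<?> buffer)
            throws IOException {
        // basicQos(0) means "unlimited", so keep the prefetch at >= 1.
        channel.basicQos(Math.max(1, buffer.remainingCapacity()));
    }
}
{code}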

 

If we allow the buffer queue length to be user-configurable, I think it would 
handle the following tickets as well:

{color:#cc7832}- FLINK-6885 {color}RMQSource does not support QoS, leading to 
oom
 {color:#cc7832}- FLINK-17559 {color}Duplicate of 
{color:#808080}FLINK-6885{color}

 

{color:#172b4d}What do you all think of this proposal?{color}

{color:#172b4d}[~aljoscha] {color}

{color:#172b4d}[~dwysakowicz] {color}

{color:#172b4d}[~senegalo] {color}

 


was (Author: austince):
Hey all, just bringing this back up as I'm trying to figure out if we could 
replace the consumer another way, but it's not looking like it. The only way 
I've found client-side flow control to work with RabbitMQ is through setting 
the prefetch count on the channel. I think that, in combination with the work 
done by Luka, we might have a decent solution where we could adjust the 
[channel's prefetch count|#basicQos(int)] based on the length of the blocking 
queue, which can be user-configurable.

 

This performs better than turning the consumer off/ on and is very simple to 
implement. I've put together a small prototype/ playground in Node here 
([https://github.com/austince/backpressured-consumer-prototype]), along with 
some potential issues. The simplest implementation would be a static prefetch 
count, but if we want to tune that and have it update depending on the space 
left in the buffer I think that's possible too.

 

If we allow the buffer queue length to be user-configurable, I think it would 
handle the following tickets as well:

{color:#cc7832}- FLINK-6885 {color}RMQSource does not support QoS, leading to 
oom
 {color:#cc7832}- FLINK-17559 {color}Duplicate of 
{color:#808080}FLINK-6885{color}

 

{color:#172b4d}What do you all think of this proposal?{color}

{color:#172b4d}[~aljoscha] {color}

{color:#172b4d}[~dwysakowicz] {color}

{color:#172b4d}[~senegalo] {color}

 

> RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly
> -
>
> Key: FLINK-10195
> URL: https://issues.apache.org/jira/browse/FLINK-10195
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / RabbitMQ
>Affects Versions: 1.4.0, 1.5.0, 1.5.1, 1.6.0
>Reporter: Luka Jurukovski
>Assignee: Luka Jurukovski
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The connection between the RabbitMQ server and the client does not 
> appropriately back pressure when auto acking is disabled. This becomes very 
> problematic when a downstream process throttles the data processing to a rate 
> slower than RabbitMQ sends the data to the client.
> The difference in records ends up being stored in Flink's heap space, 
> which grows indefinitely (or technically to "Integer Max" Deliveries). 
> Looking at RabbitMQ's metrics, the number of unacked messages looks like a 
> steadily rising sawtooth shape.
> Upon further investigation, it looks like this is due to how the 
> QueueingConsumer works: messages are added to the BlockingQueue faster than 
> they are being removed and processed, resulting in the previously described 
> behavior.
> This may be intended behavior; however, this isn't explicitly obvious in the 
> documentation or any of the examples I have seen.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-10195) RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly

2020-06-03 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17125362#comment-17125362
 ] 

Austin Cawley-Edwards commented on FLINK-10195:
---

Yes, that is correct. I still think it is an improvement on what's here and 
allows the users to tune their job according to their needs, but I don't think 
the performance issue that you cite can be fixed with what Rabbit provides.

If the user has high volume queues and needs checkpointing for EXACTLY_ONCE/ 
AT_LEAST_ONCE guarantees, they'll need to tune their buffer length and 
checkpointing interval. This could also be an opt-in change if there are cases 
that need it disabled, and we should definitely update the docs.

Our company still actively uses Rabbit -- I'm happy to build off your PR and 
test it out in our jobs.

 

 

 

[1]: 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/rabbitmq.html#rabbitmq-source

> RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly
> -
>
> Key: FLINK-10195
> URL: https://issues.apache.org/jira/browse/FLINK-10195
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / RabbitMQ
>Affects Versions: 1.4.0, 1.5.0, 1.5.1, 1.6.0
>Reporter: Luka Jurukovski
>Assignee: Luka Jurukovski
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The connection between the RabbitMQ server and the client does not 
> appropriately back pressure when auto acking is disabled. This becomes very 
> problematic when a downstream process throttles the data processing to a rate 
> slower than RabbitMQ sends the data to the client.
> The difference in records ends up being stored in Flink's heap space, 
> which grows indefinitely (or technically to "Integer Max" Deliveries). 
> Looking at RabbitMQ's metrics, the number of unacked messages looks like a 
> steadily rising sawtooth shape.
> Upon further investigation, it looks like this is due to how the 
> QueueingConsumer works: messages are added to the BlockingQueue faster than 
> they are being removed and processed, resulting in the previously described 
> behavior.
> This may be intended behavior; however, this isn't explicitly obvious in the 
> documentation or any of the examples I have seen.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-10195) RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly

2020-06-03 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17125362#comment-17125362
 ] 

Austin Cawley-Edwards edited comment on FLINK-10195 at 6/3/20, 10:27 PM:
-

Yes, that is correct. I still think it is an improvement on what's here and 
allows the users to tune their job according to their needs, but I don't think 
the performance issue that you cite can be fixed with what Rabbit provides.

If the user has high volume queues and needs checkpointing for EXACTLY_ONCE/ 
AT_LEAST_ONCE guarantees, they'll need to tune their buffer length and 
checkpointing interval. This could also be an opt-in/ opt-out change if there 
are cases that need it disabled, and we should definitely update the docs.

Our company still actively uses Rabbit – I'm happy to build off your PR and 
test it out in our jobs.

 

 

 

[1]: 
[https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/rabbitmq.html#rabbitmq-source]


was (Author: austince):
Yes, that is correct. I still think it is an improvement on what's here and 
allows the users to tune their job according to their needs, but I don't think 
the performance issue that you cite can be fixed with what Rabbit provides.

If the user has high volume queues and needs checkpointing for EXACTLY_ONCE/ 
AT_LEAST_ONCE guarantees, they'll need to tune their buffer length and 
checkpointing interval. This could also be an opt-in change if there are cases 
that need it disabled, and we should definitely update the docs.

Our company still actively uses Rabbit -- I'm happy to build off your PR and 
test it out in our jobs.

 

 

 

[1]: 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/rabbitmq.html#rabbitmq-source

> RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly
> -
>
> Key: FLINK-10195
> URL: https://issues.apache.org/jira/browse/FLINK-10195
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / RabbitMQ
>Affects Versions: 1.4.0, 1.5.0, 1.5.1, 1.6.0
>Reporter: Luka Jurukovski
>Assignee: Luka Jurukovski
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The connection between the RabbitMQ server and the client does not 
> appropriately back pressure when auto acking is disabled. This becomes very 
> problematic when a downstream process throttles the data processing to a rate 
> slower than RabbitMQ sends the data to the client.
> The difference in records ends up being stored in Flink's heap space, 
> which grows indefinitely (or technically to "Integer Max" Deliveries). 
> Looking at RabbitMQ's metrics, the number of unacked messages looks like a 
> steadily rising sawtooth shape.
> Upon further investigation, it looks like this is due to how the 
> QueueingConsumer works: messages are added to the BlockingQueue faster than 
> they are being removed and processed, resulting in the previously described 
> behavior.
> This may be intended behavior; however, this isn't explicitly obvious in the 
> documentation or any of the examples I have seen.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-10195) RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly

2020-06-03 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17121227#comment-17121227
 ] 

Austin Cawley-Edwards edited comment on FLINK-10195 at 6/3/20, 5:07 PM:


Hey all, just bringing this back up as I'm trying to figure out if we could 
replace the consumer another way, but it's not looking like it. The only way 
I've found client-side flow control to work with RabbitMQ is through setting 
the prefetch count on the channel. I think that, in combination with the work 
done by Luka, we might have a decent solution where we could adjust the 
[channel's prefetch count|#basicQos(int)] based on the length of the blocking 
queue, which can be user-configurable.

 

This performs better than turning the consumer off/ on and is very simple to 
implement. I've put together a small prototype/ playground in Node here 
([https://github.com/austince/backpressured-consumer-prototype]), along with 
some potential issues. The simplest implementation would be a static prefetch 
count, but if we want to tune that and have it update depending on the space 
left in the buffer I think that's possible too.

 

If we allow the buffer queue length to be user-configurable, I think it would 
handle the following tickets as well:

{color:#cc7832}- FLINK-6885 {color}RMQSource does not support QoS, leading to 
oom
 {color:#cc7832}- FLINK-17559 {color}Duplicate of 
{color:#808080}FLINK-6885{color}

 

{color:#172b4d}What do you all think of this proposal?{color}

{color:#172b4d}[~aljoscha] {color}

{color:#172b4d}[~dwysakowicz] {color}

{color:#172b4d}[~senegalo] {color}

 


was (Author: austince):
Hey all, just bringing this back up as I'm trying to figure out if we could 
replace the consumer another way, but it's not looking like it. The only way 
I've found client-side flow control to work with RabbitMQ is through setting 
the prefetch count on the channel. I think that, in combination with the work 
done by Luka, we might have a decent solution where we could adjust the 
[channel's prefetch count|#basicQos(int)]] based on the length of the blocking 
queue, which can be user-configurable.

 

This performs better than turning the consumer off/ on and is very simple to 
implement. I've put together a small prototype/ playground in Node here 
([https://github.com/austince/backpressured-consumer-prototype]), along with 
some potential issues. The simplest implementation would be a static prefetch 
count, but if we want to tune that and have it update depending on the space 
left in the buffer I think that's possible too.

 

If we allow the buffer queue length to be user-configurable, I think it would 
handle the following tickets as well:

{color:#cc7832}- FLINK-6885 {color}RMQSource does not support QoS, leading to 
oom
 {color:#cc7832}- FLINK-17559 {color}Duplicate of 
{color:#808080}FLINK-6885{color}

 

{color:#172b4d}What do you all think of this proposal?{color}

{color:#172b4d}[~aljoscha] {color}

{color:#172b4d}[~dwysakowicz] {color}

{color:#172b4d}[~senegalo] {color}

 

> RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly
> -
>
> Key: FLINK-10195
> URL: https://issues.apache.org/jira/browse/FLINK-10195
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / RabbitMQ
>Affects Versions: 1.4.0, 1.5.0, 1.5.1, 1.6.0
>Reporter: Luka Jurukovski
>Assignee: Luka Jurukovski
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The connection between the RabbitMQ server and the client does not 
> appropriately back pressure when auto acking is disabled. This becomes very 
> problematic when a downstream process throttles the data processing to a rate 
> slower than RabbitMQ sends the data to the client.
> The difference in records ends up being stored in Flink's heap space, 
> which grows indefinitely (or technically to "Integer Max" Deliveries). 
> Looking at RabbitMQ's metrics, the number of unacked messages looks like a 
> steadily rising sawtooth shape.
> Upon further investigation, it looks like this is due to how the 
> QueueingConsumer works: messages are added to the BlockingQueue faster than 
> they are being removed and processed, resulting in the previously described 
> behavior.
> This may be intended behavior; however, this isn't explicitly obvious in the 
> documentation or any of the examples I have seen.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-10195) RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly

2020-06-01 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17121227#comment-17121227
 ] 

Austin Cawley-Edwards edited comment on FLINK-10195 at 6/1/20, 6:50 PM:


Hey all, just bringing this back up as I'm trying to figure out if we could 
replace the consumer another way, but it's not looking like it. The only way 
I've found client-side flow control to work with RabbitMQ is through setting 
the prefetch count on the channel. I think that, in combination with the work 
done by Luka, we might have a decent solution where we could adjust the 
[channel's prefetch count|#basicQos(int)]] based on the length of the blocking 
queue, which can be user-configurable.

 

This performs better than turning the consumer off/ on and is very simple to 
implement. I've put together a small prototype/ playground in Node here 
([https://github.com/austince/backpressured-consumer-prototype]), along with 
some potential issues. The simplest implementation would be a static prefetch 
count, but if we want to tune that and have it update depending on the space 
left in the buffer I think that's possible too.

 

If we allow the buffer queue length to be user-configurable, I think it would 
handle the following tickets as well:

{color:#cc7832}- FLINK-6885 {color}RMQSource does not support QoS, leading to 
oom
 {color:#cc7832}- FLINK-17559 {color}Duplicate of 
{color:#808080}FLINK-6885{color}

 

{color:#172b4d}What do you all think of this proposal?{color}

{color:#172b4d}[~aljoscha] {color}

{color:#172b4d}[~dwysakowicz] {color}

{color:#172b4d}[~senegalo] {color}

 


was (Author: austince):
Hey all, just bringing this back up as I'm trying to figure out if we could 
replace the consumer another way, but it's not looking like it. The only way 
I've found client-side flow control to work with RabbitMQ is through setting 
the prefetch count on the channel. I think that, in combination with the work 
done by Luka, we might have a decent solution where we could adjust the 
[channel's prefetch count|#basicQos(int)]] based on the length of the blocking 
queue, which can be user-configurable.

 

This performs better than turning the consumer off/ on and is very simple to 
implement. I've put together a small prototype/ playground in Node here 
([https://github.com/austince/backpressured-consumer-prototype]), along with 
some potential issues. The simplest implementation would be a static prefetch 
count, but if we want to tune that and have it update depending on the space 
left in the buffer I think that's possible too.

 

If we allow the buffer queue length to be user-configurable, I think it would 
handle the following tickets as well:{color:#cc7832}- FLINK-6885 
{color}RMQSource does not support QoS, leading to oom
 {color:#cc7832}- FLINK-17559 {color}Duplicate of 
{color:#808080}`FLINK-6885`{color}

 

{color:#172b4d}What do you all think of this proposal?{color}

{color:#172b4d}[~aljoscha] {color}

{color:#172b4d}[~dwysakowicz] {color}

{color:#172b4d}[~senegalo] {color}

 

> RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly
> -
>
> Key: FLINK-10195
> URL: https://issues.apache.org/jira/browse/FLINK-10195
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / RabbitMQ
>Affects Versions: 1.4.0, 1.5.0, 1.5.1, 1.6.0
>Reporter: Luka Jurukovski
>Assignee: Luka Jurukovski
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The connection between the RabbitMQ server and the client does not 
> appropriately back pressure when auto acking is disabled. This becomes very 
> problematic when a downstream process throttles the data processing to a rate 
> slower than RabbitMQ sends the data to the client.
> The difference in records ends up being stored in Flink's heap space, 
> which grows indefinitely (or technically to "Integer Max" Deliveries). 
> Looking at RabbitMQ's metrics, the number of unacked messages looks like a 
> steadily rising sawtooth shape.
> Upon further investigation, it looks like this is due to how the 
> QueueingConsumer works: messages are added to the BlockingQueue faster than 
> they are being removed and processed, resulting in the previously described 
> behavior.
> This may be intended behavior; however, this isn't explicitly obvious in the 
> documentation or any of the examples I have seen.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-10195) RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly

2020-06-01 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17121227#comment-17121227
 ] 

Austin Cawley-Edwards edited comment on FLINK-10195 at 6/1/20, 6:49 PM:


Hey all, just bringing this back up as I'm trying to figure out if we could 
replace the consumer another way, but it's not looking like it. The only way 
I've found client-side flow control to work with RabbitMQ is through setting 
the prefetch count on the channel. I think that, in combination with the work 
done by Luka, we might have a decent solution where we could adjust the 
[channel's prefetch count|#basicQos(int)]] based on the length of the blocking 
queue, which can be user-configurable.

 

This performs better than turning the consumer off/ on and is very simple to 
implement. I've put together a small prototype/ playground in Node here 
([https://github.com/austince/backpressured-consumer-prototype]), along with 
some potential issues. The simplest implementation would be a static prefetch 
count, but if we want to tune that and have it update depending on the space 
left in the buffer I think that's possible too.

 

If we allow the buffer queue length to be user-configurable, I think it would 
handle the following tickets as well:{color:#cc7832}- FLINK-6885 
{color}RMQSource does not support QoS, leading to oom
 {color:#cc7832}- FLINK-17559 {color}Duplicate of 
{color:#808080}`FLINK-6885`{color}

 

{color:#172b4d}What do you all think of this proposal?{color}

{color:#172b4d}[~aljoscha] {color}

{color:#172b4d}[~dwysakowicz] {color}

{color:#172b4d}[~senegalo] {color}

 


was (Author: austince):
Hey all, just bringing this back up as I'm trying to figure out if we could 
replace the consumer another way, but it's not looking like it. The only way 
I've found client-side flow control to work with RabbitMQ is through setting 
the prefetch count on the channel. I think that, in combination with the work 
done by Luka, we might have a decent solution where we could adjust the 
[channel's prefetch 
count|[https://rabbitmq.github.io/rabbitmq-java-client/api/current/com/rabbitmq/client/Channel.html#basicQos(int)]]
 based on the length of the blocking queue, which can be user-configurable.

 

This performs better than turning the consumer off/ on and is very simple to 
implement. I've put together a small prototype/ playground in Node here 
([github.com/austince/backpressured-consumer-prototype|[https://github.com/austince/backpressured-consumer-prototype]]),
 along with some potential issues. The simplest implementation would be a 
static prefetch count, but if we want to tune that and have it update depending 
on the space left in the buffer I think that's possible too.

 

If we allow the buffer queue length to be user-configurable, I think it would 
handle the following tickets as well:{color:#cc7832}
{color}{color:#cc7832}- FLINK-6885 {color}RMQSource does not support QoS, 
leading to oom
{color:#cc7832}- FLINK-17559 {color}Duplicate of 
{color:#808080}`FLINK-6885`{color}

 

{color:#172b4d}What do you all think of this proposal?{color}

{color:#172b4d}[~aljoscha] {color}

{color:#172b4d}[~dwysakowicz] {color}

{color:#172b4d}[~senegalo] {color}

 

> RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly
> -
>
> Key: FLINK-10195
> URL: https://issues.apache.org/jira/browse/FLINK-10195
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / RabbitMQ
>Affects Versions: 1.4.0, 1.5.0, 1.5.1, 1.6.0
>Reporter: Luka Jurukovski
>Assignee: Luka Jurukovski
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The connection between the RabbitMQ server and the client does not 
> appropriately back pressure when auto acking is disabled. This becomes very 
> problematic when a downstream process throttles the data processing to a rate 
> slower than RabbitMQ sends the data to the client.
> The difference in records ends up being stored in Flink's heap space, 
> which grows indefinitely (or technically to "Integer Max" Deliveries). 
> Looking at RabbitMQ's metrics, the number of unacked messages looks like a 
> steadily rising sawtooth shape.
> Upon further investigation, it looks like this is due to how the 
> QueueingConsumer works: messages are added to the BlockingQueue faster than 
> they are being removed and processed, resulting in the previously described 
> behavior.
> This may be intended behavior; however, this isn't explicitly obvious in the 
> documentation or any of the examples I have seen.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-10195) RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly

2020-06-01 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17121227#comment-17121227
 ] 

Austin Cawley-Edwards commented on FLINK-10195:
---

Hey all, just bringing this back up as I'm trying to figure out if we could 
replace the consumer another way, but it's not looking like it. The only way 
I've found client-side flow control to work with RabbitMQ is through setting 
the prefetch count on the channel. I think that, in combination with the work 
done by Luka, we might have a decent solution where we could adjust the 
[channel's prefetch 
count|[https://rabbitmq.github.io/rabbitmq-java-client/api/current/com/rabbitmq/client/Channel.html#basicQos(int)]]
 based on the length of the blocking queue, which can be user-configurable.

 

This performs better than turning the consumer off/ on and is very simple to 
implement. I've put together a small prototype/ playground in Node here 
([github.com/austince/backpressured-consumer-prototype|[https://github.com/austince/backpressured-consumer-prototype]]),
 along with some potential issues. The simplest implementation would be a 
static prefetch count, but if we want to tune that and have it update depending 
on the space left in the buffer I think that's possible too.

 

If we allow the buffer queue length to be user-configurable, I think it would 
handle the following tickets as well:{color:#cc7832}
{color}{color:#cc7832}- FLINK-6885 {color}RMQSource does not support QoS, 
leading to oom
{color:#cc7832}- FLINK-17559 {color}Duplicate of 
{color:#808080}`FLINK-6885`{color}

 

{color:#172b4d}What do you all think of this proposal?{color}

{color:#172b4d}[~aljoscha] {color}

{color:#172b4d}[~dwysakowicz] {color}

{color:#172b4d}[~senegalo] {color}

 

> RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly
> -
>
> Key: FLINK-10195
> URL: https://issues.apache.org/jira/browse/FLINK-10195
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / RabbitMQ
>Affects Versions: 1.4.0, 1.5.0, 1.5.1, 1.6.0
>Reporter: Luka Jurukovski
>Assignee: Luka Jurukovski
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The connection between the RabbitMQ server and the client does not 
> appropriately back pressure when auto acking is disabled. This becomes very 
> problematic when a downstream process throttles the data processing to a rate 
> slower than RabbitMQ sends the data to the client.
> The difference in records ends up being stored in Flink's heap space, 
> which grows indefinitely (or technically to "Integer Max" Deliveries). 
> Looking at RabbitMQ's metrics, the number of unacked messages looks like a 
> steadily rising sawtooth shape.
> Upon further investigation, it looks like this is due to how the 
> QueueingConsumer works: messages are added to the BlockingQueue faster than 
> they are being removed and processed, resulting in the previously described 
> behavior.
> This may be intended behavior; however, this isn't explicitly obvious in the 
> documentation or any of the examples I have seen.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-7267) Add support for lists of hosts to connect

2020-05-15 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-7267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17108575#comment-17108575
 ] 

Austin Cawley-Edwards edited comment on FLINK-7267 at 5/15/20, 7:05 PM:


I put in a PR for option A, as it's less change – if we find that many people 
are overriding the connection, maybe that would be a good time to add it to the 
connection config builder? Anyways, can you assign this ticket to me (oops)? 
Thanks!


was (Author: austince):
I put in a PR for option A, as it's less change -- if we find that many people 
are overriding the connection, maybe that would be a good time to add it to the 
connection config builder? Anyways, can you assign this ticket to me? Thanks!

> Add support for lists of hosts to connect
> -
>
> Key: FLINK-7267
> URL: https://issues.apache.org/jira/browse/FLINK-7267
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors/ RabbitMQ
>Affects Versions: 1.3.0
>Reporter: Hu Hailin
>Priority: Minor
>  Labels: pull-request-available
>
> The RMQConnectionConfig can only assign a single host:port. I want to 
> connect to a cluster with an available node.
> My workaround is to write my own sink extending RMQSink and override open(), 
> assigning the node list in it.
> {code:java}
>   connection = factory.newConnection(addrs);
> {code}
> I still need to build the RMQConnectionConfig with a dummy host:port or a 
> node in the list. It's annoying.
> I think it is better to provide a configuration for it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-7267) Add support for lists of hosts to connect

2020-05-15 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-7267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17108575#comment-17108575
 ] 

Austin Cawley-Edwards commented on FLINK-7267:
--

I put in a PR for option A, as it's less change -- if we find that many people 
are overriding the connection, maybe that would be a good time to add it to the 
connection config builder? Anyways, can you assign this ticket to me? Thanks!

> Add support for lists of hosts to connect
> -
>
> Key: FLINK-7267
> URL: https://issues.apache.org/jira/browse/FLINK-7267
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors/ RabbitMQ
>Affects Versions: 1.3.0
>Reporter: Hu Hailin
>Priority: Minor
>  Labels: pull-request-available
>
> The RMQConnectionConfig can only assign a single host:port. I want to 
> connect to a cluster with an available node.
> My workaround is to write my own sink extending RMQSink and override open(), 
> assigning the node list in it.
> {code:java}
>   connection = factory.newConnection(addrs);
> {code}
> I still need to build the RMQConnectionConfig with a dummy host:port or a 
> node in the list. It's annoying.
> I think it is better to provide a configuration for it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-7267) Add support for lists of hosts to connect

2020-05-12 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-7267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17104943#comment-17104943
 ] 

Austin Cawley-Edwards edited comment on FLINK-7267 at 5/12/20, 1:25 PM:


This seems like a reasonable feature to me. The only way to currently achieve 
this would be to override the open method for the source and completely 
re-implement it, which could be messy.

 

I think we could either:

A) create an `RMQSource#setupConnection` method, similar to 
`setupConnectionFactory`. The connection is only assigned in the `open` method, 
so I don't think it would be easy to introduce accidental errors in many 
places. This would allow users to make other changes to the connection by just 
overriding that method.

 

B) wrap the RMQ package's Address class and allow a list of them to be passed 
as an option to the `RMQConnectionConfig`, which could be used in the 
`RMQSource#setupConnectionFactory`.

 

 

I think option A would be best, as it is a minimal change and would open up the 
possibility of extending the connection in user code (ex: [adding address 
resolution|https://www.rabbitmq.com/api-guide.html#service-discovery-with-address-resolver]).
 [~aljoscha] [~senegalo] - what do you think? Is this change worthwhile?
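
To make option A concrete, here's a rough sketch of the proposed hook. The 
class names are hypothetical stand-ins and the real RMQSource constructor and 
fields are omitted; this is not committed Flink API:

{code:java}
import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Address;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

// Hypothetical stand-in for RMQSource, showing only the proposed hook.
class SketchSource {
    protected final ConnectionFactory connectionFactory = new ConnectionFactory();

    // Proposed setupConnection() hook: the default connects to the single
    // host configured on the factory, mirroring today's open() behavior.
    protected Connection setupConnection() throws IOException, TimeoutException {
        return connectionFactory.newConnection();
    }
}

// A user overrides the hook to connect to a list of cluster nodes.
class MultiHostSource extends SketchSource {
    @Override
    protected Connection setupConnection() throws IOException, TimeoutException {
        Address[] nodes = Address.parseAddresses("rmq-node-1:5672,rmq-node-2:5672");
        return connectionFactory.newConnection(nodes);
    }
}
{code}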

 

Side-note, should this conversation happen on the dev mailing list?


was (Author: austince):
This seems like a reasonable feature to me. The only way to currently achieve 
this would be to override the open method for the source and completely 
re-implement it, which can be messy.

 

I think we could either:

A) create an `RMQSource#setupConnection` method, similar to 
`setupConnectionFactory`. The connection is only assigned in the `open` method, 
so I don't think it would be easy to introduce accidental errors in many 
places. This would allow users to make other changes to the connection by just 
overriding that method.

 

B) wrap the RMQ package's Address class and allow a list of them to be passed 
as an option to the `RMQConnectionConfig`, which could be used in the 
`RMQSource#setupConnectionFactory`.

 

 

I think option A would be best, as it is a minimal change and would open up the 
possibility of extending the connection in user code (ex: [adding address 
resolution|https://www.rabbitmq.com/api-guide.html#service-discovery-with-address-resolver]).
 [~aljoscha] [~senegalo] - what do you think? Is this change worthwhile?

 

Side-note, should this conversation happen on the dev mailing list?

> Add support for lists of hosts to connect
> -
>
> Key: FLINK-7267
> URL: https://issues.apache.org/jira/browse/FLINK-7267
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors/ RabbitMQ
>Affects Versions: 1.3.0
>Reporter: Hu Hailin
>Priority: Minor
>
> The RMQConnectionConfig can only assign a single host:port. I want to 
> connect to a cluster with an available node.
> My workaround is to write my own sink extending RMQSink and override open(), 
> assigning the node list in it.
> {code:java}
>   connection = factory.newConnection(addrs);
> {code}
> I still need to build the RMQConnectionConfig with a dummy host:port or a 
> node in the list. It's annoying.
> I think it is better to provide a configuration for it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-7267) Add support for lists of hosts to connect

2020-05-11 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-7267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17104943#comment-17104943
 ] 

Austin Cawley-Edwards edited comment on FLINK-7267 at 5/11/20, 11:46 PM:
-

This seems like a reasonable feature to me. The only way to currently achieve 
this would be to override the open method for the source and completely 
re-implement it, which can be messy.

 

I think we could either:

A) create an `RMQSource#setupConnection` method, similar to 
`setupConnectionFactory`. The connection is only assigned in the `open` method, 
so I don't think it would be easy to introduce accidental errors in many 
places. This would allow users to make other changes to the connection by just 
overriding that method.

 

B) wrap the RMQ package's Address class and allow a list of them to be passed 
as an option to the `RMQConnectionConfig`, which could be used in the 
`RMQSource#setupConnectionFactory`.

 

 

I think option A would be best, as it is a minimal change and would open up the 
possibility of extending the connection in user code (ex: [adding address 
resolution|https://www.rabbitmq.com/api-guide.html#service-discovery-with-address-resolver]).
 [~aljoscha] [~senegalo] - what do you think? Is this change worthwhile?

 

Side-note, should this conversation happen on the dev mailing list?


was (Author: austince):
This seems like a reasonable feature to me. The only way to currently achieve 
this would be to override the open method for the source and completely 
re-implement it, which can be messy.

 

I think we could either:

A) create an `RMQSource#setupConnection` method, similar to 
`setupConnectionFactory`. The connection is only assigned in the `open` method, 
so I don't think it would be easy to introduce accidental errors in many 
places. This would allow users to make other changes to the connection by just 
overriding that method.

 

B) wrap the RMQ package's Address class and add a list of them to be passed as 
an option to the `RMQConnectionConfig`, which could be used in the 
`RMQSource#setupConnectionFactory`.

 

 

I think option A would be best, as it is a minimal change and would open up the 
possibility of extending the connection in user code (ex: [adding address 
resolution|https://www.rabbitmq.com/api-guide.html#service-discovery-with-address-resolver]).
 [~aljoscha] [~senegalo] - what do you think? Is this change worthwhile?

 

Side-note, should this conversation happen on the dev mailing list?

> Add support for lists of hosts to connect
> -
>
> Key: FLINK-7267
> URL: https://issues.apache.org/jira/browse/FLINK-7267
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors/ RabbitMQ
>Affects Versions: 1.3.0
>Reporter: Hu Hailin
>Priority: Minor
>
> The RMQConnectionConfig can only assign a single host:port. I want to 
> connect to a cluster with an available node.
> My workaround is to write my own sink extending RMQSink and override open(), 
> assigning the node list in it.
> {code:java}
>   connection = factory.newConnection(addrs);
> {code}
> I still need to build the RMQConnectionConfig with a dummy host:port or a 
> node in the list. It's annoying.
> I think it is better to provide a configuration for it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (FLINK-7267) Add support for lists of hosts to connect

2020-05-11 Thread Austin Cawley-Edwards (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-7267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Austin Cawley-Edwards updated FLINK-7267:
-
Comment: was deleted

(was: It might also be worth adding a common base class for both the source and 
the sink so these common types of overrides can happen for both.)

> Add support for lists of hosts to connect
> -
>
> Key: FLINK-7267
> URL: https://issues.apache.org/jira/browse/FLINK-7267
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors/ RabbitMQ
>Affects Versions: 1.3.0
>Reporter: Hu Hailin
>Priority: Minor
>
> The RMQConnectionConfig can only assign a single host:port. I want to 
> connect to a cluster with an available node.
> My workaround is to write my own sink extending RMQSink and override open(), 
> assigning the node list in it.
> {code:java}
>   connection = factory.newConnection(addrs);
> {code}
> I still need to build the RMQConnectionConfig with a dummy host:port or a 
> node in the list. It's annoying.
> I think it is better to provide a configuration for it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-7267) Add support for lists of hosts to connect

2020-05-11 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-7267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17104972#comment-17104972
 ] 

Austin Cawley-Edwards commented on FLINK-7267:
--

It might also be worth adding a common base class for both the source and the 
sink so these common types of overrides can happen for both.
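
Purely as an illustration of that idea — the class and method names below are 
hypothetical, not existing Flink classes:

{code:java}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

// Hypothetical shared base for the RMQ source and sink, so connection-level
// overrides (host lists, address resolvers, etc.) live in one place.
abstract class RMQConnectorBase {
    protected transient Connection connection;
    protected transient Channel channel;

    // Subclasses (or users) override this to customize how the factory is built.
    protected abstract ConnectionFactory createConnectionFactory() throws Exception;

    // Single overridable hook that both source and sink would call in open().
    protected void setupConnection() throws Exception {
        connection = createConnectionFactory().newConnection();
        channel = connection.createChannel();
    }
}
{code}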

> Add support for lists of hosts to connect
> -
>
> Key: FLINK-7267
> URL: https://issues.apache.org/jira/browse/FLINK-7267
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors/ RabbitMQ
>Affects Versions: 1.3.0
>Reporter: Hu Hailin
>Priority: Minor
>
> The RMQConnectionConfig can only assign a single host:port. I want to 
> connect to a cluster with an available node.
> My workaround is to write my own sink extending RMQSink and override open(), 
> assigning the node list in it.
> {code:java}
>   connection = factory.newConnection(addrs);
> {code}
> I still need to build the RMQConnectionConfig with a dummy host:port or a 
> node in the list. It's annoying.
> I think it is better to provide a configuration for it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-17529) Replace Deprecated RMQ QueueingConsumer

2020-05-11 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17104955#comment-17104955
 ] 

Austin Cawley-Edwards edited comment on FLINK-17529 at 5/11/20, 11:01 PM:
--

After more investigation, I don't think we're able to replace this without 
tackling the issues brought up in FLINK-10195. I think there are other ways of 
doing flow control than the one described in that ticket, though, which might 
be worth pursuing. Should I close this ticket and move the conversation back to 
FLINK-10195? Sorry for creating it before diving in!


was (Author: austince):
After more investigation, I don't think we're able to replace this without 
tackling the issues brought up in FLINK-10195. I think there are other ways of 
doing flow control than the one described in that ticket, though, which might 
be worth pursuing. Should I close this ticket and move the conversation back to 
FLINK-10195?

> Replace Deprecated RMQ QueueingConsumer
> ---
>
> Key: FLINK-17529
> URL: https://issues.apache.org/jira/browse/FLINK-17529
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors/ RabbitMQ
>Affects Versions: 1.10.0
>Reporter: Austin Cawley-Edwards
>Priority: Minor
>
> The RMQ QueueingConsumer is used in the RMQSource to get a simple blocking 
> consumer. This has been deprecated in `com.rabbitmq:amqp-client` 4.2.0 and is 
> removed in 5.x. It should be replaced by a 
> `com.rabbitmq.client.DefaultConsumer`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17529) Replace Deprecated RMQ QueueingConsumer

2020-05-11 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17104955#comment-17104955
 ] 

Austin Cawley-Edwards commented on FLINK-17529:
---

After more investigation, I don't think we're able to replace this without 
tackling the issues brought up in FLINK-10195. I think there are other ways of 
doing flow control than the one described in that ticket, though, which might 
be worth pursuing. Should I close this ticket and move the conversation back to 
FLINK-10195?

> Replace Deprecated RMQ QueueingConsumer
> ---
>
> Key: FLINK-17529
> URL: https://issues.apache.org/jira/browse/FLINK-17529
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors/ RabbitMQ
>Affects Versions: 1.10.0
>Reporter: Austin Cawley-Edwards
>Priority: Minor
>
> The RMQ QueueingConsumer is used in the RMQSource to get a simple blocking 
> consumer. This has been deprecated in `com.rabbitmq:amqp-client` 4.2.0 and is 
> removed in 5.x. It should be replaced by a 
> `com.rabbitmq.client.DefaultConsumer`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-7267) Add support for lists of hosts to connect

2020-05-11 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-7267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17104943#comment-17104943
 ] 

Austin Cawley-Edwards edited comment on FLINK-7267 at 5/11/20, 10:33 PM:
-

This seems like a reasonable feature to me. The only way to currently achieve 
this would be to override the open method for the source and completely 
re-implement it, which can be messy.

 

I think we could either:

A) create an `RMQSource#setupConnection` method, similar to 
`setupConnectionFactory`. The connection is only assigned in the `open` method, 
so I don't think it would be easy to introduce accidental errors in many 
places. This would allow users to make other changes to the connection by just 
overriding that method.

 

B) wrap the RMQ package's Address class and add a list of them to be passed as 
an option to the `RMQConnectionConfig`, which could be used in the 
`RMQSource#setupConnectionFactory`.

 

 

I think option A would be best, as it is a minimal change and would open up the 
possibility of extending the connection in user code (ex: [adding address 
resolution|https://www.rabbitmq.com/api-guide.html#service-discovery-with-address-resolver]).
 [~aljoscha] [~senegalo] - what do you think? Is this change worthwhile?

 

Side-note, should this conversation happen on the dev mailing list?


was (Author: austince):
This seems like a reasonable feature to me. The only way to currently achieve 
this would be to override the open method for the source and completely 
re-implement it, which can be messy.

 

I think we could either:

A) create an `RMQSource#setupConnection` method, similar to 
`setupConnectionFactory`. The connection is only assigned in the `open` method, 
so I don't think it would be easy to introduce accidental errors in many 
places. This would allow users to make other changes to the connection by just 
overriding that method.

 

B) wrap the RMQ package's Address class and add a list of them to be passed as 
an option to the `RMQConnectionConfig`, which could be used in the 
`RMQSource#setupConnectionFactory`.

 

 

I think option A would be best, as it is a minimal change and would open up the 
possibility of extending the connection in user code. [~aljoscha] [~senegalo] - 
what do you think? Is this change worthwhile?

 

Side-note, should this conversation happen on the dev mailing list?

> Add support for lists of hosts to connect
> -
>
> Key: FLINK-7267
> URL: https://issues.apache.org/jira/browse/FLINK-7267
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors/ RabbitMQ
>Affects Versions: 1.3.0
>Reporter: Hu Hailin
>Priority: Minor
>
> The RMQConnectionConfig can only assign a single host:port. I want to 
> connect to a cluster with an available node.
> My workaround is to write my own sink extending RMQSink and override open(), 
> assigning the node list in it.
> {code:java}
>   connection = factory.newConnection(addrs);
> {code}
> I still need to build the RMQConnectionConfig with a dummy host:port or a 
> node in the list. It's annoying.
> I think it is better to provide a configuration for it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-7267) Add support for lists of hosts to connect

2020-05-11 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-7267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17104943#comment-17104943
 ] 

Austin Cawley-Edwards commented on FLINK-7267:
--

This seems like a reasonable feature to me. The only way to currently achieve 
this would be to override the open method for the source and completely 
re-implement it, which can be messy.

 

I think we could either:

A) create an `RMQSource#setupConnection` method, similar to 
`setupConnectionFactory`. The connection is only assigned in the `open` method, 
so I don't think it would be easy to introduce accidental errors in many 
places. This would allow users to make other changes to the connection by just 
overriding that method.

 

B) wrap the RMQ package's Address class and add a list of them to be passed as 
an option to the `RMQConnectionConfig`, which could be used in the 
`RMQSource#setupConnectionFactory`.

 

 

I think option A would be best, as it is a minimal change and would open up the 
possibility of extending the connection in user code. [~aljoscha] [~senegalo] - 
what do you think? Is this change worthwhile?

 

Side-note, should this conversation happen on the dev mailing list?

> Add support for lists of hosts to connect
> -
>
> Key: FLINK-7267
> URL: https://issues.apache.org/jira/browse/FLINK-7267
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors/ RabbitMQ
>Affects Versions: 1.3.0
>Reporter: Hu Hailin
>Priority: Minor
>
> The RMQConnectionConfig can only assign a single host:port. I want to 
> connect to a cluster with an available node.
> My workaround is to write my own sink extending RMQSink and override open(), 
> assigning the node list in it.
> {code:java}
>   connection = factory.newConnection(addrs);
> {code}
> I still need to build the RMQConnectionConfig with a dummy host:port or a 
> node in the list. It's annoying.
> I think it is better to provide a configuration for it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17529) Replace Deprecated RMQ QueueingConsumer

2020-05-11 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17104529#comment-17104529
 ] 

Austin Cawley-Edwards commented on FLINK-17529:
---

That would be great, thanks [~aljoscha]. I'm still not sure if this is totally 
possible without other changes but will investigate.

> Replace Deprecated RMQ QueueingConsumer
> ---
>
> Key: FLINK-17529
> URL: https://issues.apache.org/jira/browse/FLINK-17529
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors/ RabbitMQ
>Affects Versions: 1.10.0
>Reporter: Austin Cawley-Edwards
>Priority: Minor
>
> The RMQ QueueingConsumer is used in the RMQSource to get a simple blocking 
> consumer. This has been deprecated in `com.rabbitmq:amqp-client` 4.2.0 and is 
> removed in 5.x. It should be replaced by a 
> `com.rabbitmq.client.DefaultConsumer`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-17529) Replace Deprecated RMQ QueueingConsumer

2020-05-06 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17100286#comment-17100286
 ] 

Austin Cawley-Edwards edited comment on FLINK-17529 at 5/6/20, 10:23 PM:
-

Coming from https://issues.apache.org/jira/browse/FLINK-10195 (and the 
associated PR), just replacing the consumer might be an easier first step than 
doing it as part of that fix.


was (Author: austince):
Coming from https://issues.apache.org/jira/browse/FLINK-10195 (and the 
associated PR), just replacing the consumer might be an easier first step than 
to do it with that fix.

> Replace Deprecated RMQ QueueingConsumer
> ---
>
> Key: FLINK-17529
> URL: https://issues.apache.org/jira/browse/FLINK-17529
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors/ RabbitMQ
>Affects Versions: 1.10.0
>Reporter: Austin Cawley-Edwards
>Priority: Minor
>
> The RMQ QueueingConsumer is used in the RMQSource to get a simple blocking 
> consumer. This has been deprecated in `com.rabbitmq:amqp-client` 4.2.0 and is 
> removed in 5.x. It should be replaced by a 
> `com.rabbitmq.client.DefaultConsumer`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17204) The RMQSource and RMQSink of the RabbitMQ connector have inconsistent default value of durable ​​when declaring the queue.

2020-05-06 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17101232#comment-17101232
 ] 

Austin Cawley-Edwards commented on FLINK-17204:
---

Thank you!!

> The RMQSource and RMQSink of the RabbitMQ connector have inconsistent default 
> value of durable ​​when declaring the queue.
> --
>
> Key: FLINK-17204
> URL: https://issues.apache.org/jira/browse/FLINK-17204
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors/ RabbitMQ
>Affects Versions: 1.10.0
>Reporter: ChaojianZhang
>Assignee: Austin Cawley-Edwards
>Priority: Major
>  Labels: pull-request-available
>
> The RabbitMQ queue is created with the value of durable set to true. When I 
> use the data in the RabbitMQ queue as the source and sink it into another 
> queue of RabbitMQ after Flink processing, the program reports an exception; 
> the main exception information is as follows:
>  
> {code:java}
> ...
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; 
> protocol method: #method<channel.close>(reply-code=406, 
> reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 
> 'rabbitmq_connectors_sink' in vhost '/': received 'false' but current is 
> 'true', class-id=50, method-id=10)Caused by: 
> com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: 
> #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - 
> inequivalent arg 'durable' for queue 'rabbitmq_connectors_sink' in vhost '/': 
> received 'false' but current is 'true', class-id=50, method-id=10) at 
> com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) at 
> com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
>  at 
> com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:443)
>  at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:263) at 
> com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:136) ... 
> 15 more
> ...{code}
>  
> I checked the source code of RMQSource and RMQSink and found that the 
> setupQueue() methods of these two classes set inconsistent durable values in 
> queueDeclare(); I think they should be consistent.
> If possible, I want to fix it.
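
For reference, a minimal sketch of the conflicting declarations (the queue 
name is taken from the exception above; the connection setup is illustrative 
and not the connector's actual code):

{code:java}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class DurableMismatchExample {
    public static void main(String[] args) throws Exception {
        Connection connection = new ConnectionFactory().newConnection();
        Channel channel = connection.createChannel();

        // First declaration: the queue is created as durable.
        channel.queueDeclare("rabbitmq_connectors_sink", /* durable */ true,
                /* exclusive */ false, /* autoDelete */ false, /* arguments */ null);

        // Re-declaring the same queue with durable=false is rejected by the
        // broker with PRECONDITION_FAILED (reply-code 406) -- the channel
        // error shown in the stack trace above.
        channel.queueDeclare("rabbitmq_connectors_sink", false, false, false, null);
    }
}
{code}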



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17529) Replace Deprecated RMQ QueueingConsumer

2020-05-05 Thread Austin Cawley-Edwards (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Austin Cawley-Edwards updated FLINK-17529:
--
Description: The RMQ QueueingConsumer is used in the RMQSource to get a 
simple blocking consumer. This has been deprecated in 
`com.rabbitmq:amqp-client` 4.2.0 and is removed in 5.x. It should be replaced 
by a `com.rabbitmq.client.DefaultConsumer`.  (was: The RMQ QueueingConsumer is 
used in the RMQSource to get a simple blocking consumer. This has been 
deprecated in `com.rabbitmq:amqp-client` 4.2.0 and will be removed in 5.x. It 
should be replaced by a `com.rabbitmq.client.DefaultConsumer`.)

> Replace Deprecated RMQ QueueingConsumer
> ---
>
> Key: FLINK-17529
> URL: https://issues.apache.org/jira/browse/FLINK-17529
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors/ RabbitMQ
>Affects Versions: 1.10.0
>Reporter: Austin Cawley-Edwards
>Priority: Minor
>
> The RMQ QueueingConsumer is used in the RMQSource to get a simple blocking 
> consumer. This has been deprecated in `com.rabbitmq:amqp-client` 4.2.0 and is 
> removed in 5.x. It should be replaced by a 
> `com.rabbitmq.client.DefaultConsumer`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17529) Replace Deprecated RMQ QueueingConsumer

2020-05-05 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17100286#comment-17100286
 ] 

Austin Cawley-Edwards commented on FLINK-17529:
---

Coming from https://issues.apache.org/jira/browse/FLINK-10195 (and the 
associated PR), just replacing the consumer might be an easier first step than 
doing it as part of that fix.

> Replace Deprecated RMQ QueueingConsumer
> ---
>
> Key: FLINK-17529
> URL: https://issues.apache.org/jira/browse/FLINK-17529
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors/ RabbitMQ
>Affects Versions: 1.10.0
>Reporter: Austin Cawley-Edwards
>Priority: Minor
>
> The RMQ QueueingConsumer is used in the RMQSource to get a simple blocking 
> consumer. This has been deprecated in `com.rabbitmq:amqp-client` 4.2.0 and 
> will be removed in 5.x. It should be replaced by a 
> `com.rabbitmq.client.DefaultConsumer`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17529) Replace Deprecated RMQ QueueingConsumer

2020-05-05 Thread Austin Cawley-Edwards (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Austin Cawley-Edwards updated FLINK-17529:
--
Priority: Minor  (was: Major)

> Replace Deprecated RMQ QueueingConsumer
> ---
>
> Key: FLINK-17529
> URL: https://issues.apache.org/jira/browse/FLINK-17529
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors/ RabbitMQ
>Affects Versions: 1.10.0
>Reporter: Austin Cawley-Edwards
>Priority: Minor
>
> The RMQ QueueingConsumer is used in the RMQSource to get a simple blocking 
> consumer. This has been deprecated in `com.rabbitmq:amqp-client` 4.2.0 and 
> will be removed in 5.x. It should be replaced by a 
> `com.rabbitmq.client.DefaultConsumer`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17529) Replace Deprecated RMQ QueueingConsumer

2020-05-05 Thread Austin Cawley-Edwards (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Austin Cawley-Edwards updated FLINK-17529:
--
Issue Type: Improvement  (was: Bug)

> Replace Deprecated RMQ QueueingConsumer
> ---
>
> Key: FLINK-17529
> URL: https://issues.apache.org/jira/browse/FLINK-17529
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors/ RabbitMQ
>Affects Versions: 1.10.0
>Reporter: Austin Cawley-Edwards
>Priority: Major
>
> The RMQ QueueingConsumer is used in the RMQSource to get a simple blocking 
> consumer. This has been deprecated in `com.rabbitmq:amqp-client` 4.2.0 and 
> will be removed in 5.x. It should be replaced by a 
> `com.rabbitmq.client.DefaultConsumer`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17204) The RMQSource and RMQSink of the RabbitMQ connector have inconsistent default value of durable ​​when declaring the queue.

2020-05-05 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17100184#comment-17100184
 ] 

Austin Cawley-Edwards commented on FLINK-17204:
---

Thanks for assigning [~aljoscha], PR is open. Should I go back to the mailing 
list thread to look for reviewers or are you able to take it whenever you have 
time?

> The RMQSource and RMQSink of the RabbitMQ connector have inconsistent default 
> value of durable ​​when declaring the queue.
> --
>
> Key: FLINK-17204
> URL: https://issues.apache.org/jira/browse/FLINK-17204
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors/ RabbitMQ
>Affects Versions: 1.10.0
>Reporter: ChaojianZhang
>Assignee: Austin Cawley-Edwards
>Priority: Major
>  Labels: pull-request-available
>
> The RabbitMQ queue is created with the value of durable set to true. When I 
> use the data in the RabbitMQ queue as the source and sink it into another 
> queue of RabbitMQ after Flink processing, the program reports an exception; 
> the main exception information is as follows:
>  
> {code:java}
> ...
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; 
> protocol method: #method<channel.close>(reply-code=406, 
> reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 
> 'rabbitmq_connectors_sink' in vhost '/': received 'false' but current is 
> 'true', class-id=50, method-id=10)Caused by: 
> com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: 
> #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - 
> inequivalent arg 'durable' for queue 'rabbitmq_connectors_sink' in vhost '/': 
> received 'false' but current is 'true', class-id=50, method-id=10) at 
> com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) at 
> com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
>  at 
> com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:443)
>  at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:263) at 
> com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:136) ... 
> 15 more
> ...{code}
>  
> I checked the source code of RMQSource and RMQSink and found that the 
> setupQueue() methods of these two classes set inconsistent durable values in 
> queueDeclare(); I think they should be consistent.
> If possible, I want to fix it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17529) Replace Deprecated RMQ QueueingConsumer

2020-05-05 Thread Austin Cawley-Edwards (Jira)
Austin Cawley-Edwards created FLINK-17529:
-

 Summary: Replace Deprecated RMQ QueueingConsumer
 Key: FLINK-17529
 URL: https://issues.apache.org/jira/browse/FLINK-17529
 Project: Flink
  Issue Type: Bug
  Components: Connectors/ RabbitMQ
Affects Versions: 1.10.0
Reporter: Austin Cawley-Edwards


The RMQ QueueingConsumer is used in the RMQSource to get a simple blocking 
consumer. This has been deprecated in `com.rabbitmq:amqp-client` 4.2.0 and will 
be removed in 5.x. It should be replaced by a 
`com.rabbitmq.client.DefaultConsumer`.
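
A minimal sketch of the direction (illustrative only; the buffer type, queue 
name, and capacity are made up, and this is not the connector's actual code):

{code:java}
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class DefaultConsumerSketch {
    public static void main(String[] args) throws Exception {
        Connection connection = new ConnectionFactory().newConnection();
        Channel channel = connection.createChannel();

        // Bounded buffer standing in for QueueingConsumer's internal queue.
        BlockingQueue<byte[]> buffer = new ArrayBlockingQueue<>(1024);

        channel.basicConsume("example-queue", /* autoAck */ false,
                new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                            AMQP.BasicProperties properties, byte[] body)
                            throws IOException {
                        try {
                            // put() blocks when the buffer is full, stalling
                            // this channel's dispatch thread until space frees up.
                            buffer.put(body);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            throw new IOException("Interrupted while buffering", e);
                        }
                    }
                });

        // The processing side would drain the buffer and ack, e.g.:
        // byte[] next = buffer.take(); ... channel.basicAck(deliveryTag, false);
    }
}
{code}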



--
This message was sent by Atlassian Jira
(v8.3.4#803005)