[jira] [Updated] (FLINK-32626) Distinguish non-existent job from non-existent savepoint in Get Savepoint REST API
[ https://issues.apache.org/jira/browse/FLINK-32626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Austin Cawley-Edwards updated FLINK-32626: -- Description: The current `GET /jobs/:jobid/savepoints/:triggerid` API endpoint [1], when given *either* a Job ID or a Trigger ID that does not exist, will always respond with an exception that indicates {*}the Savepoint doesn't exist{*}, like: {code:java} {"errors":["org.apache.flink.runtime.rest.handler.RestHandlerException: There is no savepoint operation with triggerId=TRIGGER ID for job JOB ID.\n\tat org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$SavepointStatusHandler.maybeCreateNotFoundError(SavepointHandlers.java:325)\n\tat org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$SavepointStatusHandler.lambda$handleRequest$1(SavepointHandlers.java:308)\n\tat java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)\n\tat java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)\n\tat java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)\n\tat java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)\n\tat org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:260)\n\tat java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)\n\tat java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)\n\tat java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)\n\tat java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)\n\tat org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1275)\n\tat org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)\n\tat 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)\n\tat org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)\n\tat java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)\n\tat java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)\n\tat java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)\n\tat java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)\n\tat org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:45)\n\tat akka.dispatch.OnComplete.internal(Future.scala:299)\n\tat akka.dispatch.OnComplete.internal(Future.scala:297)\n\tat akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)\n\tat akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)\n\tat scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)\n\tat org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)\n\tat scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)\n\tat scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)\n\tat scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)\n\tat scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)\n\tat akka.pattern.PromiseActorRef.$bang(AskSupport.scala:622)\n\tat akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:25)\n\tat akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)\n\tat scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536)\n\tat scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)\n\tat 
scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)\n\tat scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)\n\tat akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)\n\tat akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)\n\tat scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)\n\tat scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)\n\tat akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)\n\tat akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)\n\tat akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)\n\tat java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)\n\tat java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)\n\tat
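Until the endpoint itself distinguishes the two cases, a client can work around the ambiguous 404 by probing `GET /jobs/:jobid` separately. A minimal sketch (the function names are hypothetical; only the two REST paths come from the issue above, and the decision logic is factored out so it needs no running cluster):

```python
import json
import urllib.error
import urllib.request


def classify_not_found(job_probe_status: int) -> str:
    """Classify the ambiguous savepoint-status 404: if a follow-up
    GET /jobs/:jobid also returns 404, the job itself is missing;
    otherwise only the trigger ID is unknown."""
    return "job-not-found" if job_probe_status == 404 else "trigger-not-found"


def savepoint_status(base_url: str, job_id: str, trigger_id: str) -> dict:
    """Fetch savepoint status, raising a LookupError that names the
    actual missing resource instead of the generic savepoint error."""
    def get(path: str):
        try:
            with urllib.request.urlopen(base_url + path) as resp:
                return resp.status, json.load(resp)
        except urllib.error.HTTPError as err:
            return err.code, None

    status, body = get(f"/jobs/{job_id}/savepoints/{trigger_id}")
    if status != 404:
        return body
    # Disambiguate: does the job exist at all?
    job_status, _ = get(f"/jobs/{job_id}")
    raise LookupError(classify_not_found(job_status))
```

This is purely a client-side workaround; the issue itself asks for the server to return distinct errors so the extra round trip becomes unnecessary.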
[jira] [Updated] (FLINK-32626) Distinguish non-existent job from non-existent savepoint in Get Savepoint REST API
[ https://issues.apache.org/jira/browse/FLINK-32626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Austin Cawley-Edwards updated FLINK-32626: -- Summary: Distinguish non-existent job from non-existent savepoint in Get Savepoint REST API (was: Get Savepoint REST API doesn't distinguish non-existent job from non-existent savepoint ) > Distinguish non-existent job from non-existent savepoint in Get Savepoint > REST API > -- > > Key: FLINK-32626 > URL: https://issues.apache.org/jira/browse/FLINK-32626 > Project: Flink > Issue Type: Improvement > Components: Runtime / REST >Affects Versions: 1.17.1 >Reporter: Austin Cawley-Edwards >Priority: Not a Priority > > The current `GET /jobs/:jobid/savepoints/:triggerid` API endpoint [1], when > given *either* a Job ID or a Trigger ID that does not exist, it will respond > with an exception that indicates the Savepoint doesn't exist.
[jira] [Updated] (FLINK-32626) Distinguish non-existent job from non-existent savepoint in Get Savepoint REST API
[ https://issues.apache.org/jira/browse/FLINK-32626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Austin Cawley-Edwards updated FLINK-32626: -- Description: The current `GET /jobs/:jobid/savepoints/:triggerid` API endpoint [1], when given *either* a Job ID or a Trigger ID that does not exist, it will respond with an exception that indicates the Savepoint doesn't exist.
[jira] [Created] (FLINK-32626) Get Savepoint REST API doesn't distinguish non-existent job from non-existent savepoint
Austin Cawley-Edwards created FLINK-32626: - Summary: Get Savepoint REST API doesn't distinguish non-existent job from non-existent savepoint Key: FLINK-32626 URL: https://issues.apache.org/jira/browse/FLINK-32626 Project: Flink Issue Type: Improvement Components: Runtime / REST Affects Versions: 1.17.1 Reporter: Austin Cawley-Edwards The current `GET /jobs/:jobid/savepoints/:triggerid` API endpoint [1], when given *either* a Job ID or a Trigger ID that does not exist, it will respond with an exception that indicates the Savepoint doesn't exist.
[jira] [Updated] (FLINK-10212) REST API for listing all the available save points
[ https://issues.apache.org/jira/browse/FLINK-10212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Austin Cawley-Edwards updated FLINK-10212: -- Description: *Background* I'm one of the authors of the open-source Flink job deployer (https://github.com/ing-bank/flink-deployer). Recently, I rewrote our implementation to use the Flink REST API instead of the native CLI. In our use case, we store the job savepoints in a Kubernetes persistent volume. For our deployer, we mount the persistent volume to our deployer container so that we can find and use the savepoints. In the rewrite to the REST API, I saw that the API to monitor savepoint creation returns the complete path to the created savepoint, and we can use this one in the job deployer to start the new job with the latest save point. However, we also allow users to deploy a job with a recovered state by specifying only the directory savepoints are stored in. In this scenario we will look for the latest savepoint created for this job ourselves inside the given directory. To find this path, we're still relying on the mounted volume and listing directory content to discover savepoints. *Feature* I was thinking that it might be a good addition if the native Flink REST API offers the ability to retrieve savepoints. Seeing that the API doesn't inherently know where savepoints are stored, it could take a path as one of the arguments. It could even allow the user to provide a job ID as an argument so that the API would be able to search for savepoints for a specific job ID in the specified directory. As the API would require the path as an argument, and providing a path containing forward slashes in the URL isn't ideal, I'm eager to discuss what a proper solution would look like. 
A POST request to /jobs/:jobid/savepoints with the path as a body parameter would make sense if the API were to offer to list all save points in a specific path but this request is already being used for creating new savepoints. An alternative could be a POST to /savepoints with the path and job ID in the request body. A POST request to retrieve data is obviously not the most straightforward approach but in my opinion still preferable over a GET to, for example, /jobs/:jobid/savepoints/:targetDirectory I'm willing to help out on this one by submitting a pull request. Looking forward to your thoughts! > REST API for listing all the available save points > -- > > Key: FLINK-10212 > URL: https://issues.apache.org/jira/browse/FLINK-10212 > Project: Flink > Issue Type: New
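For concreteness, the alternative proposed above (a POST to /savepoints with the path and job ID in the body) could carry a payload like the following sketch; the endpoint and field names are assumptions drawn from this discussion, not an existing Flink API:

```python
import json
from typing import Optional


def build_list_savepoints_request(target_directory: str,
                                  job_id: Optional[str] = None) -> dict:
    """Body for the hypothetical POST /savepoints listing call; putting
    the directory in the body avoids escaping slashes in the URL."""
    body = {"target-directory": target_directory}
    if job_id is not None:
        # Optionally narrow the listing to savepoints of one job.
        body["job-id"] = job_id
    return body


payload = json.dumps(build_list_savepoints_request("s3://bucket/savepoints", "a1b2c3"))
```

Carrying the path in the request body sidesteps the URL-encoding concern raised above, at the cost of using POST for a read operation.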
[jira] [Updated] (FLINK-30445) Add Flink Web3 Connector
[ https://issues.apache.org/jira/browse/FLINK-30445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Austin Cawley-Edwards updated FLINK-30445: -- Description: Web3 is very hot. If you search GitHub for open-source blockchain explorers, the most-starred project is blockscout ([https://github.com/blockscout/blockscout]), which uses Elixir as a parallel engine to sync blocks from a blockchain node into a file (CSV format). I think Flink is the best solution for this ingestion. Reasons: (1) A connector needs to match different chains, including Ethereum, Bitcoin, Solana, etc., through JSON-RPC. (2) Like EtherScan, an explorer needs to fetch the latest block into storage for the index to search. (3) As a supplement to (2), we also need a connector to fully sync all blocks from a blockchain node; I think Flink's stream/batch alignment feature suits this scenario. (4) Per FLIP-27, we could use the block number as the SourceSplit to read, which is very natural. (5) The Flink community could use the web3 topic to get PR effects in the web3 cycle.
> Add Flink Web3 Connector > > > Key: FLINK-30445 > URL: https://issues.apache.org/jira/browse/FLINK-30445 > Project: Flink > Issue Type: New Feature > Components: Connectors / ORC >Reporter: Junyao Huang >Priority: Major -- This message was sent by Atlassian Jira (v8.20.10#820010)
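Point (4) above, using block numbers as FLIP-27 source splits, can be sketched as follows. This is a hypothetical split model for illustration only, not an actual connector implementation:

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class BlockRangeSplit:
    """A FLIP-27-style source split covering a contiguous block range."""
    start: int  # first block number, inclusive
    end: int    # last block number, inclusive


def plan_splits(first_block: int, latest_block: int, batch: int) -> List[BlockRangeSplit]:
    """Partition [first_block, latest_block] into fixed-size ranges
    that parallel readers can fetch independently over JSON-RPC."""
    splits = []
    lo = first_block
    while lo <= latest_block:
        hi = min(lo + batch - 1, latest_block)
        splits.append(BlockRangeSplit(lo, hi))
        lo = hi + 1
    return splits
```

In a real connector the enumerator would keep extending the split list as new blocks are produced, which is where Flink's stream/batch alignment would come in.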
[jira] [Commented] (FLINK-29863) Properly handle NaN/Infinity in OpenAPI spec
[ https://issues.apache.org/jira/browse/FLINK-29863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646786#comment-17646786 ] Austin Cawley-Edwards commented on FLINK-29863: --- Because we're using Jackson to serialize, and it supports these number formats? If that's the case ( :( ), they seem to have been talking about this for years with no real progress: [https://github.com/FasterXML/jackson-databind/issues/911] There are a couple of ideas in that issue for ways to correctly serialize NaN and infinities as `null`, but I can see that it would be hard to write test cases to enforce this throughout Flink's REST API. One thought might be to not try to strictly enforce it, but to treat all found occurrences as bugs — it would be nice if the JSON could be valid, as working with multi-type returns, even if documented in OpenAPI, is a bit painful. Wdyt? > Properly handle NaN/Infinity in OpenAPI spec > > > Key: FLINK-29863 > URL: https://issues.apache.org/jira/browse/FLINK-29863 > Project: Flink > Issue Type: Bug > Components: Documentation, Runtime / REST >Affects Versions: 1.15.0 >Reporter: Chesnay Schepler >Priority: Major > Fix For: 1.17.0 > > > Our OpenAPI spec maps all float/double fields to float64, but we at times > also return NaN/infinity which can't be represented as such since the JSON > spec doesn't support it. > One alternative could be to document it as an either type, returning either a > float64 or a string. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29863) Properly handle NaN/Infinity in OpenAPI spec
[ https://issues.apache.org/jira/browse/FLINK-29863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646692#comment-17646692 ] Austin Cawley-Edwards commented on FLINK-29863: --- What might be the issue with fixing the API to not return NaN/infinity instead? > Numeric values that cannot be represented as sequences of digits (such as > Infinity and NaN) are not permitted. https://www.ecma-international.org/publications-and-standards/standards/ecma-404/ -- This message was sent by Atlassian Jira (v8.20.10#820010)
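The ECMA-404 constraint quoted above is easy to demonstrate. Python's json module behaves like Jackson here: by default it emits the non-standard literal NaN, and only refuses when asked to be strict:

```python
import json
import math

# By default json.dumps emits the bare literal NaN, which the JSON
# spec (ECMA-404) forbids and strict parsers reject.
lenient = json.dumps({"value": math.nan})

# With allow_nan=False the serializer raises instead, forcing the
# caller to choose a representation (null, a string, or omission)
# before anything invalid reaches the wire.
try:
    json.dumps({"value": math.nan}, allow_nan=False)
    strict_raised = False
except ValueError:
    strict_raised = True
```

A strict-mode failure at serialization time is one way to surface the "treat occurrences as bugs" approach discussed above, rather than silently producing invalid JSON.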
[jira] [Commented] (FLINK-26335) prometheus/metrics - java.lang.OutOfMemoryError: Java heap space
[ https://issues.apache.org/jira/browse/FLINK-26335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17610148#comment-17610148 ] Austin Cawley-Edwards commented on FLINK-26335: --- Hey [~mlnogueira], thanks for the report. Do you know roughly how many metrics your job was trying to export (# of labels, # of custom metrics, total size (hundreds of mbs? gbs?)) when this happened? Were you able to resolve it by giving the job more resources? > prometheus/metrics - java.lang.OutOfMemoryError: Java heap space > > > Key: FLINK-26335 > URL: https://issues.apache.org/jira/browse/FLINK-26335 > Project: Flink > Issue Type: Bug > Components: Runtime / Metrics >Affects Versions: 1.14.3 >Reporter: Marcelo Nogueira >Priority: Major > > > {code:java} > 2022-02-22 14:39:33.782 INFO > [org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterHandler] > - Committing the state for checkpoint 4 > 2022-02-22 14:39:47.367 ERROR > [org.apache.flink.runtime.util.ClusterUncaughtExceptionHandler] - WARNING: > Thread 'prometheus-http-1-6' produced an uncaught exception. If you want to > fail on uncaught exceptions, then configure > cluster.uncaught-exception-handling accordingly > java.lang.OutOfMemoryError: Java heap space > at java.util.Arrays.copyOf(Unknown Source) ~[?:?] > at java.io.ByteArrayOutputStream.grow(Unknown Source) ~[?:?] > at java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source) ~[?:?] > at java.io.ByteArrayOutputStream.write(Unknown Source) ~[?:?] > at sun.nio.cs.StreamEncoder.writeBytes(Unknown Source) ~[?:?] > at sun.nio.cs.StreamEncoder.implWrite(Unknown Source) ~[?:?] > at sun.nio.cs.StreamEncoder.implWrite(Unknown Source) ~[?:?] > at sun.nio.cs.StreamEncoder.write(Unknown Source) ~[?:?] > at sun.nio.cs.StreamEncoder.write(Unknown Source) ~[?:?] > at java.io.OutputStreamWriter.write(Unknown Source) ~[?:?] > at java.io.Writer.append(Unknown Source) ~[?:?] 
> at > io.prometheus.client.exporter.common.TextFormat.writeEscapedLabelValue(TextFormat.java:88) > ~[flink-metrics-prometheus-1.14.3.jar:1.14.3] > at > io.prometheus.client.exporter.common.TextFormat.write004(TextFormat.java:42) > ~[flink-metrics-prometheus-1.14.3.jar:1.14.3] > at > io.prometheus.client.exporter.HTTPServer$HTTPMetricHandler.handle(HTTPServer.java:60) > ~[flink-metrics-prometheus-1.14.3.jar:1.14.3] > at com.sun.net.httpserver.Filter$Chain.doFilter(Unknown Source) > ~[jdk.httpserver:?] > at sun.net.httpserver.AuthFilter.doFilter(Unknown Source) > ~[jdk.httpserver:?] > at com.sun.net.httpserver.Filter$Chain.doFilter(Unknown Source) > ~[jdk.httpserver:?] > at sun.net.httpserver.ServerImpl$Exchange$LinkHandler.handle(Unknown > Source) ~[jdk.httpserver:?] > at com.sun.net.httpserver.Filter$Chain.doFilter(Unknown Source) > ~[jdk.httpserver:?] > at sun.net.httpserver.ServerImpl$Exchange.run(Unknown Source) > ~[jdk.httpserver:?] > at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) > ~[?:?] > at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) > ~[?:?] > at java.lang.Thread.run(Unknown Source) [?:?] > 2022-02-22 14:40:32.668 INFO > [org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer] - Flushing > new partitions// code placeholder > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
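The stack trace above shows the exporter encoding the whole scrape response into an in-memory buffer (`ByteArrayOutputStream.grow` via `TextFormat.write004`), so the response size grows roughly with (# of time series) x (bytes per exposition line). The helper below is a hypothetical back-of-the-envelope sketch (the numbers are illustrative, not measured from this job) of why very high-cardinality metrics can exhaust the heap:

```java
// Toy estimate of the Prometheus text-exposition payload size (hypothetical
// helper, not Flink or Prometheus code). The stack trace above shows the
// response being buffered in memory before it is written out, so the whole
// payload must fit on the heap at once.
public class ExpositionSizeEstimate {

    // Each time series becomes one text line; size ~ series * avg line length.
    public static long estimateBytes(int series, int bytesPerLine) {
        return (long) series * bytesPerLine;
    }

    public static void main(String[] args) {
        // e.g. 2 million series at ~200 bytes/line is already ~380 MB of text,
        // all buffered before the HTTP response is sent.
        long bytes = estimateBytes(2_000_000, 200);
        System.out.println(bytes / (1024 * 1024) + " MB");
    }
}
```

This is only meant to frame the questions in the comment (# of labels, # of custom metrics, total size), not to describe the exporter's exact memory accounting.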
[jira] [Created] (FLINK-27356) Incorrect Number of Methods Listed for SplitReader
Austin Cawley-Edwards created FLINK-27356: - Summary: Incorrect Number of Methods Listed for SplitReader Key: FLINK-27356 URL: https://issues.apache.org/jira/browse/FLINK-27356 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.14.0, 1.15.0 Reporter: Austin Cawley-Edwards The docs state that [`SplitReader`|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java] only has three methods, but it has four. The `close()` method is missing from the docs. -- This message was sent by Atlassian Jira (v8.20.7#820007)
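For reference, the four methods in question can be sketched as below. This is a trimmed-down, hypothetical mirror of the interface's shape (parameter types stubbed out locally, signatures paraphrased), not the real Flink source file; the point is only that `close()` is the fourth method the docs omit:

```java
// Hypothetical, self-contained sketch of SplitReader's shape (stubbed types,
// not the real Flink interface) illustrating the four methods.
public class SplitReaderSketch {

    interface RecordsWithSplitIds<E> {}
    interface SplitsChange<SplitT> {}

    interface SplitReader<E, SplitT> {
        RecordsWithSplitIds<E> fetch() throws java.io.IOException;   // 1: poll records
        void handleSplitsChanges(SplitsChange<SplitT> splitsChange); // 2: accept assigned splits
        void wakeUp();                                               // 3: interrupt a blocked fetch
        void close() throws Exception;                               // 4: release resources (missing from docs)
    }

    public static int methodCount() {
        return SplitReader.class.getDeclaredMethods().length;
    }

    public static void main(String[] args) {
        System.out.println("SplitReader declares " + methodCount() + " methods");
    }
}
```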
[jira] [Commented] (FLINK-24208) Allow idempotent savepoint triggering
[ https://issues.apache.org/jira/browse/FLINK-24208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17418252#comment-17418252 ] Austin Cawley-Edwards commented on FLINK-24208: --- Great, we're on the same page. Thanks! > Allow idempotent savepoint triggering > - > > Key: FLINK-24208 > URL: https://issues.apache.org/jira/browse/FLINK-24208 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing, Runtime / REST >Reporter: Robert Metzger >Priority: Major > > As a user of Flink, I want to be able to trigger a savepoint from an external > system in a way that I can detect if I have requested this savepoint already. > By passing a custom ID to the savepoint request, I can check (in case of an > error of the original request, or the external system) if the request has > been made already. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-24208) Allow idempotent savepoint triggering
[ https://issues.apache.org/jira/browse/FLINK-24208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17417718#comment-17417718 ] Austin Cawley-Edwards edited comment on FLINK-24208 at 9/20/21, 5:48 PM: - [~chesnay], definitely on the same page that the user provides a specific ID. Maybe I did not understand the sketch, but from the current description I would assume sending the same `triggerId` if one already exists would result in an error (like 409 Conflict), so it can be detected as a duplicate on the user side. I think what we rather want to say (and what I thought the sketch implements) is that duplicate requests result in the same success response (i.e., 200 w/ a `TriggerResponse`). It does not matter to the user that another request with the same `triggerId` has already been sent. I would suggest updating the description to: As a user of Flink, I want to be able to trigger a savepoint from an external system in a way that is safe to retry so I can guard against ephemeral errors, like network failures, and simplify my upgrade routines. By passing a custom ID to the savepoint request, I can let Flink deduplicate the requests and not worry about storing the result or about sending the request again. was (Author: austince): [~chesnay], definitely on the same page that the user provides a specific ID. Maybe I did not understand the sketch, but from the current description I would assume sending the same `triggerId` if one is already exists would result in an error (like 409 Conflict), so it can be detected as a duplicate on the user side. I think what we rather want to say (and what I thought the sketch implements) is that duplicate requests result in the same success response (i.e., 200 w/ a `TriggerResponse`). It does not matter to the user that another request with the same `triggerId` has already been sent. 
I would suggest updating the description to: As a user of Flink, I want to be able to trigger a savepoint from an external system in a way that is safe to retry so I can guard against ephemeral errors, like network failures, and simplify my upgrade routines. By passing a custom ID to the savepoint request, I can let Flink deduplicate the requests and not worry about storing the result or sending the request again. > Allow idempotent savepoint triggering > - > > Key: FLINK-24208 > URL: https://issues.apache.org/jira/browse/FLINK-24208 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing, Runtime / REST >Reporter: Robert Metzger >Priority: Major > > As a user of Flink, I want to be able to trigger a savepoint from an external > system in a way that I can detect if I have requested this savepoint already. > By passing a custom ID to the savepoint request, I can check (in case of an > error of the original request, or the external system) if the request has > been made already. -- This message was sent by Atlassian Jira (v8.3.4#803005)
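The deduplication semantics proposed in the comment above can be modeled in a few lines. This is a toy sketch with hypothetical names, not Flink code: the server keeps exactly one operation per client-supplied trigger ID, so a retried request observes the same success response rather than a 409:

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Toy model of idempotent savepoint triggering (hypothetical names, not the
// real Flink REST handlers): duplicate requests with the same triggerId map
// to the identical operation, i.e. the same 200/TriggerResponse.
public class SavepointDedup {

    private final Map<String, String> operationsByTriggerId = new ConcurrentHashMap<>();

    // Creates the operation only on the first request for this triggerId;
    // retries get the same handle back instead of an error.
    public String trigger(String triggerId) {
        return operationsByTriggerId.computeIfAbsent(
                triggerId, id -> "operation-" + UUID.randomUUID());
    }

    public static void main(String[] args) {
        SavepointDedup server = new SavepointDedup();
        String first = server.trigger("my-upgrade-2021-09");
        String retry = server.trigger("my-upgrade-2021-09"); // e.g. after a network timeout
        System.out.println(first.equals(retry)); // same success response both times
    }
}
```

With these semantics the external system never needs to store the result of the original request; resending it is always safe.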
[jira] [Commented] (FLINK-24275) Allow idempotent job cancellation
[ https://issues.apache.org/jira/browse/FLINK-24275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17417720#comment-17417720 ] Austin Cawley-Edwards commented on FLINK-24275: --- I see two options: * Treat the endpoint imperatively ("Switch this Job to Cancelled") ** Fail on already cancelled (i.e. 409) * Treat the endpoint declaratively ("Ensure this Job is Cancelled") ** Return a success message (i.e., 200, or 202 like is currently returned) You know which fits better with the Flink API concepts than I do, but I would lean towards the second option to simplify how users handle it. If duplicate calls return an error, users have to classify this error as acceptable. I don't see any case where a 409 error is handled any differently than a success. > Allow idempotent job cancellation > - > > Key: FLINK-24275 > URL: https://issues.apache.org/jira/browse/FLINK-24275 > Project: Flink > Issue Type: Improvement > Components: Runtime / REST >Reporter: Austin Cawley-Edwards >Priority: Major > > As a user of Flink, I want to be able to cancel a job from an external system > in a fault-tolerant way without guessing if the job has already been > cancelled. > > Currently, the cancel endpoint (PATCH /jobs/:jobid?mode=cancel) will return a > 404 if the job is already cancelled. This makes it hard to detect if the job > truly doesn't exist, or if it is already in the desired state. -- This message was sent by Atlassian Jira (v8.3.4#803005)
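The practical difference between the two options lands on the caller. A small hypothetical helper (illustrative only, not Flink API) makes the classification burden described in the comment explicit:

```java
// Sketch of the client-side consequence of the two endpoint designs above
// (hypothetical helper, not Flink code): under the declarative reading a
// retry loop only distinguishes "done" from "retry"; under the imperative
// reading it must additionally whitelist 409 as "actually fine".
public class CancelRetryPolicy {

    public enum Outcome { DONE, RETRY }

    // Declarative endpoint ("Ensure this Job is Cancelled"):
    // any 2xx means the job is cancelled; everything else retries.
    public static Outcome declarative(int status) {
        return (status / 100 == 2) ? Outcome.DONE : Outcome.RETRY;
    }

    // Imperative endpoint ("Switch this Job to Cancelled"):
    // the caller must special-case 409 ("already cancelled") as success,
    // which is the extra error classification users would have to write.
    public static Outcome imperative(int status) {
        if (status / 100 == 2 || status == 409) return Outcome.DONE;
        return Outcome.RETRY;
    }

    public static void main(String[] args) {
        System.out.println(declarative(202)); // DONE
        System.out.println(imperative(409));  // DONE, but only via the special case
    }
}
```

Since both branches collapse a 409 into success anyway, the declarative design removes the special case entirely, which is the argument for the second option.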
[jira] [Comment Edited] (FLINK-24208) Allow idempotent savepoint triggering
[ https://issues.apache.org/jira/browse/FLINK-24208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17417718#comment-17417718 ] Austin Cawley-Edwards edited comment on FLINK-24208 at 9/20/21, 4:24 PM: - [~chesnay], definitely on the same page that the user provides a specific ID. Maybe I did not understand the sketch, but from the current description I would assume sending the same `triggerId` if one already exists would result in an error (like 409 Conflict), so it can be detected as a duplicate on the user side. I think what we rather want to say (and what I thought the sketch implements) is that duplicate requests result in the same success response (i.e., 200 w/ a `TriggerResponse`). It does not matter to the user that another request with the same `triggerId` has already been sent. I would suggest updating the description to: As a user of Flink, I want to be able to trigger a savepoint from an external system in a way that is safe to retry so I can guard against ephemeral errors, like network failures, and simplify my upgrade routines. By passing a custom ID to the savepoint request, I can let Flink deduplicate the requests and not worry about storing the result or sending the request again. was (Author: austince): [~chesnay], definitely on the same page that the user provides a specific ID. Maybe I did not understand the sketch, but from the current description I would assume sending the same `triggerId` if one is already in progress would result in an error (like 409 Conflict) so it can be detected as a duplicate. I think what we rather want to say (and what I thought the sketch implements) is that duplicate requests result in the same success response (i.e., 200 w/ a `TriggerResponse`). It does not matter to the user that another request with the same `triggerId` has already been sent. 
I would suggest updating the description to: As a user of Flink, I want to be able to trigger a savepoint from an external system in a way that is safe to retry so I can guard against ephemeral errors, like network failures, and simplify my upgrade routines. By passing a custom ID to the savepoint request, I can let Flink deduplicate the requests and not worry about storing the result or sending the request again. > Allow idempotent savepoint triggering > - > > Key: FLINK-24208 > URL: https://issues.apache.org/jira/browse/FLINK-24208 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing, Runtime / REST >Reporter: Robert Metzger >Priority: Major > > As a user of Flink, I want to be able to trigger a savepoint from an external > system in a way that I can detect if I have requested this savepoint already. > By passing a custom ID to the savepoint request, I can check (in case of an > error of the original request, or the external system) if the request has > been made already. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-24208) Allow idempotent savepoint triggering
[ https://issues.apache.org/jira/browse/FLINK-24208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17417718#comment-17417718 ] Austin Cawley-Edwards commented on FLINK-24208: --- [~chesnay], definitely on the same page that the user provides a specific ID. Maybe I did not understand the sketch, but from the current description I would assume sending the same `triggerId` if one is already in progress would result in an error (like 409 Conflict) so it can be detected as a duplicate. I think what we rather want to say (and what I thought the sketch implements) is that duplicate requests result in the same success response (i.e., 200 w/ a `TriggerResponse`). It does not matter to the user that another request with the same `triggerId` has already been sent. I would suggest updating the description to: As a user of Flink, I want to be able to trigger a savepoint from an external system in a way that is safe to retry so I can guard against ephemeral errors, like network failures, and simplify my upgrade routines. By passing a custom ID to the savepoint request, I can let Flink deduplicate the requests and not worry about storing the result or sending the request again. > Allow idempotent savepoint triggering > - > > Key: FLINK-24208 > URL: https://issues.apache.org/jira/browse/FLINK-24208 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing, Runtime / REST >Reporter: Robert Metzger >Priority: Major > > As a user of Flink, I want to be able to trigger a savepoint from an external > system in a way that I can detect if I have requested this savepoint already. > By passing a custom ID to the savepoint request, I can check (in case of an > error of the original request, or the external system) if the request has > been made already. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24275) Allow idempotent job cancellation
Austin Cawley-Edwards created FLINK-24275: - Summary: Allow idempotent job cancellation Key: FLINK-24275 URL: https://issues.apache.org/jira/browse/FLINK-24275 Project: Flink Issue Type: Improvement Components: Runtime / REST Reporter: Austin Cawley-Edwards As a user of Flink, I want to be able to cancel a job from an external system in a fault-tolerant way without guessing if the job has already been cancelled. Currently, the cancel endpoint (PATCH /jobs/:jobid?mode=cancel) will return a 404 if the job is already cancelled. This makes it hard to detect if the job truly doesn't exist, or if it is already in the desired state. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-24208) Allow idempotent savepoint triggering
[ https://issues.apache.org/jira/browse/FLINK-24208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17414559#comment-17414559 ] Austin Cawley-Edwards commented on FLINK-24208: --- The sketch looks great [~chesnay] ! We may want to clarify the description to say that this is really about safely requesting a savepoint from an external system – the user should not care (or know) if a savepoint with the trigger ID has already been requested (which seems to be how you've implemented it :)). > Allow idempotent savepoint triggering > - > > Key: FLINK-24208 > URL: https://issues.apache.org/jira/browse/FLINK-24208 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing, Runtime / REST >Reporter: Robert Metzger >Priority: Major > > As a user of Flink, I want to be able to trigger a savepoint from an external > system in a way that I can detect if I have requested this savepoint already. > By passing a custom ID to the savepoint request, I can check (in case of an > error of the original request, or the external system) if the request has > been made already. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-24208) Allow idempotent savepoint triggering
[ https://issues.apache.org/jira/browse/FLINK-24208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Austin Cawley-Edwards updated FLINK-24208: -- Component/s: Runtime / REST > Allow idempotent savepoint triggering > - > > Key: FLINK-24208 > URL: https://issues.apache.org/jira/browse/FLINK-24208 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing, Runtime / REST >Reporter: Robert Metzger >Priority: Major > > As a user of Flink, I want to be able to trigger a savepoint from an external > system in a way that I can detect if I have requested this savepoint already. > By passing a custom ID to the savepoint request, I can check (in case of an > error of the original request, or the external system) if the request has > been made already. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-20244) RMQSource does not ACK duplicated messages
[ https://issues.apache.org/jira/browse/FLINK-20244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17367701#comment-17367701 ] Austin Cawley-Edwards commented on FLINK-20244: --- Thanks for the ping [~cmick] + follow-up on this ticket. The issues you've decomposed make sense to me – as for splitting them up, I think it depends on how big the changes are if they can fit into a single PR. Multiple tickets + commits seem reasonable though. I'll pass this off to [~fabian.paul] and [~arvid] though, as they're working more in this area and will know the procedures better than I. Anyway, thanks for your continued contributions :) > RMQSource does not ACK duplicated messages > -- > > Key: FLINK-20244 > URL: https://issues.apache.org/jira/browse/FLINK-20244 > Project: Flink > Issue Type: Bug > Components: Connectors/ RabbitMQ >Affects Versions: 1.11.2, 1.12.0 >Reporter: Thomas Eckestad >Priority: Minor > Labels: auto-deprioritized-major > > *Background* > The following has been observed when using RMQSource with > exactly-once-guarantees. > When a job is restarted from a checkpoint, and there were unacked messages > referenced by that checkpoint at time of the restart, those messages will be > requeued by RMQ and resent to the restarted job. But they will not be acked. > Later when the connection to RMQ is closed (the job either finishes or is > restarted) they will be requeued again. > When looking at the source code, messages are ACK:ed by the RMQSource after a > checkpoint is complete > (MessageAcknowledgingSourceBase::notifyCheckpointComplete). > Also, when looking at the source code in RMQSource::setMessageIdentifier() > (on the master branch, the ACK semantics does not seem to have changed since > 1.11.2) it is clear that if a RMQ message carries a correlation ID which has > already been handled, that message is skipped and not further processed. 
It > is also clear that skipped messages are not added to the sessionIds-list of > messages that are targeted for ACK to RMQ. > I believe all successfully consumed RMQ messages should be ACK:ed, it is > irrelevant if the message is ignored or processed by Flink. RMQ needs to know > that the consumer considers the message as handled OK. > The following code is from RMQSource::setMessageIdentifier(). Note the return > before sessionIds.add():
> ...
>   if (!addId(correlationId)) {
>     // we have already processed this message
>     return false;
>   }
> }
> sessionIds.add(deliveryTag);
> ...
> Directly related to the above I also noticed that RMQ connections are leaked > at internal job restart. From the Flink log (this stack trace is from 1.11.2): > 2020-11-18 10:08:25,118 ERROR > org.apache.flink.streaming.runtime.tasks.StreamTask [] - Error during > disposal of stream operator. > com.rabbitmq.client.AlreadyClosedException: connection is already closed due > to connection error; protocol method: > #method(reply-code=320, reply-text=CONNECTION_FORCED - > Closed via management plugin, class-id=0, method-id=0) > at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:228) > ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?] > at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:303) > ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?] > at com.rabbitmq.client.impl.ChannelN.basicCancel(ChannelN.java:1294) > ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?] > at > com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicCancel(AutorecoveringChannel.java:482) > > ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?] > at > org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close(RMQSource.java:192) > > ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?] 
> AlreadyClosedException is not handled by the RMQSource::close(). This results > in a RMQ connection thread somewhere being left behind. I triggered three > restarts of the job in a row and noticed one new connection added to the pile > of connections for each restart. I triggered the restart by killing the > active connection to RMQ using the RMQ admin GUI (management plugin, see > above exception details). > I also tried to kill one of the leaked connections. But a new one is > instantly created when doing so. The traceback when doing this (1.11.2): > 2020-11-18 10:27:51,715 ERROR > com.rabbitmq.client.impl.ForgivingExceptionHandler [] - An unexpected > connection driver error occured > java.lang.NoClassDefFoundError: > com/rabbitmq/client/AMQP$Connection$CloseOk$Builder > at >
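The ACK gap described in the issue above can be reduced to a small model. These are hypothetical names, not the real RMQSource: the `buggy` variant mirrors the early return quoted in the description (the duplicate's delivery tag is never recorded for ACK), while the `fixed` variant records the tag before the duplicate check:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of the RMQSource ACK bug (hypothetical names, not Flink code).
// sessionIds holds the delivery tags that will be ACKed on checkpoint
// completion; a tag that never lands here is requeued by RMQ forever.
public class DedupAckModel {

    final Set<String> seenCorrelationIds = new HashSet<>();
    final List<Long> sessionIds = new ArrayList<>();

    // Mirrors the quoted code: a duplicate correlationId returns early,
    // so its deliveryTag is skipped AND never ACKed.
    boolean buggy(String correlationId, long deliveryTag) {
        if (!seenCorrelationIds.add(correlationId)) {
            return false; // duplicate: not processed, not ACKed -> requeued again
        }
        sessionIds.add(deliveryTag);
        return true;
    }

    // Proposed behavior: every successfully consumed delivery is ACKed,
    // whether or not Flink processes it downstream.
    boolean fixed(String correlationId, long deliveryTag) {
        sessionIds.add(deliveryTag);
        return seenCorrelationIds.add(correlationId); // process only first-seen IDs
    }

    public static void main(String[] args) {
        DedupAckModel m = new DedupAckModel();
        m.fixed("corr-1", 1L);
        m.fixed("corr-1", 2L); // redelivery of the same logical message
        System.out.println(m.sessionIds); // both delivery tags get ACKed
    }
}
```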
[jira] [Comment Edited] (FLINK-22698) RabbitMQ source does not stop unless message arrives in queue
[ https://issues.apache.org/jira/browse/FLINK-22698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351899#comment-17351899 ] Austin Cawley-Edwards edited comment on FLINK-22698 at 5/26/21, 4:23 PM: - [~nicholasjiang] – yes, I agree w/ Michał that this is something that should be considered w/ the FLIP-27 source, if it has not already been. [~cmick] Thanks for the research into other sources, the approach still sounds good to me, including the user-facing configuration option. I'm not sure if there is precedent for it, but we could potentially introduce the fix in 1.14 with a reasonable timeout default (like the suggested 30s), and use the 0s default when backporting to 1.12 + 1.13 to maintain functionality. What do you think of that? Would it be more confusing to users? was (Author: austince): [~nicholasjiang] – yes, I agree w/ Michał that this is something that should be considered w/ the FLIP-27 source, if it has not already been. [~cmick] Thanks for the research into other sources, the approach still sounds good to me as well as the user-facing configuration option. I'm not sure if there is precedent for it, but we could potentially introduce the fix in 1.14 with a reasonable timeout default (like the suggested 30s), and use the 0s default when backporting to 1.12 + 1.13 to maintain functionality. What do you think of that? Would it be more confusing to users? > RabbitMQ source does not stop unless message arrives in queue > - > > Key: FLINK-22698 > URL: https://issues.apache.org/jira/browse/FLINK-22698 > Project: Flink > Issue Type: Bug > Components: Connectors/ RabbitMQ >Affects Versions: 1.12.0 >Reporter: Austin Cawley-Edwards >Assignee: Michał Ciesielczyk >Priority: Major > Attachments: taskmanager_thread_dump.json > > > In a streaming job with multiple RMQSources, a stop-with-savepoint request > has unexpected behavior. 
Regular checkpoints and savepoints complete > successfully, it is only the stop-with-savepoint request where this behavior > is seen. > > *Expected Behavior:* > The stop-with-savepoint request stops the job with a FINISHED state. > > *Actual Behavior:* > The stop-with-savepoint request either times out or hangs indefinitely unless > a message arrives in all the queues that the job consumes from after the > stop-with-savepoint request is made. > > *Current workaround:* > Send a sentinel value to each of the queues consumed by the job that the > deserialization schema checks in its isEndOfStream method. This is cumbersome > and makes it difficult to do stateful upgrades, as coordination with another > system is now necessary. > > > The TaskManager thread dump is attached. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22698) RabbitMQ source does not stop unless message arrives in queue
[ https://issues.apache.org/jira/browse/FLINK-22698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351899#comment-17351899 ] Austin Cawley-Edwards commented on FLINK-22698: --- [~nicholasjiang] – yes, I agree w/ Michał that this is something that should be considered w/ the FLIP-27 source, if it has not already been. [~cmick] Thanks for the research into other sources, the approach still sounds good to me as well as the user-facing configuration option. I'm not sure if there is precedent for it, but we could potentially introduce the fix in 1.14 with a reasonable timeout default (like the suggested 30s), and use the 0s default when backporting to 1.12 + 1.13 to maintain functionality. What do you think of that? Would it be more confusing to users? > RabbitMQ source does not stop unless message arrives in queue > - > > Key: FLINK-22698 > URL: https://issues.apache.org/jira/browse/FLINK-22698 > Project: Flink > Issue Type: Bug > Components: Connectors/ RabbitMQ >Affects Versions: 1.12.0 >Reporter: Austin Cawley-Edwards >Assignee: Michał Ciesielczyk >Priority: Major > Attachments: taskmanager_thread_dump.json > > > In a streaming job with multiple RMQSources, a stop-with-savepoint request > has unexpected behavior. Regular checkpoints and savepoints complete > successfully, it is only the stop-with-savepoint request where this behavior > is seen. > > *Expected Behavior:* > The stop-with-savepoint request stops the job with a FINISHED state. > > *Actual Behavior:* > The stop-with-savepoint request either times out or hangs indefinitely unless > a message arrives in all the queues that the job consumes from after the > stop-with-savepoint request is made. > > *Current workaround:* > Send a sentinel value to each of the queues consumed by the job that the > deserialization schema checks in its isEndOfStream method. This is cumbersome > and makes it difficult to do stateful upgrades, as coordination with another > system is now necessary. 
> > > The TaskManager thread dump is attached. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (FLINK-22698) RabbitMQ source does not stop unless message arrives in queue
[ https://issues.apache.org/jira/browse/FLINK-22698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Austin Cawley-Edwards reassigned FLINK-22698: - Assignee: Michał Ciesielczyk > RabbitMQ source does not stop unless message arrives in queue > - > > Key: FLINK-22698 > URL: https://issues.apache.org/jira/browse/FLINK-22698 > Project: Flink > Issue Type: Bug > Components: Connectors/ RabbitMQ >Affects Versions: 1.12.0 >Reporter: Austin Cawley-Edwards >Assignee: Michał Ciesielczyk >Priority: Major > Attachments: taskmanager_thread_dump.json > > > In a streaming job with multiple RMQSources, a stop-with-savepoint request > has unexpected behavior. Regular checkpoints and savepoints complete > successfully, it is only the stop-with-savepoint request where this behavior > is seen. > > *Expected Behavior:* > The stop-with-savepoint request stops the job with a FINISHED state. > > *Actual Behavior:* > The stop-with-savepoint request either times out or hangs indefinitely unless > a message arrives in all the queues that the job consumes from after the > stop-with-savepoint request is made. > > *Current workaround:* > Send a sentinel value to each of the queues consumed by the job that the > deserialization schema checks in its isEndOfStream method. This is cumbersome > and makes it difficult to do stateful upgrades, as coordination with another > system is now necessary. > > > The TaskManager thread dump is attached. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22698) RabbitMQ source does not stop unless message arrives in queue
[ https://issues.apache.org/jira/browse/FLINK-22698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17349300#comment-17349300 ] Austin Cawley-Edwards commented on FLINK-22698: --- Hi [~cmick] – that cause makes perfect sense to me. And the proposed solution also sounds good, just a couple of questions: * What happens when a delivery times out? * How do other sources handle this, i.e. Kafka? Do they wait for messages on another thread than where the source's `run` is called? Anyway, I'll assign it to you + thanks for looking into this! Also, I'm going to cc' [~fabian.paul], as he's been working with this connector (+ others) and its FLIP-27 upgrades. He might have some other ideas. > RabbitMQ source does not stop unless message arrives in queue > - > > Key: FLINK-22698 > URL: https://issues.apache.org/jira/browse/FLINK-22698 > Project: Flink > Issue Type: Bug > Components: Connectors/ RabbitMQ >Affects Versions: 1.12.0 >Reporter: Austin Cawley-Edwards >Priority: Major > Attachments: taskmanager_thread_dump.json > > > In a streaming job with multiple RMQSources, a stop-with-savepoint request > has unexpected behavior. Regular checkpoints and savepoints complete > successfully, it is only the stop-with-savepoint request where this behavior > is seen. > > *Expected Behavior:* > The stop-with-savepoint request stops the job with a FINISHED state. > > *Actual Behavior:* > The stop-with-savepoint request either times out or hangs indefinitely unless > a message arrives in all the queues that the job consumes from after the > stop-with-savepoint request is made. > > *Current workaround:* > Send a sentinel value to each of the queues consumed by the job that the > deserialization schema checks in its isEndOfStream method. This is cumbersome > and makes it difficult to do stateful upgrades, as coordination with another > system is now necessary. > > > The TaskManager thread dump is attached. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
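One way the delivery-timeout approach discussed above could look is sketched below. This is a toy source loop, not the real RMQSource: instead of blocking indefinitely on the next delivery, it polls with a timeout so cancellation (and by extension stop-with-savepoint) is observed even when no message ever arrives in the queue:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy sketch of a timeout-based source loop (hypothetical, not Flink code):
// poll-with-timeout lets the loop re-check the running flag between
// deliveries, avoiding the indefinite hang described in this ticket.
public class PollingSourceLoop {

    final BlockingQueue<String> deliveries = new LinkedBlockingQueue<>();
    private volatile boolean running = true;

    public void cancel() {
        running = false;
    }

    // Returns the number of records emitted before cancellation was observed.
    public int run(long timeoutMillis) throws InterruptedException {
        int emitted = 0;
        while (running) {
            // A blocking take() here could wait forever on an empty queue;
            // poll() returns null after the timeout so the flag is re-checked.
            String msg = deliveries.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (msg != null) {
                emitted++;
            }
        }
        return emitted;
    }

    public static void main(String[] args) throws Exception {
        PollingSourceLoop source = new PollingSourceLoop();
        source.deliveries.offer("event-1");
        new Thread(() -> {
            try { Thread.sleep(100); } catch (InterruptedException ignored) {}
            source.cancel(); // e.g. triggered by stop-with-savepoint
        }).start();
        int n = source.run(10); // returns once cancel() is observed
        System.out.println("emitted " + n + " record(s), then stopped");
    }
}
```

The timeout value is exactly the user-facing configuration knob discussed later in this thread (e.g. a 30s default versus 0s for backports).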
[jira] [Comment Edited] (FLINK-22698) RabbitMQ source does not stop unless message arrives in queue
[ https://issues.apache.org/jira/browse/FLINK-22698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17347771#comment-17347771 ] Austin Cawley-Edwards edited comment on FLINK-22698 at 5/19/21, 4:48 PM: - Hey [~nicholasjiang] – I think that is correct. I'm coming from [this ML thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/RabbitMQ-source-does-not-stop-unless-message-arrives-in-queue-td43705.html] and am trying to get some more information from the OP. was (Author: austince): Hey [~nicholasjiang] – I think that is correct. I'm coming from [this ML thread|[http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/RabbitMQ-source-does-not-stop-unless-message-arrives-in-queue-td43705.html|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/RabbitMQ-source-does-not-stop-unless-message-arrives-in-queue-td43705.html],]] and am trying to get some more information from the OP. > RabbitMQ source does not stop unless message arrives in queue > - > > Key: FLINK-22698 > URL: https://issues.apache.org/jira/browse/FLINK-22698 > Project: Flink > Issue Type: Bug > Components: Connectors/ RabbitMQ >Affects Versions: 1.12.0 >Reporter: Austin Cawley-Edwards >Priority: Major > Attachments: taskmanager_thread_dump.json > > > In a streaming job with multiple RMQSources, a stop-with-savepoint request > has unexpected behavior. Regular checkpoints and savepoints complete > successfully, it is only the stop-with-savepoint request where this behavior > is seen. > > *Expected Behavior:* > The stop-with-savepoint request stops the job with a FINISHED state. > > *Actual Behavior:* > The stop-with-savepoint request either times out or hangs indefinitely unless > a message arrives in all the queues that the job consumes from after the > stop-with-savepoint request is made. 
> > *Current workaround:* > Send a sentinel value to each of the queues consumed by the job that the > deserialization schema checks in its isEndOfStream method. This is cumbersome > and makes it difficult to do stateful upgrades, as coordination with another > system is now necessary. > > > The TaskManager thread dump is attached. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22698) RabbitMQ source does not stop unless message arrives in queue
[ https://issues.apache.org/jira/browse/FLINK-22698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17347771#comment-17347771 ] Austin Cawley-Edwards commented on FLINK-22698: --- Hey [~nicholasjiang] – I think that is correct. I'm coming from [this ML thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/RabbitMQ-source-does-not-stop-unless-message-arrives-in-queue-td43705.html] and am trying to get some more information from the OP. > RabbitMQ source does not stop unless message arrives in queue > - > > Key: FLINK-22698 > URL: https://issues.apache.org/jira/browse/FLINK-22698 > Project: Flink > Issue Type: Bug > Components: Connectors/ RabbitMQ >Affects Versions: 1.12.0 >Reporter: Austin Cawley-Edwards >Priority: Major > Attachments: taskmanager_thread_dump.json > > > In a streaming job with multiple RMQSources, a stop-with-savepoint request > has unexpected behavior. Regular checkpoints and savepoints complete > successfully, it is only the stop-with-savepoint request where this behavior > is seen. > > *Expected Behavior:* > The stop-with-savepoint request stops the job with a FINISHED state. > > *Actual Behavior:* > The stop-with-savepoint request either times out or hangs indefinitely unless > a message arrives in all the queues that the job consumes from after the > stop-with-savepoint request is made. > > *Current workaround:* > Send a sentinel value to each of the queues consumed by the job that the > deserialization schema checks in its isEndOfStream method. This is cumbersome > and makes it difficult to do stateful upgrades, as coordination with another > system is now necessary. > > > The TaskManager thread dump is attached. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22698) RabbitMQ source does not stop unless message arrives in queue
Austin Cawley-Edwards created FLINK-22698: - Summary: RabbitMQ source does not stop unless message arrives in queue Key: FLINK-22698 URL: https://issues.apache.org/jira/browse/FLINK-22698 Project: Flink Issue Type: Bug Components: Connectors/ RabbitMQ Affects Versions: 1.12.0 Reporter: Austin Cawley-Edwards Attachments: taskmanager_thread_dump.json In a streaming job with multiple RMQSources, a stop-with-savepoint request has unexpected behavior. Regular checkpoints and savepoints complete successfully, it is only the stop-with-savepoint request where this behavior is seen. *Expected Behavior:* The stop-with-savepoint request stops the job with a FINISHED state. *Actual Behavior:* The stop-with-savepoint request either times out or hangs indefinitely unless a message arrives in all the queues that the job consumes from after the stop-with-savepoint request is made. *Current workaround:* Send a sentinel value to each of the queues consumed by the job that the deserialization schema checks in its isEndOfStream method. This is cumbersome and makes it difficult to do stateful upgrades, as coordination with another system is now necessary. The TaskManager thread dump is attached. -- This message was sent by Atlassian Jira (v8.3.4#803005)
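The sentinel-value workaround described above can be sketched as a custom `DeserializationSchema` whose `isEndOfStream` method checks each message against a sentinel. The class name and sentinel string here are illustrative, not from the original report:

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

/**
 * Sketch of the sentinel workaround: the RMQSource stops consuming a queue
 * once isEndOfStream returns true for a deserialized message.
 */
public class SentinelStringSchema implements DeserializationSchema<String> {

    // Arbitrary sentinel value; must match what is published to each queue.
    private static final String SENTINEL = "__END_OF_STREAM__";

    @Override
    public String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return SENTINEL.equals(nextElement);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}
```

The schema would be passed to each `RMQSource` constructor; every consumed queue then needs the sentinel published to it before the job can reach FINISHED, which is exactly the coordination burden the report calls cumbersome.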
[jira] [Commented] (FLINK-22441) In Flink v1.11.3 contains netty(version:3.10.6) netty(version:4.1.60) . There are many vulnerabilities, like CVE-2021-21409 etc. please confirm these version and fix.
[ https://issues.apache.org/jira/browse/FLINK-22441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332718#comment-17332718 ] Austin Cawley-Edwards commented on FLINK-22441: --- Adding the "drop Scala 2.11 support" ticket as a blocker, as Konstantin mentioned the current Akka lib is the root of this issue, and Akka cannot be upgraded until Scala 2.11 is dropped. If there's a more accurate ticket for the Akka upgrade, we can update the blocker. > In Flink v1.11.3 contains netty(version:3.10.6) netty(version:4.1.60) . There > are many vulnerabilities, like CVE-2021-21409 etc. please confirm these > version and fix. thx > -- > > Key: FLINK-22441 > URL: https://issues.apache.org/jira/browse/FLINK-22441 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.11.3, 1.12.2, 1.13.0 >Reporter: 张健 >Priority: Major > > In Flink v1.11.3 contains netty(version:3.10.6) netty(version:4.1.60) . There > are many vulnerabilities, like CVE-2021-21409 CVE-2021-21295 etc. please > confirm these version and fix. thx -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19711) Parsing of invalid CSV fails even with ignoreParseErrors is enabled
[ https://issues.apache.org/jira/browse/FLINK-19711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17324629#comment-17324629 ] Austin Cawley-Edwards commented on FLINK-19711: --- I had difficulty reproducing this in tests, though I believe it is still an issue. I'm not actively working on this, so if it does affect anyone and they want to work on it, by all means take it over. > Parsing of invalid CSV fails even with ignoreParseErrors is enabled > --- > > Key: FLINK-19711 > URL: https://issues.apache.org/jira/browse/FLINK-19711 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.10.2, 1.11.2, 1.12.0 >Reporter: Roman Khachatryan >Assignee: Austin Cawley-Edwards >Priority: Major > Labels: stale-assigned > > Reported in > [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Un-ignored-Parsing-Exceptions-in-the-CsvFormat-td38781.html] > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (FLINK-21538) Elasticsearch6DynamicSinkITCase.testWritingDocuments fails when submitting job
[ https://issues.apache.org/jira/browse/FLINK-21538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Austin Cawley-Edwards reassigned FLINK-21538: - Assignee: (was: Austin Cawley-Edwards) > Elasticsearch6DynamicSinkITCase.testWritingDocuments fails when submitting job > -- > > Key: FLINK-21538 > URL: https://issues.apache.org/jira/browse/FLINK-21538 > Project: Flink > Issue Type: Bug > Components: Connectors / ElasticSearch, Runtime / Coordination >Affects Versions: 1.12.1, 1.13.0 >Reporter: Dawid Wysakowicz >Priority: Major > Labels: stale-assigned, test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=13868=logs=3d12d40f-c62d-5ec4-6acc-0efe94cc3e89=5d6e4255-0ea8-5e2a-f52c-c881b7872361 > {code} > 2021-02-27T00:16:06.9493539Z > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > 2021-02-27T00:16:06.9494494Z at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) > 2021-02-27T00:16:06.9495733Z at > org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117) > 2021-02-27T00:16:06.9496596Z at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) > 2021-02-27T00:16:06.9497354Z at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) > 2021-02-27T00:16:06.9525795Z at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) > 2021-02-27T00:16:06.9526744Z at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) > 2021-02-27T00:16:06.9527784Z at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237) > 2021-02-27T00:16:06.9528552Z at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) > 2021-02-27T00:16:06.9529271Z at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) > 
2021-02-27T00:16:06.9530013Z at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) > 2021-02-27T00:16:06.9530482Z at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) > 2021-02-27T00:16:06.9531068Z at > org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046) > 2021-02-27T00:16:06.9531544Z at > akka.dispatch.OnComplete.internal(Future.scala:264) > 2021-02-27T00:16:06.9531908Z at > akka.dispatch.OnComplete.internal(Future.scala:261) > 2021-02-27T00:16:06.9532449Z at > akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) > 2021-02-27T00:16:06.9532860Z at > akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) > 2021-02-27T00:16:06.9533245Z at > scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) > 2021-02-27T00:16:06.9533721Z at > org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73) > 2021-02-27T00:16:06.9534225Z at > scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) > 2021-02-27T00:16:06.9534697Z at > scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) > 2021-02-27T00:16:06.9535217Z at > scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) > 2021-02-27T00:16:06.9535718Z at > scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) > 2021-02-27T00:16:06.9536127Z at > akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573) > 2021-02-27T00:16:06.9536861Z at > akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22) > 2021-02-27T00:16:06.9537394Z at > akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21) > 2021-02-27T00:16:06.9537916Z at > scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532) > 2021-02-27T00:16:06.9605804Z at > scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29) > 2021-02-27T00:16:06.9606794Z 
at > scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29) > 2021-02-27T00:16:06.9607642Z at > scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) > 2021-02-27T00:16:06.9608419Z at > akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) > 2021-02-27T00:16:06.9609252Z at > akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91) > 2021-02-27T00:16:06.9610024Z at > scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) > 2021-02-27T00:16:06.9613676Z at > scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81) > 2021-02-27T00:16:06.9615526Z at >
[jira] [Closed] (FLINK-22165) How to set rabbitmq correlationId when using rabbitmq sink in dataStreamEnv
[ https://issues.apache.org/jira/browse/FLINK-22165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Austin Cawley-Edwards closed FLINK-22165. - Resolution: Invalid > How to set rabbitmq correlationId when using rabbitmq sink in dataStreamEnv > --- > > Key: FLINK-22165 > URL: https://issues.apache.org/jira/browse/FLINK-22165 > Project: Flink > Issue Type: New Feature > Components: API / DataStream >Affects Versions: 1.12.2 > Environment: Flink 1.12.2 > rabbitmq 3.8.4 >Reporter: Spongebob >Priority: Minor > > Flink rabbitmq module provides source and sink function for rabbitmq. We can > use the correlationId to deduplicate the checkpoints record, So can we set a > correlationId for each message to sink into rabbitmq ? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22165) How to set rabbitmq correlationId when using rabbitmq sink in dataStreamEnv
[ https://issues.apache.org/jira/browse/FLINK-22165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17317576#comment-17317576 ] Austin Cawley-Edwards commented on FLINK-22165: --- Hi there! Please send this as a question to u...@flink.apache.org and I'm happy to help :) > How to set rabbitmq correlationId when using rabbitmq sink in dataStreamEnv > --- > > Key: FLINK-22165 > URL: https://issues.apache.org/jira/browse/FLINK-22165 > Project: Flink > Issue Type: New Feature > Components: API / DataStream >Affects Versions: 1.12.2 > Environment: Flink 1.12.2 > rabbitmq 3.8.4 >Reporter: Spongebob >Priority: Minor > > Flink rabbitmq module provides source and sink function for rabbitmq. We can > use the correlationId to deduplicate the checkpoints record, So can we set a > correlationId for each message to sink into rabbitmq ? -- This message was sent by Atlassian Jira (v8.3.4#803005)
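For reference, the RabbitMQ connector does expose a hook for this: the `RMQSink` constructor that accepts an `RMQSinkPublishOptions` lets a job compute per-message AMQP properties, including a correlation ID. A minimal sketch follows; the `MyEvent` type, its `getId()` accessor, and the routing values are hypothetical:

```java
import com.rabbitmq.client.AMQP;

import org.apache.flink.streaming.connectors.rabbitmq.RMQSinkPublishOptions;

/** Sketch: stamps a per-message correlationId on outgoing AMQP messages. */
public class CorrelatedPublishOptions implements RMQSinkPublishOptions<MyEvent> {

    @Override
    public String computeRoutingKey(MyEvent event) {
        return "my-queue"; // hypothetical routing key
    }

    @Override
    public String computeExchange(MyEvent event) {
        return ""; // default exchange
    }

    @Override
    public AMQP.BasicProperties computeProperties(MyEvent event) {
        return new AMQP.BasicProperties.Builder()
                .correlationId(event.getId()) // hypothetical id accessor
                .build();
    }
}
```

Note that the checkpoint-based deduplication the ticket mentions happens on the `RMQSource` side when it is configured to use correlation IDs; the sink side only stamps the property onto each published message.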
[jira] [Updated] (FLINK-17859) [flink-s3-fs-presto] Upgrade the dependent presto-hive jar to latest version
[ https://issues.apache.org/jira/browse/FLINK-17859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Austin Cawley-Edwards updated FLINK-17859: -- Fix Version/s: 1.8.0 > [flink-s3-fs-presto] Upgrade the dependent presto-hive jar to latest version > > > Key: FLINK-17859 > URL: https://issues.apache.org/jira/browse/FLINK-17859 > Project: Flink > Issue Type: New Feature > Components: Connectors / FileSystem, Connectors / Hive >Affects Versions: 1.7.2 >Reporter: sam lin >Priority: Major > Fix For: 1.8.0 > > > The current version of presto-hive is 0.187: > [https://github.com/apache/flink/blob/master/flink-filesystems/flink-s3-fs-presto/pom.xml#L36] > The latest version is 0.234: [https://github.com/prestodb/presto/releases] > There are some nice features we want to use after 0.187. One of them is the > CredentialProviderChain support when using the AWS S3 client, added in this > [PR|https://github.com/prestodb/presto/pull/13858]. > Do you have any concerns about upgrading `presto-hive` to the latest version? > Could you please upgrade it in the latest release? Thanks. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17859) [flink-s3-fs-presto] Upgrade the dependent presto-hive jar to latest version
[ https://issues.apache.org/jira/browse/FLINK-17859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Austin Cawley-Edwards updated FLINK-17859: -- Affects Version/s: 1.7.2 > [flink-s3-fs-presto] Upgrade the dependent presto-hive jar to latest version > > > Key: FLINK-17859 > URL: https://issues.apache.org/jira/browse/FLINK-17859 > Project: Flink > Issue Type: New Feature > Components: Connectors / FileSystem, Connectors / Hive >Affects Versions: 1.7.2 >Reporter: sam lin >Priority: Major > > The current version of presto-hive is 0.187: > [https://github.com/apache/flink/blob/master/flink-filesystems/flink-s3-fs-presto/pom.xml#L36] > The latest version is 0.234: [https://github.com/prestodb/presto/releases] > There are some nice features we want to use after 0.187. One of them is the > CredentialProviderChain support when using the AWS S3 client, added in this > [PR|https://github.com/prestodb/presto/pull/13858]. > Do you have any concerns about upgrading `presto-hive` to the latest version? > Could you please upgrade it in the latest release? Thanks. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17859) [flink-s3-fs-presto] Upgrade the dependent presto-hive jar to latest version
[ https://issues.apache.org/jira/browse/FLINK-17859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17315886#comment-17315886 ] Austin Cawley-Edwards commented on FLINK-17859: --- This has been updated as of Flink 1.8: [https://github.com/apache/flink/blob/release-1.8/flink-filesystems/flink-s3-fs-presto/pom.xml#L36|https://github.com/apache/flink/blob/9ff3b0ee294222007c6a6af57541c48854f6645a/flink-filesystems/flink-s3-fs-presto/pom.xml#L36] Ticket should be closed. > [flink-s3-fs-presto] Upgrade the dependent presto-hive jar to latest version > > > Key: FLINK-17859 > URL: https://issues.apache.org/jira/browse/FLINK-17859 > Project: Flink > Issue Type: New Feature > Components: Connectors / FileSystem, Connectors / Hive >Reporter: sam lin >Priority: Major > > The current version of presto-hive is 0.187: > [https://github.com/apache/flink/blob/master/flink-filesystems/flink-s3-fs-presto/pom.xml#L36] > The latest version is 0.234: [https://github.com/prestodb/presto/releases] > There are some nice features we want to use after 0.187. One of them is the > CredentialProviderChain support when using the AWS S3 client, added in this > [PR|https://github.com/prestodb/presto/pull/13858]. > Do you have any concerns about upgrading `presto-hive` to the latest version? > Could you please upgrade it in the latest release? Thanks. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22084) RescalingITCase fails with adaptive scheduler
[ https://issues.apache.org/jira/browse/FLINK-22084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17315716#comment-17315716 ] Austin Cawley-Edwards commented on FLINK-22084: --- That is unfortunate, and yes I agree that it seems like it should not be supported. I guess this was supported before by allowing autoconfigured max parallelism to be overridden by the state restore. I believe this is broken because we first compute the "initialParallelismStore", which defaults/ records if it has been autoconfigured, and then use that to create an "adjusted" parallelism store when creating the execution graph *without transferring autoconfiguration details*, thus hiding if a vertex's max parallelism can be rescaled by the state. > RescalingITCase fails with adaptive scheduler > - > > Key: FLINK-22084 > URL: https://issues.apache.org/jira/browse/FLINK-22084 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Runtime / Coordination >Affects Versions: 1.13.0 >Reporter: Dawid Wysakowicz >Assignee: Austin Cawley-Edwards >Priority: Blocker > Labels: test-stability > Fix For: 1.13.0 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=15934=logs=8fd9202e-fd17-5b26-353c-ac1ff76c8f28=a0a633b8-47ef-5c5a-2806-3c13b9e48228=4472 > {code} > 2021-03-31T22:16:07.8416407Z [ERROR] > testSavepointRescalingOutKeyedStateDerivedMaxParallelism[backend = > rocksdb](org.apache.flink.test.checkpointing.RescalingITCase) Time elapsed: > 9.945 s <<< ERROR! > 2021-03-31T22:16:07.8417534Z > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> 2021-03-31T22:16:07.8418516Z at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) > 2021-03-31T22:16:07.8419281Z at > org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult(TestUtils.java:63) > 2021-03-31T22:16:07.8420142Z at > org.apache.flink.test.checkpointing.RescalingITCase.testSavepointRescalingKeyedState(RescalingITCase.java:251) > 2021-03-31T22:16:07.8421173Z at > org.apache.flink.test.checkpointing.RescalingITCase.testSavepointRescalingOutKeyedStateDerivedMaxParallelism(RescalingITCase.java:168) > 2021-03-31T22:16:07.8421985Z at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > 2021-03-31T22:16:07.8422651Z at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > 2021-03-31T22:16:07.8423649Z at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > 2021-03-31T22:16:07.8424231Z at > java.lang.reflect.Method.invoke(Method.java:498) > 2021-03-31T22:16:07.8424657Z at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > 2021-03-31T22:16:07.8425147Z at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > 2021-03-31T22:16:07.8425609Z at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > 2021-03-31T22:16:07.8426183Z at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > 2021-03-31T22:16:07.8569060Z at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > 2021-03-31T22:16:07.8569781Z at > org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > 2021-03-31T22:16:07.8570451Z at > org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > 2021-03-31T22:16:07.8571040Z at > org.junit.rules.RunRules.evaluate(RunRules.java:20) > 2021-03-31T22:16:07.8571604Z at > org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > 2021-03-31T22:16:07.8572303Z at > 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > 2021-03-31T22:16:07.8573259Z at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > 2021-03-31T22:16:07.8573975Z at > org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > 2021-03-31T22:16:07.8574660Z at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > 2021-03-31T22:16:07.8575359Z at > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > 2021-03-31T22:16:07.8576037Z at > org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > 2021-03-31T22:16:07.8576728Z at > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > 2021-03-31T22:16:07.8577588Z at > org.junit.runners.ParentRunner.run(ParentRunner.java:363) > 2021-03-31T22:16:07.8578181Z at > org.junit.runners.Suite.runChild(Suite.java:128) > 2021-03-31T22:16:07.8578771Z at > org.junit.runners.Suite.runChild(Suite.java:27) > 2021-03-31T22:16:07.8579402Z at > org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) >
[jira] [Commented] (FLINK-22084) RescalingITCase fails with adaptive scheduler
[ https://issues.apache.org/jira/browse/FLINK-22084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17315603#comment-17315603 ] Austin Cawley-Edwards commented on FLINK-22084: --- [~trohrmann] very likely – I'll take this. > RescalingITCase fails with adaptive scheduler > - > > Key: FLINK-22084 > URL: https://issues.apache.org/jira/browse/FLINK-22084 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Runtime / Coordination >Affects Versions: 1.13.0 >Reporter: Dawid Wysakowicz >Priority: Blocker > Labels: test-stability > Fix For: 1.13.0 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=15934=logs=8fd9202e-fd17-5b26-353c-ac1ff76c8f28=a0a633b8-47ef-5c5a-2806-3c13b9e48228=4472 > {code} > 2021-03-31T22:16:07.8416407Z [ERROR] > testSavepointRescalingOutKeyedStateDerivedMaxParallelism[backend = > rocksdb](org.apache.flink.test.checkpointing.RescalingITCase) Time elapsed: > 9.945 s <<< ERROR! > 2021-03-31T22:16:07.8417534Z > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> 2021-03-31T22:16:07.8418516Z at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) > 2021-03-31T22:16:07.8419281Z at > org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult(TestUtils.java:63) > 2021-03-31T22:16:07.8420142Z at > org.apache.flink.test.checkpointing.RescalingITCase.testSavepointRescalingKeyedState(RescalingITCase.java:251) > 2021-03-31T22:16:07.8421173Z at > org.apache.flink.test.checkpointing.RescalingITCase.testSavepointRescalingOutKeyedStateDerivedMaxParallelism(RescalingITCase.java:168) > 2021-03-31T22:16:07.8421985Z at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > 2021-03-31T22:16:07.8422651Z at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > 2021-03-31T22:16:07.8423649Z at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > 2021-03-31T22:16:07.8424231Z at > java.lang.reflect.Method.invoke(Method.java:498) > 2021-03-31T22:16:07.8424657Z at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > 2021-03-31T22:16:07.8425147Z at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > 2021-03-31T22:16:07.8425609Z at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > 2021-03-31T22:16:07.8426183Z at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > 2021-03-31T22:16:07.8569060Z at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > 2021-03-31T22:16:07.8569781Z at > org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > 2021-03-31T22:16:07.8570451Z at > org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > 2021-03-31T22:16:07.8571040Z at > org.junit.rules.RunRules.evaluate(RunRules.java:20) > 2021-03-31T22:16:07.8571604Z at > org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > 2021-03-31T22:16:07.8572303Z at > 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > 2021-03-31T22:16:07.8573259Z at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > 2021-03-31T22:16:07.8573975Z at > org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > 2021-03-31T22:16:07.8574660Z at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > 2021-03-31T22:16:07.8575359Z at > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > 2021-03-31T22:16:07.8576037Z at > org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > 2021-03-31T22:16:07.8576728Z at > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > 2021-03-31T22:16:07.8577588Z at > org.junit.runners.ParentRunner.run(ParentRunner.java:363) > 2021-03-31T22:16:07.8578181Z at > org.junit.runners.Suite.runChild(Suite.java:128) > 2021-03-31T22:16:07.8578771Z at > org.junit.runners.Suite.runChild(Suite.java:27) > 2021-03-31T22:16:07.8579402Z at > org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > 2021-03-31T22:16:07.8580061Z at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > 2021-03-31T22:16:07.8580774Z at > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > 2021-03-31T22:16:07.8581480Z at > org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > 2021-03-31T22:16:07.8582148Z at > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > 2021-03-31T22:16:07.8582896Z at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > 2021-03-31T22:16:07.8583762Z at >
[jira] [Assigned] (FLINK-22084) RescalingITCase fails with adaptive scheduler
[ https://issues.apache.org/jira/browse/FLINK-22084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Austin Cawley-Edwards reassigned FLINK-22084: - Assignee: Austin Cawley-Edwards > RescalingITCase fails with adaptive scheduler > - > > Key: FLINK-22084 > URL: https://issues.apache.org/jira/browse/FLINK-22084 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Runtime / Coordination >Affects Versions: 1.13.0 >Reporter: Dawid Wysakowicz >Assignee: Austin Cawley-Edwards >Priority: Blocker > Labels: test-stability > Fix For: 1.13.0 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=15934=logs=8fd9202e-fd17-5b26-353c-ac1ff76c8f28=a0a633b8-47ef-5c5a-2806-3c13b9e48228=4472 > {code} > 2021-03-31T22:16:07.8416407Z [ERROR] > testSavepointRescalingOutKeyedStateDerivedMaxParallelism[backend = > rocksdb](org.apache.flink.test.checkpointing.RescalingITCase) Time elapsed: > 9.945 s <<< ERROR! > 2021-03-31T22:16:07.8417534Z > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> 2021-03-31T22:16:07.8418516Z at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) > 2021-03-31T22:16:07.8419281Z at > org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult(TestUtils.java:63) > 2021-03-31T22:16:07.8420142Z at > org.apache.flink.test.checkpointing.RescalingITCase.testSavepointRescalingKeyedState(RescalingITCase.java:251) > 2021-03-31T22:16:07.8421173Z at > org.apache.flink.test.checkpointing.RescalingITCase.testSavepointRescalingOutKeyedStateDerivedMaxParallelism(RescalingITCase.java:168) > 2021-03-31T22:16:07.8421985Z at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > 2021-03-31T22:16:07.8422651Z at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > 2021-03-31T22:16:07.8423649Z at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > 2021-03-31T22:16:07.8424231Z at > java.lang.reflect.Method.invoke(Method.java:498) > 2021-03-31T22:16:07.8424657Z at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > 2021-03-31T22:16:07.8425147Z at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > 2021-03-31T22:16:07.8425609Z at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > 2021-03-31T22:16:07.8426183Z at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > 2021-03-31T22:16:07.8569060Z at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > 2021-03-31T22:16:07.8569781Z at > org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > 2021-03-31T22:16:07.8570451Z at > org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > 2021-03-31T22:16:07.8571040Z at > org.junit.rules.RunRules.evaluate(RunRules.java:20) > 2021-03-31T22:16:07.8571604Z at > org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > 2021-03-31T22:16:07.8572303Z at > 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > 2021-03-31T22:16:07.8573259Z at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > 2021-03-31T22:16:07.8573975Z at > org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > 2021-03-31T22:16:07.8574660Z at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > 2021-03-31T22:16:07.8575359Z at > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > 2021-03-31T22:16:07.8576037Z at > org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > 2021-03-31T22:16:07.8576728Z at > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > 2021-03-31T22:16:07.8577588Z at > org.junit.runners.ParentRunner.run(ParentRunner.java:363) > 2021-03-31T22:16:07.8578181Z at > org.junit.runners.Suite.runChild(Suite.java:128) > 2021-03-31T22:16:07.8578771Z at > org.junit.runners.Suite.runChild(Suite.java:27) > 2021-03-31T22:16:07.8579402Z at > org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > 2021-03-31T22:16:07.8580061Z at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > 2021-03-31T22:16:07.8580774Z at > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > 2021-03-31T22:16:07.8581480Z at > org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > 2021-03-31T22:16:07.8582148Z at > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > 2021-03-31T22:16:07.8582896Z at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > 2021-03-31T22:16:07.8583762Z at >
[jira] [Commented] (FLINK-21844) Do not auto-configure maxParallelism when setting "scheduler-mode: reactive"
[ https://issues.apache.org/jira/browse/FLINK-21844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17305213#comment-17305213 ] Austin Cawley-Edwards commented on FLINK-21844: --- Does this mean a user must configure `maxParallelism` for all operators in reactive mode, otherwise Flink will produce an exception when building the `ExecutionGraph`? > Do not auto-configure maxParallelism when setting "scheduler-mode: reactive" > > > Key: FLINK-21844 > URL: https://issues.apache.org/jira/browse/FLINK-21844 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Konstantin Knauf >Assignee: Austin Cawley-Edwards >Priority: Major > Fix For: 1.13.0 > > > I believe we should not automatically change the maxParallelism when the > "scheduler-mode" is set to "reactive", because: > * it magically breaks savepoint compatibility, when you switch between > default and reactive scheduler mode > * the maximum parallelism is an orthogonal concern that in my opinion should > not be mixed with the scheduler mode. The reactive scheduler should respect > the maxParallelism, but it should not set/ change its default value. -- This message was sent by Atlassian Jira (v8.3.4#803005)
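If so, a job would need to pin its max parallelism explicitly, either pipeline-wide or per operator, so that switching scheduler modes cannot silently change it (and break savepoint compatibility). A minimal sketch with illustrative values:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExplicitMaxParallelismJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Pipeline-wide default max parallelism; savepoints stay compatible
        // across scheduler modes only if this value never changes.
        env.setMaxParallelism(128);

        env.fromElements(1, 2, 3)
                .map(new MapFunction<Integer, Integer>() {
                    @Override
                    public Integer map(Integer i) {
                        return i * 2;
                    }
                })
                .setMaxParallelism(64) // per-operator override
                .print();

        env.execute("explicit-max-parallelism");
    }
}
```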
[jira] [Updated] (FLINK-21844) Do not auto-configure maxParallelism when setting "scheduler-mode: reactive"
[ https://issues.apache.org/jira/browse/FLINK-21844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Austin Cawley-Edwards updated FLINK-21844: -- Description: I believe we should not automatically change the maxParallelism when the "scheduler-mode" is set to "reactive", because: * it magically breaks savepoint compatibility, when you switch between default and reactive scheduler mode * the maximum parallelism is an orthogonal concern that in my opinion should not be mixed with the scheduler mode. The reactive scheduler should respect the maxParallelism, but it should not set/ change its default value. was: I believe we should not automatically change the maxParallelism when the "scheduler-mode" is set to "reactive", because: * it magically breaks savepoint compatibility, when you switch between default and reactive scheduler mode * the maximum parallelism is an orthogonal concern that in my opinion should not be mixed with the scheduler mode. The reactive scheduler should respect the maxParallelism, but it should not set change its default value. > Do not auto-configure maxParallelism when setting "scheduler-mode: reactive" > > > Key: FLINK-21844 > URL: https://issues.apache.org/jira/browse/FLINK-21844 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Konstantin Knauf >Assignee: Austin Cawley-Edwards >Priority: Major > Fix For: 1.13.0 > > > I believe we should not automatically change the maxParallelism when the > "scheduler-mode" is set to "reactive", because: > * it magically breaks savepoint compatibility, when you switch between > default and reactive scheduler mode > * the maximum parallelism is an orthogonal concern that in my opinion should > not be mixed with the scheduler mode. The reactive scheduler should respect > the maxParallelism, but it should not set/ change its default value. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (FLINK-21844) Do not auto-configure maxParallelism when setting "scheduler-mode: reactive"
[ https://issues.apache.org/jira/browse/FLINK-21844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Austin Cawley-Edwards reassigned FLINK-21844: - Assignee: Austin Cawley-Edwards > Do not auto-configure maxParallelism when setting "scheduler-mode: reactive" > > > Key: FLINK-21844 > URL: https://issues.apache.org/jira/browse/FLINK-21844 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Konstantin Knauf >Assignee: Austin Cawley-Edwards >Priority: Major > Fix For: 1.13.0 > > > I believe we should not automatically change the maxParallelism when the > "scheduler-mode" is set to "reactive", because: > * it magically breaks savepoint compatibility, when you switch between > default and reactive scheduler mode > * the maximum parallelism is an orthogonal concern that in my opinion should > not be mixed with the scheduler mode. The reactive scheduler should respect > the maxParallelism, but it should not set change its default value. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
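The behavior proposed in FLINK-21844 above, keeping the maximum parallelism independent of the scheduler mode, can be sketched as a configuration fragment. This is an illustrative assumption, not text from the ticket: the option keys follow Flink's documented reactive-mode configuration, and the values are invented examples.

```yaml
# Enable the reactive scheduler explicitly (Flink >= 1.13).
scheduler-mode: reactive

# Pin the default maximum parallelism yourself rather than letting the
# scheduler auto-configure it, so savepoint compatibility does not change
# silently when switching between default and reactive scheduling.
# The value 128 is only an example.
pipeline.max-parallelism: 128
```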
[jira] [Comment Edited] (FLINK-21538) Elasticsearch6DynamicSinkITCase.testWritingDocuments fails when submitting job
[ https://issues.apache.org/jira/browse/FLINK-21538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17300636#comment-17300636 ] Austin Cawley-Edwards edited comment on FLINK-21538 at 3/15/21, 4:13 PM: - Test debug build here: [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=14715=results] was (Author: austince): Test debug build here: https://dev.azure.com/austincawley0684/flink/_build/results?buildId=3=results > Elasticsearch6DynamicSinkITCase.testWritingDocuments fails when submitting job > -- > > Key: FLINK-21538 > URL: https://issues.apache.org/jira/browse/FLINK-21538 > Project: Flink > Issue Type: Bug > Components: Connectors / ElasticSearch, Runtime / Coordination >Affects Versions: 1.12.1 >Reporter: Dawid Wysakowicz >Assignee: Austin Cawley-Edwards >Priority: Major > Labels: test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=13868=logs=3d12d40f-c62d-5ec4-6acc-0efe94cc3e89=5d6e4255-0ea8-5e2a-f52c-c881b7872361 > {code} > 2021-02-27T00:16:06.9493539Z > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> 2021-02-27T00:16:06.9494494Z at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) > 2021-02-27T00:16:06.9495733Z at > org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117) > 2021-02-27T00:16:06.9496596Z at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) > 2021-02-27T00:16:06.9497354Z at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) > 2021-02-27T00:16:06.9525795Z at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) > 2021-02-27T00:16:06.9526744Z at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) > 2021-02-27T00:16:06.9527784Z at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237) > 2021-02-27T00:16:06.9528552Z at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) > 2021-02-27T00:16:06.9529271Z at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) > 2021-02-27T00:16:06.9530013Z at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) > 2021-02-27T00:16:06.9530482Z at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) > 2021-02-27T00:16:06.9531068Z at > org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046) > 2021-02-27T00:16:06.9531544Z at > akka.dispatch.OnComplete.internal(Future.scala:264) > 2021-02-27T00:16:06.9531908Z at > akka.dispatch.OnComplete.internal(Future.scala:261) > 2021-02-27T00:16:06.9532449Z at > akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) > 2021-02-27T00:16:06.9532860Z at > akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) > 2021-02-27T00:16:06.9533245Z at > scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) > 2021-02-27T00:16:06.9533721Z at > 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73) > 2021-02-27T00:16:06.9534225Z at > scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) > 2021-02-27T00:16:06.9534697Z at > scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) > 2021-02-27T00:16:06.9535217Z at > scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) > 2021-02-27T00:16:06.9535718Z at > scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) > 2021-02-27T00:16:06.9536127Z at > akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573) > 2021-02-27T00:16:06.9536861Z at > akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22) > 2021-02-27T00:16:06.9537394Z at > akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21) > 2021-02-27T00:16:06.9537916Z at > scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532) > 2021-02-27T00:16:06.9605804Z at > scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29) > 2021-02-27T00:16:06.9606794Z at > scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29) > 2021-02-27T00:16:06.9607642Z at > scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) > 2021-02-27T00:16:06.9608419Z at > akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) > 2021-02-27T00:16:06.9609252Z at >
[jira] [Assigned] (FLINK-21538) Elasticsearch6DynamicSinkITCase.testWritingDocuments fails when submitting job
[ https://issues.apache.org/jira/browse/FLINK-21538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Austin Cawley-Edwards reassigned FLINK-21538: - Assignee: Austin Cawley-Edwards > Elasticsearch6DynamicSinkITCase.testWritingDocuments fails when submitting job > -- > > Key: FLINK-21538 > URL: https://issues.apache.org/jira/browse/FLINK-21538 > Project: Flink > Issue Type: Bug > Components: Connectors / ElasticSearch, Runtime / Coordination >Affects Versions: 1.12.1 >Reporter: Dawid Wysakowicz >Assignee: Austin Cawley-Edwards >Priority: Major > Labels: test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=13868=logs=3d12d40f-c62d-5ec4-6acc-0efe94cc3e89=5d6e4255-0ea8-5e2a-f52c-c881b7872361 > {code} > 2021-02-27T00:16:06.9493539Z > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > 2021-02-27T00:16:06.9494494Z at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) > 2021-02-27T00:16:06.9495733Z at > org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117) > 2021-02-27T00:16:06.9496596Z at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) > 2021-02-27T00:16:06.9497354Z at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) > 2021-02-27T00:16:06.9525795Z at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) > 2021-02-27T00:16:06.9526744Z at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) > 2021-02-27T00:16:06.9527784Z at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237) > 2021-02-27T00:16:06.9528552Z at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) > 2021-02-27T00:16:06.9529271Z at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) 
> 2021-02-27T00:16:06.9530013Z at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) > 2021-02-27T00:16:06.9530482Z at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) > 2021-02-27T00:16:06.9531068Z at > org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046) > 2021-02-27T00:16:06.9531544Z at > akka.dispatch.OnComplete.internal(Future.scala:264) > 2021-02-27T00:16:06.9531908Z at > akka.dispatch.OnComplete.internal(Future.scala:261) > 2021-02-27T00:16:06.9532449Z at > akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) > 2021-02-27T00:16:06.9532860Z at > akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) > 2021-02-27T00:16:06.9533245Z at > scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) > 2021-02-27T00:16:06.9533721Z at > org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73) > 2021-02-27T00:16:06.9534225Z at > scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) > 2021-02-27T00:16:06.9534697Z at > scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) > 2021-02-27T00:16:06.9535217Z at > scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) > 2021-02-27T00:16:06.9535718Z at > scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) > 2021-02-27T00:16:06.9536127Z at > akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573) > 2021-02-27T00:16:06.9536861Z at > akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22) > 2021-02-27T00:16:06.9537394Z at > akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21) > 2021-02-27T00:16:06.9537916Z at > scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532) > 2021-02-27T00:16:06.9605804Z at > scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29) > 
2021-02-27T00:16:06.9606794Z at > scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29) > 2021-02-27T00:16:06.9607642Z at > scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) > 2021-02-27T00:16:06.9608419Z at > akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) > 2021-02-27T00:16:06.9609252Z at > akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91) > 2021-02-27T00:16:06.9610024Z at > scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) > 2021-02-27T00:16:06.9613676Z at > scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81) > 2021-02-27T00:16:06.9615526Z at >
[jira] [Commented] (FLINK-21538) Elasticsearch6DynamicSinkITCase.testWritingDocuments fails when submitting job
[ https://issues.apache.org/jira/browse/FLINK-21538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17300636#comment-17300636 ] Austin Cawley-Edwards commented on FLINK-21538: --- Test debug build here: https://dev.azure.com/austincawley0684/flink/_build/results?buildId=3=results > Elasticsearch6DynamicSinkITCase.testWritingDocuments fails when submitting job > -- > > Key: FLINK-21538 > URL: https://issues.apache.org/jira/browse/FLINK-21538 > Project: Flink > Issue Type: Bug > Components: Connectors / ElasticSearch, Runtime / Coordination >Affects Versions: 1.12.1 >Reporter: Dawid Wysakowicz >Priority: Major > Labels: test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=13868=logs=3d12d40f-c62d-5ec4-6acc-0efe94cc3e89=5d6e4255-0ea8-5e2a-f52c-c881b7872361 > {code} > 2021-02-27T00:16:06.9493539Z > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > 2021-02-27T00:16:06.9494494Z at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) > 2021-02-27T00:16:06.9495733Z at > org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117) > 2021-02-27T00:16:06.9496596Z at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) > 2021-02-27T00:16:06.9497354Z at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) > 2021-02-27T00:16:06.9525795Z at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) > 2021-02-27T00:16:06.9526744Z at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) > 2021-02-27T00:16:06.9527784Z at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237) > 2021-02-27T00:16:06.9528552Z at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) > 2021-02-27T00:16:06.9529271Z at > 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) > 2021-02-27T00:16:06.9530013Z at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) > 2021-02-27T00:16:06.9530482Z at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) > 2021-02-27T00:16:06.9531068Z at > org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046) > 2021-02-27T00:16:06.9531544Z at > akka.dispatch.OnComplete.internal(Future.scala:264) > 2021-02-27T00:16:06.9531908Z at > akka.dispatch.OnComplete.internal(Future.scala:261) > 2021-02-27T00:16:06.9532449Z at > akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) > 2021-02-27T00:16:06.9532860Z at > akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) > 2021-02-27T00:16:06.9533245Z at > scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) > 2021-02-27T00:16:06.9533721Z at > org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73) > 2021-02-27T00:16:06.9534225Z at > scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) > 2021-02-27T00:16:06.9534697Z at > scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) > 2021-02-27T00:16:06.9535217Z at > scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) > 2021-02-27T00:16:06.9535718Z at > scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) > 2021-02-27T00:16:06.9536127Z at > akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573) > 2021-02-27T00:16:06.9536861Z at > akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22) > 2021-02-27T00:16:06.9537394Z at > akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21) > 2021-02-27T00:16:06.9537916Z at > scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532) > 2021-02-27T00:16:06.9605804Z at > 
scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29) > 2021-02-27T00:16:06.9606794Z at > scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29) > 2021-02-27T00:16:06.9607642Z at > scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) > 2021-02-27T00:16:06.9608419Z at > akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) > 2021-02-27T00:16:06.9609252Z at > akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91) > 2021-02-27T00:16:06.9610024Z at > scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) > 2021-02-27T00:16:06.9613676Z at >
[jira] [Commented] (FLINK-18755) RabbitMQ QoS Chinese Documentation
[ https://issues.apache.org/jira/browse/FLINK-18755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167847#comment-17167847 ] Austin Cawley-Edwards commented on FLINK-18755: --- Thank you so much! I don't have that ability but maybe [~dwysakowicz] or [~aljoscha] might be able to when they have time? > RabbitMQ QoS Chinese Documentation > -- > > Key: FLINK-18755 > URL: https://issues.apache.org/jira/browse/FLINK-18755 > Project: Flink > Issue Type: Improvement > Components: chinese-translation, Connectors/ RabbitMQ >Affects Versions: 1.12.0 >Reporter: Austin Cawley-Edwards >Priority: Major > > Please add documentation for the new QoS settings in the RabbitMQ connector. > The added English documentation can be found in the PR here: > [https://github.com/apache/flink/pull/12729/files#diff-6b432359b51642a8fad3050c4b73f47cR134-R167|https://github.com/apache/flink/pull/12729/files#diff-6b432359b51642a8fad3050c4b73f47cR134-R167.] > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-18755) RabbitMQ QoS Chinese Documentation
[ https://issues.apache.org/jira/browse/FLINK-18755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Austin Cawley-Edwards updated FLINK-18755: -- Description: Please add documentation for the new QoS settings in the RabbitMQ connector. The added English documentation can be found in the PR here: [https://github.com/apache/flink/pull/12729/files#diff-6b432359b51642a8fad3050c4b73f47cR134-R167|https://github.com/apache/flink/pull/12729/files#diff-6b432359b51642a8fad3050c4b73f47cR134-R167.] was: Please add documentation for the new QoS settings in the RabbitMQ connector. The added English documentation can be found in the PR here: [https://github.com/apache/flink/pull/12729/files#diff-6b432359b51642a8fad3050c4b73f47cR134-R167.] > RabbitMQ QoS Chinese Documentation > -- > > Key: FLINK-18755 > URL: https://issues.apache.org/jira/browse/FLINK-18755 > Project: Flink > Issue Type: Improvement > Components: chinese-translation, Connectors/ RabbitMQ >Affects Versions: 1.12.0 >Reporter: Austin Cawley-Edwards >Priority: Major > > Please add documentation for the new QoS settings in the RabbitMQ connector. > The added English documentation can be found in the PR here: > [https://github.com/apache/flink/pull/12729/files#diff-6b432359b51642a8fad3050c4b73f47cR134-R167|https://github.com/apache/flink/pull/12729/files#diff-6b432359b51642a8fad3050c4b73f47cR134-R167.] > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-18755) RabbitMQ QoS Chinese Documentation
[ https://issues.apache.org/jira/browse/FLINK-18755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Austin Cawley-Edwards updated FLINK-18755: -- Issue Type: Improvement (was: Bug) > RabbitMQ QoS Chinese Documentation > -- > > Key: FLINK-18755 > URL: https://issues.apache.org/jira/browse/FLINK-18755 > Project: Flink > Issue Type: Improvement > Components: chinese-translation, Connectors/ RabbitMQ >Affects Versions: 1.12.0 >Reporter: Austin Cawley-Edwards >Priority: Major > > Please add documentation for the new QoS settings in the RabbitMQ connector. > The added English documentation can be found in the PR here: > [https://github.com/apache/flink/pull/12729/files#diff-6b432359b51642a8fad3050c4b73f47cR134-R167.] > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18755) RabbitMQ QoS Chinese Documentation
Austin Cawley-Edwards created FLINK-18755: - Summary: RabbitMQ QoS Chinese Documentation Key: FLINK-18755 URL: https://issues.apache.org/jira/browse/FLINK-18755 Project: Flink Issue Type: Bug Components: chinese-translation, Connectors/ RabbitMQ Affects Versions: 1.12.0 Reporter: Austin Cawley-Edwards Please add documentation for the new QoS settings in the RabbitMQ connector. The added English documentation can be found in the PR here: [https://github.com/apache/flink/pull/12729/files#diff-6b432359b51642a8fad3050c4b73f47cR134-R167.] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Issue Comment Deleted] (FLINK-17559) Backpressure seems to be broken when not going through network
[ https://issues.apache.org/jira/browse/FLINK-17559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Austin Cawley-Edwards updated FLINK-17559: -- Comment: was deleted (was: I think we can close this as well [~dwysakowicz], if you don't mind.) > Backpressure seems to be broken when not going through network > -- > > Key: FLINK-17559 > URL: https://issues.apache.org/jira/browse/FLINK-17559 > Project: Flink > Issue Type: Bug > Components: Connectors/ RabbitMQ >Affects Versions: 1.8.2 >Reporter: Luis >Priority: Major > Attachments: Screenshot from 2020-05-07 13-31-23.png > > > Back pressure for Flink seems broken. Someone please correct me: from what I > understand, it only works between network transfers. If I have a source with > no thread sleep, then there is no back pressure and some operation will accumulate > data and crash. I even tried removing chaining with > env.disableOperatorChaining() > and it works with parallelism set to 1, but with 3 or 4 it crashes. See below. > > From this I can conclude that if I have any map function that produces more output > than is coming in, it will eventually crash if there is no network dividing > them to allow for backpressure. Is this correct? > > > {code:java} > java.lang.OutOfMemoryError: Java heap space > 2020-05-07 18:27:37,942 ERROR > org.apache.flink.runtime.util.FatalExitExceptionHandler - FATAL: Thread > 'flink-scheduler-1' produced an uncaught exception. Stopping the process... 
> java.lang.OutOfMemoryError: Java heap space > at akka.dispatch.AbstractNodeQueue.(AbstractNodeQueue.java:32) > at > akka.actor.LightArrayRevolverScheduler$TaskQueue.(LightArrayRevolverScheduler.scala:305) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:270) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236) > at java.lang.Thread.run(Thread.java:748) > 2020-05-07 18:27:35,725 ERROR > org.apache.flink.runtime.util.FatalExitExceptionHandler - FATAL: Thread > 'flink-metrics-8' produced an uncaught exception. Stopping the process... > java.lang.OutOfMemoryError: Java heap space > 2020-05-07 18:27:35,725 ERROR > com.rabbitmq.client.impl.ForgivingExceptionHandler- An unexpected > connection driver error occured > java.lang.OutOfMemoryError: Java heap space > at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:120) > at > com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:164) > at > com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:580) > at java.lang.Thread.run(Thread.java:748) > {code} > > [https://stackoverflow.com/questions/61465789/how-do-i-create-a-flink-richparallelsourcefunction-with-backpressure] > > > It seems that I am supposed to guess how much my sink can handle and throttle to > that amount in my source generator. But that always puts my system at risk > of crashing. > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17559) Backpressure seems to be broken when not going through network
[ https://issues.apache.org/jira/browse/FLINK-17559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167462#comment-17167462 ] Austin Cawley-Edwards commented on FLINK-17559: --- I think we can close this as well [~dwysakowicz], if you don't mind. > Backpressure seems to be broken when not going through network > -- > > Key: FLINK-17559 > URL: https://issues.apache.org/jira/browse/FLINK-17559 > Project: Flink > Issue Type: Bug > Components: Connectors/ RabbitMQ >Affects Versions: 1.8.2 >Reporter: Luis >Priority: Major > Attachments: Screenshot from 2020-05-07 13-31-23.png > > > Back pressure for Flink seems broken. Someone please correct me: from what I > understand, it only works between network transfers. If I have a source with > no thread sleep, then there is no back pressure and some operation will accumulate > data and crash. I even tried removing chaining with > env.disableOperatorChaining() > and it works with parallelism set to 1, but with 3 or 4 it crashes. See below. > > From this I can conclude that if I have any map function that produces more output > than is coming in, it will eventually crash if there is no network dividing > them to allow for backpressure. Is this correct? > > > {code:java} > java.lang.OutOfMemoryError: Java heap space > 2020-05-07 18:27:37,942 ERROR > org.apache.flink.runtime.util.FatalExitExceptionHandler - FATAL: Thread > 'flink-scheduler-1' produced an uncaught exception. Stopping the process... 
> java.lang.OutOfMemoryError: Java heap space > at akka.dispatch.AbstractNodeQueue.(AbstractNodeQueue.java:32) > at > akka.actor.LightArrayRevolverScheduler$TaskQueue.(LightArrayRevolverScheduler.scala:305) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:270) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236) > at java.lang.Thread.run(Thread.java:748) > 2020-05-07 18:27:35,725 ERROR > org.apache.flink.runtime.util.FatalExitExceptionHandler - FATAL: Thread > 'flink-metrics-8' produced an uncaught exception. Stopping the process... > java.lang.OutOfMemoryError: Java heap space > 2020-05-07 18:27:35,725 ERROR > com.rabbitmq.client.impl.ForgivingExceptionHandler- An unexpected > connection driver error occured > java.lang.OutOfMemoryError: Java heap space > at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:120) > at > com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:164) > at > com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:580) > at java.lang.Thread.run(Thread.java:748) > {code} > > [https://stackoverflow.com/questions/61465789/how-do-i-create-a-flink-richparallelsourcefunction-with-backpressure] > > > It seems that I am supposed to guess how much my sink can handle and throttle to > that amount in my source generator. But that always puts my system at risk > of crashing. > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
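The OutOfMemoryError quoted in the FLINK-17559 report above is the classic symptom of an unbounded hand-off between a fast producer and a slow consumer: with nothing to push back on the producer, the backlog simply accumulates on the heap. A minimal, self-contained sketch of that mechanism in plain java.util.concurrent terms (illustrative only, not Flink's actual runtime code):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy illustration (not Flink code): an unbounded buffer between a producer
// and a slower consumer retains the rate difference in memory.
public class UnboundedBacklog {
    // Produce `produced` records, consume only `consumed`, report the backlog.
    static int backlogAfter(int produced, int consumed) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(); // no capacity bound
        for (int i = 0; i < produced; i++) {
            queue.add(i); // never blocks: nothing pushes back on the producer
        }
        for (int i = 0; i < consumed; i++) {
            queue.poll();
        }
        return queue.size(); // records retained on the heap
    }

    public static void main(String[] args) {
        // A 100:1 rate mismatch leaves 990 of 1000 records buffered.
        System.out.println("backlog=" + backlogAfter(1000, 10));
    }
}
```

With a real message rate this backlog grows without limit until the heap is exhausted, which is why the report sees the OutOfMemoryError only once operator chaining or parallelism removes the implicit throttling.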
[jira] [Comment Edited] (FLINK-17529) Replace Deprecated RMQ QueueingConsumer
[ https://issues.apache.org/jira/browse/FLINK-17529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17162230#comment-17162230 ] Austin Cawley-Edwards edited comment on FLINK-17529 at 7/21/20, 6:01 PM: - Making this ticket "Upgrade com.rabbitmq:amqp-client to 5.x" as there's a CVE for the 4.2.0 version we're using: [https://github.com/advisories/GHSA-w4g2-9hj6-5472] Should be an easy upgrade after [https://github.com/apache/flink/pull/12729] is merged was (Author: austince): Perhaps we should make this ticket "Upgrade com.rabbitmq:amqp-client to 5.x" as there's a CVE for the 4.2.0 version we're using: [https://github.com/advisories/GHSA-w4g2-9hj6-5472] Should be an easy upgrade after [https://github.com/apache/flink/pull/12729] is merged > Replace Deprecated RMQ QueueingConsumer > --- > > Key: FLINK-17529 > URL: https://issues.apache.org/jira/browse/FLINK-17529 > Project: Flink > Issue Type: Improvement > Components: Connectors/ RabbitMQ >Affects Versions: 1.10.0 >Reporter: Austin Cawley-Edwards >Priority: Minor > > The RMQ QueueingConsumer is used in the RMQSource to get a simple blocking > consumer. This has been deprecated in `com.rabbitmq:amqp-client` 4.2.0 and is > removed in 5.x. It should be replaced by a > `com.rabbitmq.client.DefaultConsumer`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17529) Upgrade com.rabbitmq:amqp-client to latest 5.x
[ https://issues.apache.org/jira/browse/FLINK-17529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Austin Cawley-Edwards updated FLINK-17529: -- Summary: Upgrade com.rabbitmq:amqp-client to latest 5.x (was: Upgrade com.rabbitmq:amqp-client to 5.x) > Upgrade com.rabbitmq:amqp-client to latest 5.x > -- > > Key: FLINK-17529 > URL: https://issues.apache.org/jira/browse/FLINK-17529 > Project: Flink > Issue Type: Improvement > Components: Connectors/ RabbitMQ >Affects Versions: 1.10.0 >Reporter: Austin Cawley-Edwards >Priority: Major > > The RMQ QueueingConsumer is used in the RMQSource to get a simple blocking > consumer. This has been deprecated in `com.rabbitmq:amqp-client` 4.2.0 and is > removed in 5.x. It should be replaced by a > `com.rabbitmq.client.DefaultConsumer`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17529) Upgrade com.rabbitmq:amqp-client to 5.x
[ https://issues.apache.org/jira/browse/FLINK-17529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Austin Cawley-Edwards updated FLINK-17529: -- Summary: Upgrade com.rabbitmq:amqp-client to 5.x (was: Replace Deprecated RMQ QueueingConsumer) > Upgrade com.rabbitmq:amqp-client to 5.x > --- > > Key: FLINK-17529 > URL: https://issues.apache.org/jira/browse/FLINK-17529 > Project: Flink > Issue Type: Improvement > Components: Connectors/ RabbitMQ >Affects Versions: 1.10.0 >Reporter: Austin Cawley-Edwards >Priority: Minor > > The RMQ QueueingConsumer is used in the RMQSource to get a simple blocking > consumer. This has been deprecated in `com.rabbitmq:amqp-client` 4.2.0 and is > removed in 5.x. It should be replaced by a > `com.rabbitmq.client.DefaultConsumer`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17529) Replace Deprecated RMQ QueueingConsumer
[ https://issues.apache.org/jira/browse/FLINK-17529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17162230#comment-17162230 ] Austin Cawley-Edwards commented on FLINK-17529: --- Perhaps we should make this ticket "Upgrade com.rabbitmq:amqp-client to 5.x" as there's a CVE for the 4.2.0 version we're using: [https://github.com/advisories/GHSA-w4g2-9hj6-5472] Should be an easy upgrade after [https://github.com/apache/flink/pull/12729] is merged > Replace Deprecated RMQ QueueingConsumer > --- > > Key: FLINK-17529 > URL: https://issues.apache.org/jira/browse/FLINK-17529 > Project: Flink > Issue Type: Improvement > Components: Connectors/ RabbitMQ >Affects Versions: 1.10.0 >Reporter: Austin Cawley-Edwards >Priority: Minor > > The RMQ QueueingConsumer is used in the RMQSource to get a simple blocking > consumer. This has been deprecated in `com.rabbitmq:amqp-client` 4.2.0 and is > removed in 5.x. It should be replaced by a > `com.rabbitmq.client.DefaultConsumer`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17529) Upgrade com.rabbitmq:amqp-client to 5.x
[ https://issues.apache.org/jira/browse/FLINK-17529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Austin Cawley-Edwards updated FLINK-17529: -- Priority: Major (was: Minor) > Upgrade com.rabbitmq:amqp-client to 5.x > --- > > Key: FLINK-17529 > URL: https://issues.apache.org/jira/browse/FLINK-17529 > Project: Flink > Issue Type: Improvement > Components: Connectors/ RabbitMQ >Affects Versions: 1.10.0 >Reporter: Austin Cawley-Edwards >Priority: Major > > The RMQ QueueingConsumer is used in the RMQSource to get a simple blocking > consumer. This has been deprecated in `com.rabbitmq:amqp-client` 4.2.0 and is > removed in 5.x. It should be replaced by a > `com.rabbitmq.client.DefaultConsumer`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-10195) RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly
[ https://issues.apache.org/jira/browse/FLINK-10195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17134241#comment-17134241 ] Austin Cawley-Edwards commented on FLINK-10195: --- [~aljoscha] that would be great, thank you. [~lukaj], I would love feedback from you when I finish a first cut if that sounds good to you. > RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly > - > > Key: FLINK-10195 > URL: https://issues.apache.org/jira/browse/FLINK-10195 > Project: Flink > Issue Type: Bug > Components: Connectors/ RabbitMQ >Affects Versions: 1.4.0, 1.5.0, 1.5.1, 1.6.0 >Reporter: Luka Jurukovski >Assignee: Luka Jurukovski >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > The connection between the RabbitMQ server and the client does not > appropriately apply back pressure when auto acking is disabled. This becomes very > problematic when a downstream process throttles the data processing to slower > than RabbitMQ sends the data to the client. > The difference in records ends up being stored in Flink's heap space, > which grows indefinitely (or technically to "Integer Max" Deliveries). > Looking at RabbitMQ's metrics, the number of unacked messages looks like a > steadily rising sawtooth shape. > Upon further investigation it looks like this is due to how the > QueueingConsumer works: messages are added to the BlockingQueue faster than > they are being removed and processed, resulting in the previously described > behavior. > This may be intended behavior; however, this isn't explicitly obvious in the > documentation or any of the examples I have seen. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-10195) RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly
[ https://issues.apache.org/jira/browse/FLINK-10195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17125362#comment-17125362 ] Austin Cawley-Edwards edited comment on FLINK-10195 at 6/4/20, 2:50 PM: Yes, that is correct, though I think it is still an improvement on what's here and allows users to tune their job according to their needs; I don't think the performance issue that you cite can be fixed with what Rabbit provides. EDIT: we might be able to handle this by updating prefetch counts dynamically if the buffer has space and there are many unacked messages waiting to be acked on checkpoint, though that might be too much for an initial implementation. If the user has high-volume queues and needs checkpointing for EXACTLY_ONCE / AT_LEAST_ONCE guarantees, they'll need to tune their buffer length and checkpointing interval. This could also be an opt-in/opt-out change if there are cases that need it disabled, and we should definitely update the docs [1]. Our company still actively uses Rabbit – I'm happy to build off your PR and test it out in our jobs. [1]: https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/rabbitmq.html#rabbitmq-source
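The "updating prefetch counts dynamically" idea above can be sketched as a small pure function: recompute the channel prefetch (the value passed to `basicQos`) from the free space in the source's delivery buffer. The helper name, the clamping bounds, and the idea of never going below 1 (so the consumer is never fully switched off) are assumptions for illustration, not the ticket's actual design.

```java
// Hypothetical helper: derive the next basicQos prefetch value from how
// much room is left in the delivery buffer, clamped to [1, maxPrefetch].
public class PrefetchTuningSketch {
    static int recomputePrefetch(int bufferCapacity, int buffered, int maxPrefetch) {
        int free = bufferCapacity - buffered;           // space left in the buffer
        return Math.max(1, Math.min(free, maxPrefetch)); // never 0, never above cap
    }

    public static void main(String[] args) {
        // Nearly full buffer -> small prefetch; empty buffer -> capped prefetch.
        System.out.println(recomputePrefetch(100, 95, 50));  // prints 5
        System.out.println(recomputePrefetch(100, 0, 50));   // prints 50
        System.out.println(recomputePrefetch(100, 100, 50)); // prints 1
    }
}
```

In a real source this would run around checkpoint/ack time, calling `channel.basicQos(newPrefetch)` with the recomputed value.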
[jira] [Comment Edited] (FLINK-10195) RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly
[ https://issues.apache.org/jira/browse/FLINK-10195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17121227#comment-17121227 ] Austin Cawley-Edwards edited comment on FLINK-10195 at 6/3/20, 10:29 PM: Hey all, just bringing this back up as I'm trying to figure out if we could replace the consumer another way, but it's not looking like it. The only way I've found client-side flow control to work with RabbitMQ is through setting the prefetch count on the channel. I think that, in combination with the work done by Luka, we might have a decent solution where we could adjust the [channel's prefetch count|https://www.rabbitmq.com/confirms.html] based on the length of the blocking queue, which can be user-configurable. This performs better than turning the consumer off/on and is very simple to implement. I've put together a small prototype/playground in Node here ([https://github.com/austince/backpressured-consumer-prototype]), along with some potential issues. The simplest implementation would be a static prefetch count, but if we want to tune that and have it update depending on the space left in the buffer, I think that's possible too. If we allow the buffer queue length to be user-configurable, I think it would handle the following tickets as well: - FLINK-6885: RMQSource does not support QoS, leading to OOM - FLINK-17559: Duplicate of FLINK-6885 What do you all think of this proposal? [~aljoscha] [~dwysakowicz] [~senegalo]
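The prefetch-based flow control proposed above rests on how RabbitMQ's consumer prefetch (`channel.basicQos`) behaves with manual acks: the broker delivers at most `prefetch - unacked` further messages until some are acked. A minimal stand-alone simulation of that rule (no broker involved; the function and numbers are illustrative assumptions):

```java
// Models RabbitMQ's prefetch window: with manual acks and basicQos(prefetch),
// the broker will push at most (prefetch - unacked) more messages, so the
// client-side buffer is bounded without toggling the consumer off and on.
public class PrefetchWindowSketch {
    static int deliverable(int prefetch, int unacked, int readyOnBroker) {
        int window = Math.max(0, prefetch - unacked); // remaining credit
        return Math.min(readyOnBroker, window);       // broker can't exceed it
    }

    public static void main(String[] args) {
        System.out.println(deliverable(20, 0, 1000));  // prints 20
        System.out.println(deliverable(20, 18, 1000)); // prints 2
        System.out.println(deliverable(20, 20, 1000)); // prints 0 (stalled until acks)
    }
}
```

This is why a static prefetch already caps heap growth: deliveries stall once the window is exhausted, regardless of how many messages are ready on the broker.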
[jira] [Commented] (FLINK-10195) RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly
[ https://issues.apache.org/jira/browse/FLINK-10195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17121227#comment-17121227 ] Austin Cawley-Edwards commented on FLINK-10195: --- Hey all, just bringing this back up as I'm trying to figure out if we could replace the consumer another way, but not looking like it. The only way I've found client-side flow control to work with RabbitMQ is through setting the prefetch count on the channel. I think that in combination with the work done by Luka, we might a decent solution where we could adjust the [channel's prefetch count|[https://rabbitmq.github.io/rabbitmq-java-client/api/current/com/rabbitmq/client/Channel.html#basicQos(int)]] based on the length of the blocking queue, which can be user-configurable. This performs better than turning the consumer off/ on and is very simple to implement. I've put together a small prototype/ playground in Node here ([github.com/austince/backpressured-consumer-prototype|[https://github.com/austince/backpressured-consumer-prototype]]), along with some potential issues. The simplest implementation would be a static prefetch count, but if we want to tune that and have it update depending on the space left in the buffer I think that's possible too. 
If we allow the buffer queue length to be user-configurable, I think it would handle the following tickets as well: - FLINK-6885 RMQSource does not support QoS, leading to OOM - FLINK-17559 Duplicate of `FLINK-6885` What do you all think of this proposal? [~aljoscha] [~dwysakowicz] [~senegalo] > RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly > - > > Key: FLINK-10195 > URL: https://issues.apache.org/jira/browse/FLINK-10195 > Project: Flink > Issue Type: Bug > Components: Connectors/ RabbitMQ >Affects Versions: 1.4.0, 1.5.0, 1.5.1, 1.6.0 >Reporter: Luka Jurukovski >Assignee: Luka Jurukovski >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > The connection between the RabbitMQ server and the client does not > appropriately back pressure when auto acking is disabled. This becomes very > problematic when a downstream process throttles the data processing to slower > than the rate at which RabbitMQ sends the data to the client. > The difference in records ends up being stored in Flink's heap space, > which grows indefinitely (or technically to "Integer Max" deliveries). > Looking at RabbitMQ's metrics, the number of unacked messages looks like a > steadily rising saw-tooth shape. > Upon further investigation it looks like this is due to how the > QueueingConsumer works: messages are added to the BlockingQueue faster than > they are being removed and processed, resulting in the previously described > behavior. > This may be intended behavior, however this isn't explicitly obvious in the > documentation or any of the examples I have seen. -- This message was sent by Atlassian Jira (v8.3.4#803005)
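The adaptive-prefetch idea discussed above can be sketched in plain Java. Note this is a hypothetical helper (the `PrefetchTuner` class and its method names are illustrative, not Flink or amqp-client code): it only computes a new prefetch count from the fill level of the consumer's buffer, which a source could then apply via `Channel#basicQos(int)`.

```java
// Hypothetical prefetch calculator for the proposal above: shrink the
// prefetch window as the consumer's buffer fills, so RabbitMQ stops
// pushing deliveries faster than the job can process them.
public final class PrefetchTuner {
    private final int maxPrefetch;

    public PrefetchTuner(int maxPrefetch) {
        this.maxPrefetch = maxPrefetch;
    }

    /**
     * Scale the prefetch count by the free fraction of the buffer,
     * never dropping below 1 so the consumer keeps making progress.
     */
    public int prefetchFor(int bufferSize, int bufferCapacity) {
        double free = 1.0 - (double) bufferSize / bufferCapacity;
        return Math.max(1, (int) Math.round(maxPrefetch * free));
    }

    public static void main(String[] args) {
        PrefetchTuner tuner = new PrefetchTuner(100);
        System.out.println(tuner.prefetchFor(0, 1000));   // empty buffer: 100
        System.out.println(tuner.prefetchFor(500, 1000)); // half full: 50
        System.out.println(tuner.prefetchFor(999, 1000)); // nearly full: clamped to 1
    }
}
```

In a real source the result would be applied with `channel.basicQos(newPrefetch)` whenever the buffer length crosses a threshold; a static prefetch count is the degenerate case where the value never changes.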
[jira] [Comment Edited] (FLINK-7267) Add support for lists of hosts to connect
[ https://issues.apache.org/jira/browse/FLINK-7267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17108575#comment-17108575 ] Austin Cawley-Edwards edited comment on FLINK-7267 at 5/15/20, 7:05 PM: I put in a PR for option A, as it's less of a change – if we find that many people are overriding the connection, maybe that would be a good time to add it to the connection config builder? Anyways, can you assign this ticket to me (oops)? Thanks! was (Author: austince): I put in a PR for option A, as it's less of a change -- if we find that many people are overriding the connection, maybe that would be a good time to add it to the connection config builder? Anyways, can you assign this ticket to me? Thanks! > Add support for lists of hosts to connect > - > > Key: FLINK-7267 > URL: https://issues.apache.org/jira/browse/FLINK-7267 > Project: Flink > Issue Type: Improvement > Components: Connectors/ RabbitMQ >Affects Versions: 1.3.0 >Reporter: Hu Hailin >Priority: Minor > Labels: pull-request-available > > The RMQConnectionConfig can assign one host:port only. I want to connect to a > cluster with an available node. > My workaround is to write my own sink extending RMQSink and override open(), > assigning the nodes list in it. > {code:java} > connection = factory.newConnection(addrs) > {code} > I still need to build the RMQConnectionConfig with a dummy host:port or a > node in the list. It's annoying. > I think it is better to provide a configuration for it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-7267) Add support for lists of hosts to connect
[ https://issues.apache.org/jira/browse/FLINK-7267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17108575#comment-17108575 ] Austin Cawley-Edwards commented on FLINK-7267: -- I put in a PR for option A, as it's less of a change -- if we find that many people are overriding the connection, maybe that would be a good time to add it to the connection config builder? Anyways, can you assign this ticket to me? Thanks! > Add support for lists of hosts to connect > - > > Key: FLINK-7267 > URL: https://issues.apache.org/jira/browse/FLINK-7267 > Project: Flink > Issue Type: Improvement > Components: Connectors/ RabbitMQ >Affects Versions: 1.3.0 >Reporter: Hu Hailin >Priority: Minor > Labels: pull-request-available > > The RMQConnectionConfig can assign one host:port only. I want to connect to a > cluster with an available node. > My workaround is to write my own sink extending RMQSink and override open(), > assigning the nodes list in it. > {code:java} > connection = factory.newConnection(addrs) > {code} > I still need to build the RMQConnectionConfig with a dummy host:port or a > node in the list. It's annoying. > I think it is better to provide a configuration for it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-7267) Add support for lists of hosts to connect
[ https://issues.apache.org/jira/browse/FLINK-7267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17104943#comment-17104943 ] Austin Cawley-Edwards edited comment on FLINK-7267 at 5/12/20, 1:25 PM: This seems like a reasonable feature to me. The only way to currently achieve this would be to override the open method for the source and completely re-implement it, which could be messy. I think we could either: A) create an `RMQSource#setupConnection` method, similar to `setupConnectionFactory`. The connection is only assigned in the `open` method, so I don't think it would be easy to introduce accidental errors in many places. This would allow users to make other changes to the connection by just overriding that method. B) wrap the RMQ package's Address class and allow a list of them to be passed as an option to the `RMQConnectionConfig`, which could be used in the `RMQSource#setupConnectionFactory`. I think option A would be best, as it is a minimal change and would open up the possibility of extending the connection in user code (ex: [adding address resolution|https://www.rabbitmq.com/api-guide.html#service-discovery-with-address-resolver]). [~aljoscha] [~senegalo] - what do you think? Is this change worthwhile? Side-note, should this conversation happen on the dev mailing list? was (Author: austince): This seems like a reasonable feature to me. The only way to currently achieve this would be to override the open method for the source and completely re-implement it, which can be messy. I think we could either: A) create an `RMQSource#setupConnection` method, similar to `setupConnectionFactory`. The connection is only assigned in the `open` method, so I don't think it would be easy to introduce accidental errors in many places. This would allow users to make other changes to the connection by just overriding that method. 
B) wrap the RMQ package's Address class and allow a list of them to be passed as an option to the `RMQConnectionConfig`, which could be used in the `RMQSource#setupConnectionFactory`. I think option A would be best, as it is a minimal change and would open up the possibility of extending the connection in user code (ex: [adding address resolution|https://www.rabbitmq.com/api-guide.html#service-discovery-with-address-resolver]). [~aljoscha] [~senegalo] - what do you think? Is this change worthwhile? Side-note, should this conversation happen on the dev mailing list? > Add support for lists of hosts to connect > - > > Key: FLINK-7267 > URL: https://issues.apache.org/jira/browse/FLINK-7267 > Project: Flink > Issue Type: Improvement > Components: Connectors/ RabbitMQ >Affects Versions: 1.3.0 >Reporter: Hu Hailin >Priority: Minor > > The RMQConnectionConfig can assign one host:port only. I want to connect to a > cluster with an available node. > My workaround is to write my own sink extending RMQSink and override open(), > assigning the nodes list in it. > {code:java} > connection = factory.newConnection(addrs) > {code} > I still need to build the RMQConnectionConfig with a dummy host:port or a > node in the list. It's annoying. > I think it is better to provide a configuration for it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
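Option B above would need the connection config to carry a list of broker addresses instead of a single host:port. As a rough illustration only (the `BrokerAddresses` helper and the comma-separated format are assumptions, not the actual RMQConnectionConfig API), parsing a user-supplied address list might look like:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical parser for option B: turn a user-supplied
// "host1:5672,host2:5673" string into (host, port) pairs that a
// connection config could hand to ConnectionFactory#newConnection(Address[]).
// Naive on purpose: does not handle IPv6 literals like [::1]:5672.
public final class BrokerAddresses {

    public record HostPort(String host, int port) {}

    public static List<HostPort> parse(String spec, int defaultPort) {
        List<HostPort> out = new ArrayList<>();
        for (String part : spec.split(",")) {
            String trimmed = part.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                // No explicit port: fall back to the configured default.
                out.add(new HostPort(trimmed, defaultPort));
            } else {
                out.add(new HostPort(
                        trimmed.substring(0, colon),
                        Integer.parseInt(trimmed.substring(colon + 1))));
            }
        }
        return out;
    }
}
```

With option A, none of this would be needed in the config: the user would override the hypothetical `setupConnection` and call `factory.newConnection(addrs)` directly, as the reporter's workaround already does.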
[jira] [Comment Edited] (FLINK-7267) Add support for lists of hosts to connect
[ https://issues.apache.org/jira/browse/FLINK-7267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17104943#comment-17104943 ] Austin Cawley-Edwards edited comment on FLINK-7267 at 5/11/20, 11:46 PM: - This seems like a reasonable feature to me. The only way to currently achieve this would be to override the open method for the source and completely re-implement it, which can be messy. I think we could either: A) create an `RMQSource#setupConnection` method, similar to `setupConnectionFactory`. The connection is only assigned in the `open` method, so I don't think it would be easy to introduce accidental errors in many places. This would allow users to make other changes to the connection by just overriding that method. B) wrap the RMQ package's Address class and allow a list of them to be passed as an option to the `RMQConnectionConfig`, which could be used in the `RMQSource#setupConnectionFactory`. I think option A would be best, as it is a minimal change and would open up the possibility of extending the connection in user code (ex: [adding address resolution|https://www.rabbitmq.com/api-guide.html#service-discovery-with-address-resolver]). [~aljoscha] [~senegalo] - what do you think? Is this change worthwhile? Side-note, should this conversation happen on the dev mailing list? was (Author: austince): This seems like a reasonable feature to me. The only way to currently achieve this would be to override the open method for the source and completely re-implement it, which can be messy. I think we could either: A) create an `RMQSource#setupConnection` method, similar to `setupConnectionFactory`. The connection is only assigned in the `open` method, so I don't think it would be easy to introduce accidental errors in many places. This would allow users to make other changes to the connection by just overriding that method. 
B) wrap the RMQ package's Address class and allow a list of them to be passed as an option to the `RMQConnectionConfig`, which could be used in the `RMQSource#setupConnectionFactory`. I think option A would be best, as it is a minimal change and would open up the possibility of extending the connection in user code (ex: [adding address resolution|https://www.rabbitmq.com/api-guide.html#service-discovery-with-address-resolver]). [~aljoscha] [~senegalo] - what do you think? Is this change worthwhile? Side-note, should this conversation happen on the dev mailing list? > Add support for lists of hosts to connect > - > > Key: FLINK-7267 > URL: https://issues.apache.org/jira/browse/FLINK-7267 > Project: Flink > Issue Type: Improvement > Components: Connectors/ RabbitMQ >Affects Versions: 1.3.0 >Reporter: Hu Hailin >Priority: Minor > > The RMQConnectionConfig can assign one host:port only. I want to connect to a > cluster with an available node. > My workaround is to write my own sink extending RMQSink and override open(), > assigning the nodes list in it. > {code:java} > connection = factory.newConnection(addrs) > {code} > I still need to build the RMQConnectionConfig with a dummy host:port or a > node in the list. It's annoying. > I think it is better to provide a configuration for it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Issue Comment Deleted] (FLINK-7267) Add support for lists of hosts to connect
[ https://issues.apache.org/jira/browse/FLINK-7267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Austin Cawley-Edwards updated FLINK-7267: - Comment: was deleted (was: It might also be worth adding a common base class for both the source and the sink so these common types of overrides can happen for both.) > Add support for lists of hosts to connect > - > > Key: FLINK-7267 > URL: https://issues.apache.org/jira/browse/FLINK-7267 > Project: Flink > Issue Type: Improvement > Components: Connectors/ RabbitMQ >Affects Versions: 1.3.0 >Reporter: Hu Hailin >Priority: Minor > > The RMQConnectionConfig can assign one host:port only. I want to connect to a > cluster with an available node. > My workaround is to write my own sink extending RMQSink and override open(), > assigning the nodes list in it. > {code:java} > connection = factory.newConnection(addrs) > {code} > I still need to build the RMQConnectionConfig with a dummy host:port or a > node in the list. It's annoying. > I think it is better to provide a configuration for it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-7267) Add support for lists of hosts to connect
[ https://issues.apache.org/jira/browse/FLINK-7267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17104972#comment-17104972 ] Austin Cawley-Edwards commented on FLINK-7267: -- It might also be worth adding a common base class for both the source and the sink so these common types of overrides can happen for both. > Add support for lists of hosts to connect > - > > Key: FLINK-7267 > URL: https://issues.apache.org/jira/browse/FLINK-7267 > Project: Flink > Issue Type: Improvement > Components: Connectors/ RabbitMQ >Affects Versions: 1.3.0 >Reporter: Hu Hailin >Priority: Minor > > The RMQConnectionConfig can assign one host:port only. I want to connect to a > cluster with an available node. > My workaround is to write my own sink extending RMQSink and override open(), > assigning the nodes list in it. > {code:java} > connection = factory.newConnection(addrs) > {code} > I still need to build the RMQConnectionConfig with a dummy host:port or a > node in the list. It's annoying. > I think it is better to provide a configuration for it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-17529) Replace Deprecated RMQ QueueingConsumer
[ https://issues.apache.org/jira/browse/FLINK-17529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17104955#comment-17104955 ] Austin Cawley-Edwards edited comment on FLINK-17529 at 5/11/20, 11:01 PM: -- After more investigation, I don't think we're able to replace this without tackling the issues brought up in FLINK-10195. I think there are other ways of doing flow control than the one described in that ticket, though, which might be worth pursuing. Should I close this ticket and move the conversation back to FLINK-10195? Sorry for creating it before diving in! was (Author: austince): After more investigation, I don't think we're able to replace this without tackling the issues brought up in FLINK-10195. I think there are other ways of doing flow control than the one described in that ticket, though, which might be worth pursuing. Should I close this ticket and move the conversation back to FLINK-10195? > Replace Deprecated RMQ QueueingConsumer > --- > > Key: FLINK-17529 > URL: https://issues.apache.org/jira/browse/FLINK-17529 > Project: Flink > Issue Type: Improvement > Components: Connectors/ RabbitMQ >Affects Versions: 1.10.0 >Reporter: Austin Cawley-Edwards >Priority: Minor > > The RMQ QueueingConsumer is used in the RMQSource to get a simple blocking > consumer. This has been deprecated in `com.rabbitmq:amqp-client` 4.2.0 and is > removed in 5.x. It should be replaced by a > `com.rabbitmq.client.DefaultConsumer`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17529) Replace Deprecated RMQ QueueingConsumer
[ https://issues.apache.org/jira/browse/FLINK-17529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17104955#comment-17104955 ] Austin Cawley-Edwards commented on FLINK-17529: --- After more investigation, I don't think we're able to replace this without tackling the issues brought up in FLINK-10195. I think there are other ways of doing flow control than the one described in that ticket, though, which might be worth pursuing. Should I close this ticket and move the conversation back to FLINK-10195? > Replace Deprecated RMQ QueueingConsumer > --- > > Key: FLINK-17529 > URL: https://issues.apache.org/jira/browse/FLINK-17529 > Project: Flink > Issue Type: Improvement > Components: Connectors/ RabbitMQ >Affects Versions: 1.10.0 >Reporter: Austin Cawley-Edwards >Priority: Minor > > The RMQ QueueingConsumer is used in the RMQSource to get a simple blocking > consumer. This has been deprecated in `com.rabbitmq:amqp-client` 4.2.0 and is > removed in 5.x. It should be replaced by a > `com.rabbitmq.client.DefaultConsumer`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
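For context on what the replacement involves: the deprecated QueueingConsumer is essentially a DefaultConsumer whose handleDelivery puts each message into a BlockingQueue that the caller drains. The sketch below is a simplified stand-in built only on java.util.concurrent (the `BlockingDeliveryBuffer` class and its `Delivery` record are illustrative substitutes for the amqp-client types, not the real API). The key point for FLINK-10195 is the bounded queue: a bounded buffer makes the callback thread block instead of growing the heap indefinitely.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Simplified sketch of the QueueingConsumer pattern: the connection's
// callback thread offers deliveries into a bounded queue, and the source
// thread blocks on nextDelivery(). The bound is what prevents the
// "grows to Integer Max deliveries" behavior described in FLINK-10195.
public final class BlockingDeliveryBuffer {

    public record Delivery(long deliveryTag, byte[] body) {}

    private final BlockingQueue<Delivery> queue;

    public BlockingDeliveryBuffer(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    /** Called from the connection's callback thread (cf. handleDelivery). */
    public void handleDelivery(Delivery delivery) {
        try {
            queue.put(delivery); // blocks when full instead of buffering unboundedly
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Called from the source thread; returns null on timeout or interrupt. */
    public Delivery nextDelivery(long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

A real DefaultConsumer-based replacement would combine this handoff with the prefetch limit discussed in FLINK-10195, since blocking the callback thread alone does not stop the broker from pushing more unacked messages.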
[jira] [Comment Edited] (FLINK-7267) Add support for lists of hosts to connect
[ https://issues.apache.org/jira/browse/FLINK-7267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17104943#comment-17104943 ] Austin Cawley-Edwards edited comment on FLINK-7267 at 5/11/20, 10:33 PM: - This seems like a reasonable feature to me. The only way to currently achieve this would be to override the open method for the source and completely re-implement it, which can be messy. I think we could either: A) create an `RMQSource#setupConnection` method, similar to `setupConnectionFactory`. The connection is only assigned in the `open` method, so I don't think it would be easy to introduce accidental errors in many places. This would allow users to make other changes to the connection by just overriding that method. B) wrap the RMQ package's Address class and allow a list of them to be passed as an option to the `RMQConnectionConfig`, which could be used in the `RMQSource#setupConnectionFactory`. I think option A would be best, as it is a minimal change and would open up the possibility of extending the connection in user code (ex: [adding address resolution|https://www.rabbitmq.com/api-guide.html#service-discovery-with-address-resolver]). [~aljoscha] [~senegalo] - what do you think? Is this change worthwhile? Side-note, should this conversation happen on the dev mailing list? was (Author: austince): This seems like a reasonable feature to me. The only way to currently achieve this would be to override the open method for the source and completely re-implement it, which can be messy. I think we could either: A) create an `RMQSource#setupConnection` method, similar to `setupConnectionFactory`. The connection is only assigned in the `open` method, so I don't think it would be easy to introduce accidental errors in many places. This would allow users to make other changes to the connection by just overriding that method. 
B) wrap the RMQ package's Address class and allow a list of them to be passed as an option to the `RMQConnectionConfig`, which could be used in the `RMQSource#setupConnectionFactory`. I think option A would be best, as it is a minimal change and would open up the possibility of extending the connection in user code. [~aljoscha] [~senegalo] - what do you think? Is this change worthwhile? Side-note, should this conversation happen on the dev mailing list? > Add support for lists of hosts to connect > - > > Key: FLINK-7267 > URL: https://issues.apache.org/jira/browse/FLINK-7267 > Project: Flink > Issue Type: Improvement > Components: Connectors/ RabbitMQ >Affects Versions: 1.3.0 >Reporter: Hu Hailin >Priority: Minor > > The RMQConnectionConfig can assign one host:port only. I want to connect to a > cluster with an available node. > My workaround is to write my own sink extending RMQSink and override open(), > assigning the nodes list in it. > {code:java} > connection = factory.newConnection(addrs) > {code} > I still need to build the RMQConnectionConfig with a dummy host:port or a > node in the list. It's annoying. > I think it is better to provide a configuration for it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-7267) Add support for lists of hosts to connect
[ https://issues.apache.org/jira/browse/FLINK-7267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17104943#comment-17104943 ] Austin Cawley-Edwards commented on FLINK-7267: -- This seems like a reasonable feature to me. The only way to currently achieve this would be to override the open method for the source and completely re-implement it, which can be messy. I think we could either: A) create an `RMQSource#setupConnection` method, similar to `setupConnectionFactory`. The connection is only assigned in the `open` method, so I don't think it would be easy to introduce accidental errors in many places. This would allow users to make other changes to the connection by just overriding that method. B) wrap the RMQ package's Address class and allow a list of them to be passed as an option to the `RMQConnectionConfig`, which could be used in the `RMQSource#setupConnectionFactory`. I think option A would be best, as it is a minimal change and would open up the possibility of extending the connection in user code. [~aljoscha] [~senegalo] - what do you think? Is this change worthwhile? Side-note, should this conversation happen on the dev mailing list? > Add support for lists of hosts to connect > - > > Key: FLINK-7267 > URL: https://issues.apache.org/jira/browse/FLINK-7267 > Project: Flink > Issue Type: Improvement > Components: Connectors/ RabbitMQ >Affects Versions: 1.3.0 >Reporter: Hu Hailin >Priority: Minor > > The RMQConnectionConfig can assign one host:port only. I want to connect to a > cluster with an available node. > My workaround is to write my own sink extending RMQSink and override open(), > assigning the nodes list in it. > {code:java} > connection = factory.newConnection(addrs) > {code} > I still need to build the RMQConnectionConfig with a dummy host:port or a > node in the list. It's annoying. > I think it is better to provide a configuration for it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17529) Replace Deprecated RMQ QueueingConsumer
[ https://issues.apache.org/jira/browse/FLINK-17529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17104529#comment-17104529 ] Austin Cawley-Edwards commented on FLINK-17529: --- That would be great, thanks [~aljoscha]. I'm still not sure if this is totally possible without other changes but will investigate. > Replace Deprecated RMQ QueueingConsumer > --- > > Key: FLINK-17529 > URL: https://issues.apache.org/jira/browse/FLINK-17529 > Project: Flink > Issue Type: Improvement > Components: Connectors/ RabbitMQ >Affects Versions: 1.10.0 >Reporter: Austin Cawley-Edwards >Priority: Minor > > The RMQ QueueingConsumer is used in the RMQSource to get a simple blocking > consumer. This has been deprecated in `com.rabbitmq:amqp-client` 4.2.0 and is > removed in 5.x. It should be replaced by a > `com.rabbitmq.client.DefaultConsumer`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-17529) Replace Deprecated RMQ QueueingConsumer
[ https://issues.apache.org/jira/browse/FLINK-17529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17100286#comment-17100286 ] Austin Cawley-Edwards edited comment on FLINK-17529 at 5/6/20, 10:23 PM: - Coming from https://issues.apache.org/jira/browse/FLINK-10195 (and the associated PR), just replacing the consumer might be an easier first step than as part of that fix. was (Author: austince): Coming from https://issues.apache.org/jira/browse/FLINK-10195 (and the associated PR), just replacing the consumer might be an easier first step than to do it with that fix. > Replace Deprecated RMQ QueueingConsumer > --- > > Key: FLINK-17529 > URL: https://issues.apache.org/jira/browse/FLINK-17529 > Project: Flink > Issue Type: Improvement > Components: Connectors/ RabbitMQ >Affects Versions: 1.10.0 >Reporter: Austin Cawley-Edwards >Priority: Minor > > The RMQ QueueingConsumer is used in the RMQSource to get a simple blocking > consumer. This has been deprecated in `com.rabbitmq:amqp-client` 4.2.0 and is > removed in 5.x. It should be replaced by a > `com.rabbitmq.client.DefaultConsumer`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17204) The RMQSource and RMQSink of the RabbitMQ connector have inconsistent default value of durable when declaring the queue.
[ https://issues.apache.org/jira/browse/FLINK-17204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17101232#comment-17101232 ] Austin Cawley-Edwards commented on FLINK-17204: --- Thank you!! > The RMQSource and RMQSink of the RabbitMQ connector have inconsistent default > value of durable when declaring the queue. > -- > > Key: FLINK-17204 > URL: https://issues.apache.org/jira/browse/FLINK-17204 > Project: Flink > Issue Type: Improvement > Components: Connectors/ RabbitMQ >Affects Versions: 1.10.0 >Reporter: ChaojianZhang >Assignee: Austin Cawley-Edwards >Priority: Major > Labels: pull-request-available > > The RabbitMQ queue is created with the value of durable set to true. > When I use the data in the RabbitMQ queue as the source and sink it into > another queue of RabbitMQ after Flink processing, the program reports an > exception, the main exception information is as follows: > > {code:java} > ... > Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; > protocol method: #method(reply-code=406, > reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue > 'rabbitmq_connectors_sink' in vhost '/': received 'false' but current is > 'true', class-id=50, method-id=10)Caused by: > com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: > #method(reply-code=406, reply-text=PRECONDITION_FAILED - > inequivalent arg 'durable' for queue 'rabbitmq_connectors_sink' in vhost '/': > received 'false' but current is > 'true', class-id=50, method-id=10) at > com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) at > com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36) > at > com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:443) > at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:263) at > com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:136) ... 
> 15 more > ...{code} > > I checked the source code of RMQSource and RMQSink and found that the > setupQueue() methods of these two classes set the durable value in > queueDeclare() inconsistently; I think they should be consistent. > If possible, I want to fix it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
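The fix described above boils down to both setupQueue() implementations passing the same durable flag to queueDeclare, since RabbitMQ rejects a re-declaration whose arguments differ from the existing queue's (the PRECONDITION_FAILED error in the stack trace). A small illustrative sketch, not the actual connector code: the `QueueDeclaration` class below just models the declaration arguments so source and sink share one definition and cannot drift.

```java
// Illustrative only: model queueDeclare's arguments so the source's and
// sink's setupQueue() share one declaration and cannot disagree on the
// 'durable' flag, which is what triggers PRECONDITION_FAILED above.
public final class QueueDeclaration {

    public record Args(String queue, boolean durable, boolean exclusive, boolean autoDelete) {}

    /** Single source of truth for both the source's and sink's setupQueue(). */
    public static Args forConnector(String queueName, boolean durable) {
        return new Args(queueName, durable, false, false);
    }
}
```

In the real connector the equivalent would be both classes calling `channel.queueDeclare(...)` with identical arguments (or one shared helper doing the declaration).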
[jira] [Updated] (FLINK-17529) Replace Deprecated RMQ QueueingConsumer
[ https://issues.apache.org/jira/browse/FLINK-17529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Austin Cawley-Edwards updated FLINK-17529: -- Description: The RMQ QueueingConsumer is used in the RMQSource to get a simple blocking consumer. This has been deprecated in `com.rabbitmq:amqp-client` 4.2.0 and is removed in 5.x. It should be replaced by a `com.rabbitmq.client.DefaultConsumer`. (was: The RMQ QueueingConsumer is used in the RMQSource to get a simple blocking consumer. This has been deprecated in `com.rabbitmq:amqp-client` 4.2.0 and will be removed in 5.x. It should be replaced by a `com.rabbitmq.client.DefaultConsumer`.) > Replace Deprecated RMQ QueueingConsumer > --- > > Key: FLINK-17529 > URL: https://issues.apache.org/jira/browse/FLINK-17529 > Project: Flink > Issue Type: Improvement > Components: Connectors/ RabbitMQ >Affects Versions: 1.10.0 >Reporter: Austin Cawley-Edwards >Priority: Minor > > The RMQ QueueingConsumer is used in the RMQSource to get a simple blocking > consumer. This has been deprecated in `com.rabbitmq:amqp-client` 4.2.0 and is > removed in 5.x. It should be replaced by a > `com.rabbitmq.client.DefaultConsumer`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17529) Replace Deprecated RMQ QueueingConsumer
[ https://issues.apache.org/jira/browse/FLINK-17529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17100286#comment-17100286 ] Austin Cawley-Edwards commented on FLINK-17529: --- Coming from https://issues.apache.org/jira/browse/FLINK-10195 (and the associated PR), just replacing the consumer might be an easier first step than to do it with that fix. > Replace Deprecated RMQ QueueingConsumer > --- > > Key: FLINK-17529 > URL: https://issues.apache.org/jira/browse/FLINK-17529 > Project: Flink > Issue Type: Improvement > Components: Connectors/ RabbitMQ >Affects Versions: 1.10.0 >Reporter: Austin Cawley-Edwards >Priority: Minor > > The RMQ QueueingConsumer is used in the RMQSource to get a simple blocking > consumer. This has been deprecated in `com.rabbitmq:amqp-client` 4.2.0 and > will be removed in 5.x. It should be replaced by a > `com.rabbitmq.client.DefaultConsumer`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17529) Replace Deprecated RMQ QueueingConsumer
[ https://issues.apache.org/jira/browse/FLINK-17529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Austin Cawley-Edwards updated FLINK-17529: -- Priority: Minor (was: Major) > Replace Deprecated RMQ QueueingConsumer > --- > > Key: FLINK-17529 > URL: https://issues.apache.org/jira/browse/FLINK-17529 > Project: Flink > Issue Type: Improvement > Components: Connectors/ RabbitMQ >Affects Versions: 1.10.0 >Reporter: Austin Cawley-Edwards >Priority: Minor > > The RMQ QueueingConsumer is used in the RMQSource to get a simple blocking > consumer. This has been deprecated in `com.rabbitmq:amqp-client` 4.2.0 and > will be removed in 5.x. It should be replaced by a > `com.rabbitmq.client.DefaultConsumer`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17529) Replace Deprecated RMQ QueueingConsumer
[ https://issues.apache.org/jira/browse/FLINK-17529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Austin Cawley-Edwards updated FLINK-17529: -- Issue Type: Improvement (was: Bug) > Replace Deprecated RMQ QueueingConsumer > --- > > Key: FLINK-17529 > URL: https://issues.apache.org/jira/browse/FLINK-17529 > Project: Flink > Issue Type: Improvement > Components: Connectors/ RabbitMQ >Affects Versions: 1.10.0 >Reporter: Austin Cawley-Edwards >Priority: Major > > The RMQ QueueingConsumer is used in the RMQSource to get a simple blocking > consumer. This has been deprecated in `com.rabbitmq:amqp-client` 4.2.0 and > will be removed in 5.x. It should be replaced by a > `com.rabbitmq.client.DefaultConsumer`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17204) The RMQSource and RMQSink of the RabbitMQ connector have inconsistent default value of durable when declaring the queue.
[ https://issues.apache.org/jira/browse/FLINK-17204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17100184#comment-17100184 ] Austin Cawley-Edwards commented on FLINK-17204: --- Thanks for assigning [~aljoscha], PR is open. Should I go back to the mailing list thread to look for reviewers or are you able to take it whenever you have time? > The RMQSource and RMQSink of the RabbitMQ connector have inconsistent default > value of durable when declaring the queue. > -- > > Key: FLINK-17204 > URL: https://issues.apache.org/jira/browse/FLINK-17204 > Project: Flink > Issue Type: Improvement > Components: Connectors/ RabbitMQ >Affects Versions: 1.10.0 >Reporter: ChaojianZhang >Assignee: Austin Cawley-Edwards >Priority: Major > Labels: pull-request-available > > The RabbitMQ queue is created with the value of durable set to true. > When I use the data in the RabbitMQ queue as the source and sink it into > another queue of RabbitMQ after Flink processing, the program reports an > exception, the main exception information is as follows: > > {code:java} > ... 
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; > protocol method: #method(reply-code=406, > reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue > 'rabbitmq_connectors_sink' in vhost '/': received 'false' but current is > 'true', class-id=50, method-id=10)Caused by: > com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: > #method(reply-code=406, reply-text=PRECONDITION_FAILED - > inequivalent arg 'durable' for queue 'rabbitmq_connectors_sink' in vhost '/': > received 'false' but current is 'true', class-id=50, method-id=10) at > com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) at > com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36) > at > com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:443) > at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:263) at > com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:136) ... > 15 more > ...{code} > > I checked the source code of RMQSource and RMQSink and found that the > setupQueue() methods of these two classes set the durable value in > queueDeclare() inconsistently; I think they should be consistent. > If possible, I want to fix it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17529) Replace Deprecated RMQ QueueingConsumer
Austin Cawley-Edwards created FLINK-17529: - Summary: Replace Deprecated RMQ QueueingConsumer Key: FLINK-17529 URL: https://issues.apache.org/jira/browse/FLINK-17529 Project: Flink Issue Type: Bug Components: Connectors/ RabbitMQ Affects Versions: 1.10.0 Reporter: Austin Cawley-Edwards The RMQ QueueingConsumer is used in the RMQSource to get a simple blocking consumer. This has been deprecated in `com.rabbitmq:amqp-client` 4.2.0 and will be removed in 5.x. It should be replaced by a `com.rabbitmq.client.DefaultConsumer`. -- This message was sent by Atlassian Jira (v8.3.4#803005)