[jira] [Commented] (FLINK-17874) Writing to hbase throws NPE
[ https://issues.apache.org/jira/browse/FLINK-17874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17113747#comment-17113747 ] chaiyongqiang commented on FLINK-17874: --- When writing a field to HBase, we serialize it to bytes. Currently, *_HBaseTypeUtils.serializeFromObject_* only checks for a NULL value for the STRING and BYTE types; for any other type an NPE is thrown. I think we need a NULL check at the beginning of *_HBaseTypeUtils.serializeFromObject_*. > Writing to hbase throws NPE > --- > > Key: FLINK-17874 > URL: https://issues.apache.org/jira/browse/FLINK-17874 > Project: Flink > Issue Type: Bug > Components: Connectors / HBase >Affects Versions: 1.10.0 >Reporter: chaiyongqiang >Priority: Major > Attachments: NPE.png > > > Writing a table to HBase throws an NPE when a field is NULL; we need to handle this. > Please refer to NPE.png for the detailed stack trace. !NPE.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
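To make the proposed fix concrete, here is a small, self-contained sketch of the kind of up-front NULL check being suggested. The class name, the type indices and the EMPTY_BYTES constant are illustrative assumptions only; this is not the actual Flink HBaseTypeUtils code, whose signature and type handling may differ.
{code:java}
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/** Sketch of the proposed NULL guard; names loosely mirror HBaseTypeUtils but this is not the real Flink class. */
public final class NullSafeSerializeSketch {

	private static final byte[] EMPTY_BYTES = new byte[0];

	/** Serialize a field value to bytes, returning an empty array for NULL regardless of the field type. */
	public static byte[] serializeFromObject(Object value, int typeIdx, Charset stringCharset) {
		// Proposed fix: check NULL once at the top instead of only inside the STRING/BYTE branches.
		if (value == null) {
			return EMPTY_BYTES;
		}
		switch (typeIdx) {
			case 0: // STRING
				return ((String) value).getBytes(stringCharset);
			case 1: // BYTE array
				return (byte[]) value;
			case 2: // LONG (illustrative; the real utility covers many more types)
				return longToBytes((Long) value);
			default:
				throw new IllegalArgumentException("Unsupported type index: " + typeIdx);
		}
	}

	private static byte[] longToBytes(long v) {
		byte[] out = new byte[8];
		for (int i = 7; i >= 0; i--) {
			out[i] = (byte) (v & 0xFF);
			v >>= 8;
		}
		return out;
	}

	public static void main(String[] args) {
		// A NULL field of a non-STRING type no longer throws an NPE; it serializes to an empty byte array.
		System.out.println(serializeFromObject(null, 2, StandardCharsets.UTF_8).length); // prints 0
	}
}
{code}
Whether an empty byte array is the right encoding for NULL (as opposed to skipping the column entirely) is a separate design question for the actual fix.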
[jira] [Created] (FLINK-17874) Writing to hbase throws NPE
chaiyongqiang created FLINK-17874: - Summary: Writing to hbase throws NPE Key: FLINK-17874 URL: https://issues.apache.org/jira/browse/FLINK-17874 Project: Flink Issue Type: Bug Components: Connectors / HBase Affects Versions: 1.10.0 Reporter: chaiyongqiang Attachments: NPE.png Writing a table to HBase throws an NPE when a field is NULL; we need to handle this. Please refer to NPE.png for the detailed stack trace. !NPE.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16626) Exception encountered when cancelling a job in yarn per-job mode
[ https://issues.apache.org/jira/browse/FLINK-16626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17061326#comment-17061326 ] chaiyongqiang commented on FLINK-16626: --- Maybe you're right. As far as I can see, *flink stop* is going to be removed; only *flink cancel* goes through this logic. There could also be other situations I haven't noticed. > Exception encountered when cancelling a job in yarn per-job mode > > > Key: FLINK-16626 > URL: https://issues.apache.org/jira/browse/FLINK-16626 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN >Affects Versions: 1.10.0 >Reporter: chaiyongqiang >Priority: Major >
[jira] [Commented] (FLINK-16637) Flink per job mode terminates before serving job cancellation result
[ https://issues.apache.org/jira/browse/FLINK-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17061323#comment-17061323 ] chaiyongqiang commented on FLINK-16637: --- Hi [~tison], I see this problem too. I created another issue, [FLINK-16626|https://issues.apache.org/jira/browse/FLINK-16626], and gave some explanation of it there. Do you have a plan to fix this? > Flink per job mode terminates before serving job cancellation result > > > Key: FLINK-16637 > URL: https://issues.apache.org/jira/browse/FLINK-16637 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN >Affects Versions: 1.10.0 >Reporter: Zili Chen >Priority: Major > Fix For: 1.10.1, 1.11.0 > > Attachments: yarn.log > > > The {{MiniDispatcher}} no longer waits until the REST handler has served the > cancellation result before shutting down. This behaviour seems to be > introduced with FLINK-15116. > See also > [https://lists.apache.org/x/thread.html/rcadbd6ceede422bac8d4483fd0cdae58659fbff78533a399eb136743@%3Cdev.flink.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-16626) Exception encountered when cancelling a job in yarn per-job mode
[ https://issues.apache.org/jira/browse/FLINK-16626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] chaiyongqiang updated FLINK-16626: -- Summary: Exception encountered when cancelling a job in yarn per-job mode (was: Exception ncountered when cancelling a job in yarn per-job mode) > Exception encountered when cancelling a job in yarn per-job mode > > > Key: FLINK-16626 > URL: https://issues.apache.org/jira/browse/FLINK-16626 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN >Affects Versions: 1.10.0 >Reporter: chaiyongqiang >Priority: Major >
[jira] [Updated] (FLINK-16626) Exception ncountered when cancelling a job in yarn per-job mode
[ https://issues.apache.org/jira/browse/FLINK-16626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] chaiyongqiang updated FLINK-16626: -- Summary: Exception ncountered when cancelling a job in yarn per-job mode (was: Exception Encountered when cancelling a job in yarn per-job mode) > Exception ncountered when cancelling a job in yarn per-job mode > --- > > Key: FLINK-16626 > URL: https://issues.apache.org/jira/browse/FLINK-16626 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN >Affects Versions: 1.10.0 >Reporter: chaiyongqiang >Priority: Major >
[jira] [Comment Edited] (FLINK-16626) Exception Encountered when cancelling a job in yarn per-job mode
[ https://issues.apache.org/jira/browse/FLINK-16626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17060639#comment-17060639 ] chaiyongqiang edited comment on FLINK-16626 at 3/17/20, 5:12 AM: - In my opinion, when the client issues a cancel command, the server runs through the following logic: # call the JobCancellationHandler to handle the request # respond to the client The code is as follows:
{code:java}
@Override
protected CompletableFuture<Void> respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<R, M> handlerRequest, T gateway) {
	CompletableFuture<P> response;
	try {
		response = handleRequest(handlerRequest, gateway);
	} catch (RestHandlerException e) {
		response = FutureUtils.completedExceptionally(e);
	}
	return response.thenAccept(resp -> HandlerUtils.sendResponse(ctx, httpRequest, resp, messageHeaders.getResponseStatusCode(), responseHeaders));
}
{code}
But in the handleRequest method of JobCancellationHandler, the cluster is stopped, which shuts down the RestServerEndpoint and, with it, the EventLoopGroup. When HandlerUtils.sendResponse is then called to respond to the client, the exception occurs. > Exception Encountered when cancelling a job in yarn per-job mode > > > Key: FLINK-16626 > URL: https://issues.apache.org/jira/browse/FLINK-16626 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN >Affects Versions: 1.10.0 >Reporter: chaiyongqiang >Priority: Major >
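As a toy illustration of the ordering problem described in this comment, the snippet below is a self-contained model built only on CompletableFuture; it is not the actual Flink dispatcher or handler code. It sketches the ordering the client would need: the REST endpoint is torn down only after the cancellation response has actually been delivered.
{code:java}
import java.util.concurrent.CompletableFuture;

/** Toy model of the shutdown-vs-response ordering; not the actual Flink MiniDispatcher/REST code. */
public final class ShutdownOrderingSketch {

	/** Stands in for JobCancellationHandler.handleRequest(...) completing asynchronously. */
	static CompletableFuture<String> handleCancel() {
		return CompletableFuture.supplyAsync(() -> "job cancelled");
	}

	static void shutDownRestEndpoint() {
		System.out.println("REST endpoint shut down");
	}

	public static void main(String[] args) {
		// Problematic ordering (as described above): the endpoint and its EventLoopGroup are torn down
		// while the response is still pending, so sendResponse fails and the client keeps retrying.
		// The chain below shows the safer ordering: respond first, shut down afterwards.
		handleCancel()
			.thenAccept(response -> System.out.println("sent response: " + response))
			.thenRun(ShutdownOrderingSketch::shutDownRestEndpoint)
			.join();
	}
}
{code}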
[jira] [Commented] (FLINK-16626) Exception Encountered when cancelling a job in yarn per-job mode
[ https://issues.apache.org/jira/browse/FLINK-16626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17060639#comment-17060639 ] chaiyongqiang commented on FLINK-16626: --- In my opinion, when the client issues a cancel command, the server runs through the following logic: # call the JobCancellationHandler to handle the request # respond to the client The code is as follows: {quote}@Override protected CompletableFuture<Void> respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<R, M> handlerRequest, T gateway) { CompletableFuture<P> response; try { response = handleRequest(handlerRequest, gateway); } catch (RestHandlerException e) { response = FutureUtils.completedExceptionally(e); } return response.thenAccept(resp -> HandlerUtils.sendResponse(ctx, httpRequest, resp, messageHeaders.getResponseStatusCode(), responseHeaders)); }{quote} But in the handleRequest method of JobCancellationHandler, the cluster is stopped, which shuts down the RestServerEndpoint and, with it, the EventLoopGroup. When HandlerUtils.sendResponse is then called to respond to the client, the exception occurs. > Exception Encountered when cancelling a job in yarn per-job mode > > > Key: FLINK-16626 > URL: https://issues.apache.org/jira/browse/FLINK-16626 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN >Affects Versions: 1.10.0 >Reporter: chaiyongqiang >Priority: Major >
[jira] [Updated] (FLINK-16626) Exception Encountered when cancelling a job in yarn per-job mode
[ https://issues.apache.org/jira/browse/FLINK-16626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] chaiyongqiang updated FLINK-16626: -- Description: When executing the following command to stop a Flink job in YARN per-job mode, the client keeps retrying until it times out (about 1 minute) and exits with a failure, but the job actually stops successfully. Command: {noformat} flink cancel $jobId yid appId {noformat} The exception on the client side is: {quote} 2020-03-17 12:32:13,709 DEBUG org.apache.flink.runtime.rest.RestClient - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel ... 2020-03-17 12:33:11,065 DEBUG org.apache.flink.runtime.rest.RestClient - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel 2020-03-17 12:33:14,070 DEBUG org.apache.flink.runtime.rest.RestClient - Shutting down rest endpoint. 2020-03-17 12:33:14,070 DEBUG org.apache.flink.runtime.rest.RestClient - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel 2020-03-17 12:33:14,077 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.PoolThreadCache - Freed 2 thread-local buffer(s) from thread: flink-rest-client-netty-thread-1 2020-03-17 12:33:14,080 DEBUG org.apache.flink.runtime.rest.RestClient - Rest endpoint shutdown complete. 2020-03-17 12:33:14,083 ERROR org.apache.flink.client.cli.CliFrontend - Error while running the command. org.apache.flink.util.FlinkException: Could not cancel job cc61033484d4c0e7a27a8a2a36f4de7a. at org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545) at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843) at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968) Caused by: java.util.concurrent.TimeoutException at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) at org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543) ... 9 more The program finished with the following exception: org.apache.flink.util.FlinkException: Could not cancel job cc61033484d4c0e7a27a8a2a36f4de7a. 
at org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545) at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843) at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968) Caused by: java.util.concurrent.TimeoutException at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) at org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543) ... 9 more {quote} Actually, the job was cancelled. But the server also prints some exception: {quote}2020-03-17 12:25:13,754 ERROR [flink-akka.actor.default-dispatcher-17] org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:766) - Failed to submit a listener notification task. Event loop shut down? java.util.concurrent.RejectedExecutionException: event executor terminated at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:855) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:340) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExec
[jira] [Created] (FLINK-16626) Exception Encountered when cancelling a job in yarn per-job mode
chaiyongqiang created FLINK-16626: - Summary: Exception Encountered when cancelling a job in yarn per-job mode Key: FLINK-16626 URL: https://issues.apache.org/jira/browse/FLINK-16626 Project: Flink Issue Type: Bug Components: Deployment / YARN Affects Versions: 1.10.0 Reporter: chaiyongqiang When executing the following command to stop a flink job with yarn per-job mode, the client keeps retrying untill timeout (1 minutes)and exit with failure. But the job stops successfully. Command : {noformat} flink cancel $jobId yid appId {noformat} The exception on the client side is : {quote}bq. 2020-03-17 12:32:13,709 DEBUG org.apache.flink.runtime.rest.RestClient - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel 2020-03-17 12:32:13,749 DEBUG org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector - -Dorg.apache.flink.shaded.netty4.io.netty.leakDetection.level: simple 2020-03-17 12:32:13,749 DEBUG org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector - -Dorg.apache.flink.shaded.netty4.io.netty.leakDetection.targetRecords: 4 2020-03-17 12:32:13,759 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf - -Dorg.apache.flink.shaded.netty4.io.netty.buffer.checkAccessible: true 2020-03-17 12:32:13,759 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf - -Dorg.apache.flink.shaded.netty4.io.netty.buffer.checkBounds: true 2020-03-17 12:32:13,760 DEBUG org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector@7cb68452 2020-03-17 12:32:13,786 DEBUG org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelId - -Dio.netty.processId: 156764 (auto-detected) 2020-03-17 12:32:13,788 DEBUG org.apache.flink.shaded.netty4.io.netty.util.NetUtil - -Djava.net.preferIPv4Stack: false 2020-03-17 12:32:13,788 DEBUG org.apache.flink.shaded.netty4.io.netty.util.NetUtil - -Djava.net.preferIPv6Addresses: false 2020-03-17 12:32:13,791 DEBUG org.apache.flink.shaded.netty4.io.netty.util.NetUtil - Loopback interface: lo (lo, 0:0:0:0:0:0:0:1%lo) 2020-03-17 12:32:13,791 DEBUG org.apache.flink.shaded.netty4.io.netty.util.NetUtil - /proc/sys/net/core/somaxconn: 128 2020-03-17 12:32:13,793 DEBUG org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelId - -Dio.netty.machineId: 02:42:dd:ff:fe:50:e5:91 (auto-detected) 2020-03-17 12:32:13,820 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numHeapArenas: 16 2020-03-17 12:32:13,821 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numDirectArenas: 16 2020-03-17 12:32:13,821 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.pageSize: 8192 2020-03-17 12:32:13,821 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxOrder: 11 2020-03-17 12:32:13,821 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.chunkSize: 16777216 2020-03-17 12:32:13,821 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.tinyCacheSize: 512 2020-03-17 12:32:13,821 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.smallCacheSize: 256 2020-03-17 12:32:13,821 DEBUG 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.normalCacheSize: 64 2020-03-17 12:32:13,821 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedBufferCapacity: 32768 2020-03-17 12:32:13,822 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimInterval: 8192 2020-03-17 12:32:13,822 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimIntervalMillis: 0 2020-03-17 12:32:13,822 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.useCacheForAllThreads: true 2020-03-17 12:32:13,822 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedByteBuffersPerChunk: 1023 2020-03-17 12:32:13,830 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled 2020-03-17 12:32:13,830 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 0 2020-03-17 12:32:13,830 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
[jira] [Commented] (FLINK-14719) Making Semantic configurable in Flinkkafkaproducer to support exactly-once semantic in Table API
[ https://issues.apache.org/jira/browse/FLINK-14719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16995396#comment-16995396 ] chaiyongqiang commented on FLINK-14719: --- [~jark], thanks. I'll handle this myself on the Flink 1.8 branch and will create a PR against the development branch. I hope it can catch Flink 1.10, which will be released next month. Please close this issue. > Making Semantic configurable in Flinkkafkaproducer to support exactly-once > semantic in Table API > - > > Key: FLINK-14719 > URL: https://issues.apache.org/jira/browse/FLINK-14719 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: 1.8.0 >Reporter: chaiyongqiang >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-15221) supporting Exactly-once for kafka table APi
[ https://issues.apache.org/jira/browse/FLINK-15221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] chaiyongqiang updated FLINK-15221: -- Summary: supporting Exactly-once for kafka table APi (was: supporting Exactly-once for table APi) > supporting Exactly-once for kafka table APi > --- > > Key: FLINK-15221 > URL: https://issues.apache.org/jira/browse/FLINK-15221 > Project: Flink > Issue Type: Wish > Components: Table SQL / API >Reporter: chaiyongqiang >Priority: Major > > The Table API doesn't support end-to-end exactly-once semantics the way the DataStream > API does. Does Flink have a plan for this? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15221) supporting Exactly-once for table APi
[ https://issues.apache.org/jira/browse/FLINK-15221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16995247#comment-16995247 ] chaiyongqiang commented on FLINK-15221: --- [~jark] Thanks, you're right. But I've checked KafkaTableSink, and I don't think overriding 'KafkaTableSink' and 'KafkaTableSourceSinkFactory' is a good way to get end-to-end exactly-once semantics when a user develops an app. It is too complex for a Flink application developer and forces them to focus too much on Flink internals. I think we'd better expose a semantic configuration that app users can set when they want exactly-once semantics. Please have a look at the issue I opened: [FLINK-14719|https://issues.apache.org/jira/projects/FLINK/issues/FLINK-14719?filter=allopenissues]. What do you think about it? > supporting Exactly-once for table APi > - > > Key: FLINK-15221 > URL: https://issues.apache.org/jira/browse/FLINK-15221 > Project: Flink > Issue Type: Wish > Components: Table SQL / API >Reporter: chaiyongqiang >Priority: Major > > The Table API doesn't support end-to-end exactly-once semantics the way the DataStream > API does. Does Flink have a plan for this? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15221) supporting Exactly-once for table APi
chaiyongqiang created FLINK-15221: - Summary: supporting Exactly-once for table APi Key: FLINK-15221 URL: https://issues.apache.org/jira/browse/FLINK-15221 Project: Flink Issue Type: Wish Components: Table SQL / API Affects Versions: 1.9.0 Reporter: chaiyongqiang The Table API doesn't support end-to-end exactly-once semantics the way the DataStream API does. Does Flink have a plan for this? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14871) Better formatter of toString method for StateTransition
chaiyongqiang created FLINK-14871: - Summary: Better formatter of toString method for StateTransition Key: FLINK-14871 URL: https://issues.apache.org/jira/browse/FLINK-14871 Project: Flink Issue Type: Improvement Components: Library / CEP Affects Versions: 1.9.1 Reporter: chaiyongqiang Attachments: 屏幕快照 2019-11-20 下午3.27.43.png The toString method in StateTransition is not well formatted:
{code:java}
@Override
public String toString() {
	return new StringBuilder()
		.append("StateTransition(")
		.append(action).append(", ")
		.append("from ").append(sourceState.getName())
		.append("to ").append(targetState.getName())
		.append(condition != null ? ", with condition)" : ")")
		.toString();
}
{code}
The problem is that there is no separator between `sourceState.getName()` and "to ", which leads to badly formatted output, as the attachment shows. A leading blank space is more readable:
{code:java}
@Override
public String toString() {
	return new StringBuilder()
		.append("StateTransition(")
		.append(action).append(", ")
		.append("from ").append(sourceState.getName())
		.append(" to ").append(targetState.getName())
		.append(condition != null ? ", with condition)" : ")")
		.toString();
}
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
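A tiny, self-contained demonstration of the formatting difference described above, using a plain StringBuilder rather than the actual CEP StateTransition class:
{code:java}
/** Demonstrates the missing separator; standalone example, not the Flink CEP code. */
public final class ToStringFormatSketch {
	public static void main(String[] args) {
		String source = "start";
		String target = "middle";
		// Current pattern: no space before "to", so the source state name runs into the keyword.
		String before = new StringBuilder().append("from ").append(source).append("to ").append(target).toString();
		// Proposed pattern: a leading space in " to " separates the two state names.
		String after = new StringBuilder().append("from ").append(source).append(" to ").append(target).toString();
		System.out.println(before); // from startto middle
		System.out.println(after);  // from start to middle
	}
}
{code}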
[jira] [Comment Edited] (FLINK-13052) Supporting multi-topic when using kafkaTableSourceSinkFactoryBase.createStreamTableSource
[ https://issues.apache.org/jira/browse/FLINK-13052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16972243#comment-16972243 ] chaiyongqiang edited comment on FLINK-13052 at 11/12/19 10:08 AM: -- There are some limits to supporting multiple source topics in KafkaTableSourceSinkFactoryBase. When the startup mode is specific-offsets, it is difficult to assign the start offset for each topic partition because of the way the connector properties are set. As the following code shows, when we assign several topics such as MyTable, MyTable1 and MyTable2, it is difficult to tell which topic a given partition/offset entry belongs to.
{code:java}
final Map<String, String> props2 = new HashMap<>();
props2.put("connector.property-version", "1");
props2.put("connector.type", "kafka");
props2.put("connector.version", "0.11");
props2.put("connector.topic", "MyTable");
props2.put("connector.startup-mode", "specific-offsets");
props2.put("connector.specific-offsets.0.partition", "0");
props2.put("connector.specific-offsets.0.offset", "42");
props2.put("connector.specific-offsets.1.partition", "1");
{code}
Fortunately, setting a fixed topic-partition start offset when starting an app with the Table API is rare. My solution is to support multiple topics for the other startup modes (GROUP_OFFSETS, EARLIEST, LATEST); for SPECIFIC_OFFSETS mode with several topics in the Table API (e.g. KafkaTableSourceSinkFactoryBase) we would use GROUP_OFFSETS instead. Does someone have a better idea? Please let me know. Many thanks. > Supporting multi-topic when using > kafkaTableSourceSinkFactoryBase.createStreamTableSource > - > > Key: FLINK-13052 > URL: https://issues.apache.org/jira/browse/FLINK-13052 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka, Table SQL / API >Affects Versions: 1.8.0 >Reporter: chaiyongqiang >Priority: Critical > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-13052) Supporting multi-topic when using kafkaTableSourceSinkFactoryBase.createStreamTableSource
[ https://issues.apache.org/jira/browse/FLINK-13052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16972243#comment-16972243 ] chaiyongqiang commented on FLINK-13052: --- There are some limits to supporting multiple source topics in KafkaTableSourceSinkFactoryBase. When the startup mode is specific-offsets, it is difficult to assign the start offset for each topic partition because of the way the connector properties are set. As the following code shows, when we assign several topics such as MyTable, MyTable1 and MyTable2, it is difficult to tell which topic a given partition/offset entry belongs to.
{code:java}
final Map<String, String> props2 = new HashMap<>();
props2.put("connector.property-version", "1");
props2.put("connector.type", "kafka");
props2.put("connector.version", "0.11");
props2.put("connector.topic", "MyTable");
props2.put("connector.startup-mode", "specific-offsets");
props2.put("connector.specific-offsets.0.partition", "0");
props2.put("connector.specific-offsets.0.offset", "42");
props2.put("connector.specific-offsets.1.partition", "1");
{code}
Fortunately, setting a fixed topic-partition start offset when starting an app with the Table API is rare, so for now I suggest ignoring this situation. My solution is to support multiple topics for the other startup modes (GROUP_OFFSETS, EARLIEST, LATEST); for SPECIFIC_OFFSETS mode with several topics in the Table API (e.g. KafkaTableSourceSinkFactoryBase) we would use GROUP_OFFSETS instead. > Supporting multi-topic when using > kafkaTableSourceSinkFactoryBase.createStreamTableSource > - > > Key: FLINK-13052 > URL: https://issues.apache.org/jira/browse/FLINK-13052 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka, Table SQL / API >Affects Versions: 1.8.0 >Reporter: chaiyongqiang >Priority: Critical > -- This message was sent by Atlassian Jira (v8.3.4#803005)
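As a rough illustration of the proposal in the comment above, the sketch below shows what a multi-topic configuration could look like. The comma-separated "connector.topic" value is hypothetical (today the property takes a single topic), and the snippet only builds and prints a property map; it does not call any Flink API.
{code:java}
import java.util.HashMap;
import java.util.Map;

/** Hypothetical multi-topic connector properties; the comma-separated topic list is not an existing Flink option. */
public final class MultiTopicPropsSketch {
	public static void main(String[] args) {
		Map<String, String> props = new HashMap<>();
		props.put("connector.property-version", "1");
		props.put("connector.type", "kafka");
		props.put("connector.version", "0.11");
		// Hypothetical: several source topics in one property value.
		props.put("connector.topic", "MyTable,MyTable1,MyTable2");
		// Startup modes that need no per-partition offsets stay unambiguous with several topics.
		props.put("connector.startup-mode", "group-offsets");
		props.forEach((k, v) -> System.out.println(k + " = " + v));
	}
}
{code}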
[jira] [Comment Edited] (FLINK-14719) Making Semantic configurable in Flinkkafkaproducer to support exactly-once semantic in Table API
[ https://issues.apache.org/jira/browse/FLINK-14719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16972191#comment-16972191 ] chaiyongqiang edited comment on FLINK-14719 at 11/12/19 8:55 AM: - When I check the code on the Flink 1.9 branch and on master, the Flink 1.8 constructor of FlinkKafkaProducer has become @deprecated. In the newer versions of Flink, we could modify the createKafkaProducer method in KafkaTableSinkBase and all the classes that extend KafkaTableSinkBase to support exactly-once semantics in the Table API. +I could open a new issue to track this.+ But for the Flink 1.8 branch, a lightweight approach would help: we could read the semantic from the producer configuration and set it in the constructor in the following way.
{code:java}
/**
 * Configuration key for disabling the metrics reporting.
 */
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";

/**
 * Configuration key for setting the producer semantic.
 */
public static final String KEY_SEMANTIC = "flink.semantic";

public FlinkKafkaProducer(
		String defaultTopicId,
		KeyedSerializationSchema<IN> serializationSchema,
		Properties producerConfig,
		Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
	this(
		defaultTopicId,
		serializationSchema,
		producerConfig,
		customPartitioner,
		Semantic.valueOf(producerConfig.getProperty(KEY_SEMANTIC, Semantic.AT_LEAST_ONCE.name()).toUpperCase()),
		DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
}
{code}
Could someone offer me some advice? Many thanks. > Making Semantic configurable in Flinkkafkaproducer to support exactly-once > semantic in Table API > - > > Key: FLINK-14719 > URL: https://issues.apache.org/jira/browse/FLINK-14719 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: 1.8.0 >Reporter: chaiyongqiang >Priority: Major >
[jira] [Commented] (FLINK-14719) Making Semantic configurable in Flinkkafkaproducer to support exactly-once semantic in Table API
[ https://issues.apache.org/jira/browse/FLINK-14719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16972191#comment-16972191 ] chaiyongqiang commented on FLINK-14719: --- When I check the code on the Flink 1.9 branch and on master, the Flink 1.8 constructor of FlinkKafkaProducer has become @deprecated. In the newer versions of Flink, we could modify the createKafkaProducer method in KafkaTableSinkBase and all the classes that extend KafkaTableSinkBase to support exactly-once semantics in the Table API. +I could open a new issue to track this.+ But for the Flink 1.8 branch, a lightweight approach would help: we could read the semantic from the producer configuration and set it in the constructor in the following way.
{code:java}
/**
 * Configuration key for disabling the metrics reporting.
 */
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";

/**
 * Configuration key for setting the producer semantic.
 */
public static final String KEY_SEMANTIC = "flink.semantic";

public FlinkKafkaProducer(
		String defaultTopicId,
		KeyedSerializationSchema<IN> serializationSchema,
		Properties producerConfig,
		Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
	this(
		defaultTopicId,
		serializationSchema,
		producerConfig,
		customPartitioner,
		Semantic.valueOf(producerConfig.getProperty(KEY_SEMANTIC, Semantic.AT_LEAST_ONCE.name()).toUpperCase()),
		DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
}
{code}
Could someone offer me some advice? Many thanks. > Making Semantic configurable in Flinkkafkaproducer to support exactly-once > semantic in Table API > - > > Key: FLINK-14719 > URL: https://issues.apache.org/jira/browse/FLINK-14719 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: 1.8.0 >Reporter: chaiyongqiang >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
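A self-contained sketch of the property-to-semantic parsing proposed in the comment above. The "flink.semantic" key is the proposed (not yet existing) configuration option, and the nested enum merely stands in for FlinkKafkaProducer.Semantic so the example runs on its own.
{code:java}
import java.util.Properties;

/** Standalone model of the proposed KEY_SEMANTIC lookup; not the actual FlinkKafkaProducer code. */
public final class SemanticConfigSketch {

	enum Semantic { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }

	/** Proposed key; it does not exist in released Flink versions. */
	static final String KEY_SEMANTIC = "flink.semantic";

	static Semantic semanticFrom(Properties producerConfig) {
		// Same defaulting rule as the proposal: an absent key keeps today's AT_LEAST_ONCE behaviour.
		return Semantic.valueOf(
			producerConfig.getProperty(KEY_SEMANTIC, Semantic.AT_LEAST_ONCE.name()).toUpperCase());
	}

	public static void main(String[] args) {
		Properties props = new Properties();
		System.out.println(semanticFrom(props));   // AT_LEAST_ONCE
		props.setProperty(KEY_SEMANTIC, "exactly_once");
		System.out.println(semanticFrom(props));   // EXACTLY_ONCE
	}
}
{code}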
[jira] [Created] (FLINK-14719) Making Semantic configurable in Flinkkafkaproducer to support exactly-once semantic in Table API
chaiyongqiang created FLINK-14719: - Summary: Making Semantic configurable in Flinkkafkaproducer to support exactly-once semantic in Table API Key: FLINK-14719 URL: https://issues.apache.org/jira/browse/FLINK-14719 Project: Flink Issue Type: Improvement Components: Connectors / Kafka Affects Versions: 1.8.0 Reporter: chaiyongqiang Flink supports Kafka transactions with FlinkKafkaProducer and FlinkKafkaProducer011. With the DataStream API it is possible to achieve exactly-once semantics, but with the Table API things are different. The createKafkaProducer method in KafkaTableSink is used to create the FlinkKafkaProducer that sends messages to the Kafka server. It looks like this:
{code:java}
protected SinkFunction<Row> createKafkaProducer(
		String topic,
		Properties properties,
		SerializationSchema<Row> serializationSchema,
		Optional<FlinkKafkaPartitioner<Row>> partitioner) {
	return new FlinkKafkaProducer<>(
		topic,
		new KeyedSerializationSchemaWrapper<>(serializationSchema),
		properties,
		partitioner);
}
{code}
When we look at the constructor of FlinkKafkaProducer that this calls, we can see that it always produces a producer with at_least_once semantics:
{code:java}
public FlinkKafkaProducer(
		String defaultTopicId,
		KeyedSerializationSchema<IN> serializationSchema,
		Properties producerConfig,
		Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
	this(
		defaultTopicId,
		serializationSchema,
		producerConfig,
		customPartitioner,
		FlinkKafkaProducer.Semantic.AT_LEAST_ONCE,
		DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
}
{code}
This means users cannot achieve exactly-once semantics when using the Table API. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-12979) Sending data to kafka with CsvRowSerializationSchema always adding a "\n", "\r","\r\n" at the end of the message
[ https://issues.apache.org/jira/browse/FLINK-12979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16915176#comment-16915176 ] chaiyongqiang commented on FLINK-12979: --- #9529
> Sending data to kafka with CsvRowSerializationSchema always adding a "\n", "\r","\r\n" at the end of the message
>
> Key: FLINK-12979
> URL: https://issues.apache.org/jira/browse/FLINK-12979
> Project: Flink
> Issue Type: Bug
> Components: API / Type Serialization System
> Affects Versions: 1.8.0, 1.8.1, 1.8.2, 1.9.0, 2.0.0
> Reporter: chaiyongqiang
> Assignee: Hugo Louro
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> When sending data to Kafka using CsvRowSerializationSchema, the CsvRowSerializationSchema#serialize method generates the value of the Kafka record; it calls CsvEncoder#endRow, which appends _cfgLineSeparator to the end of the record value.
> However, when calling the method setLineDelimiter on CsvRowSerializationSchema#Builder, only "\n", "\r" or "\r\n" may be passed as the parameter.
> That is not friendly when you want to send the message "123,pingpong,21:00" to Kafka but Kafka receives "123,pingpong,21:00\r\n".
> I am not sure about the reason for limiting the lineDelimiter to "\n", "\r", "\r\n"; in previous versions and in jackson-databind there is no such limit.
> At the very least it should let the application developer set the line delimiter to "".
-- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (FLINK-13052) Supporting multi-topic when using kafkaTableSourceSinkFactoryBase.createStreamTableSource
chaiyongqiang created FLINK-13052: - Summary: Supporting multi-topic when using kafkaTableSourceSinkFactoryBase.createStreamTableSource Key: FLINK-13052 URL: https://issues.apache.org/jira/browse/FLINK-13052 Project: Flink Issue Type: Improvement Affects Versions: 1.8.0 Reporter: chaiyongqiang -- This message was sent by Atlassian JIRA (v7.6.3#76005)
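The issue carries no description yet. For context, the DataStream API consumer already accepts a list of topics, which is roughly the capability the Table API factory would need to expose; a minimal sketch follows (topic names, group id and broker address are placeholders):
{code:java}
// The DataStream Kafka consumer can already subscribe to several topics at once.
Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");
props.setProperty("group.id", "demo-group");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.addSource(
		new FlinkKafkaConsumer<>(
				Arrays.asList("topic-a", "topic-b"),
				new SimpleStringSchema(),
				props));
{code}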
[jira] [Commented] (FLINK-12979) Sending data to kafka with CsvRowSerializationSchema always adding a "\n", "\r","\r\n" at the end of the message
[ https://issues.apache.org/jira/browse/FLINK-12979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16876735#comment-16876735 ] chaiyongqiang commented on FLINK-12979: --- When I check com.fasterxml.jackson.dataformat.csv.CsvSchema, there is no limit on setLineSeparator; the default is '\n'. So for Flink we need to be able to set the line separator to ''. Even if we introduced a custom property, we would still have to set the line separator to '' when handling that property, so simply allowing '' as a value is a simple but effective way to solve this. In the following check we could let the user set ''; the result is the same.
{code:java}
if (!delimiter.equals("\n") && !delimiter.equals("\r") && !delimiter.equals("\r\n")) {
	throw new IllegalArgumentException(
		"Unsupported new line delimiter. Only \\n, \\r, or \\r\\n are supported.");
}
{code}
Do you think opening up the limit and allowing '' is OK?
> Sending data to kafka with CsvRowSerializationSchema always adding a "\n", "\r","\r\n" at the end of the message
>
> Key: FLINK-12979
> URL: https://issues.apache.org/jira/browse/FLINK-12979
> Project: Flink
> Issue Type: Bug
> Components: API / Type Serialization System
> Affects Versions: 1.8.0, 1.8.1, 1.8.2, 1.9.0, 2.0.0
> Reporter: chaiyongqiang
> Assignee: Hugo Louro
> Priority: Major
>
> When sending data to Kafka using CsvRowSerializationSchema, the CsvRowSerializationSchema#serialize method generates the value of the Kafka record; it calls CsvEncoder#endRow, which appends _cfgLineSeparator to the end of the record value.
> However, when calling the method setLineDelimiter on CsvRowSerializationSchema#Builder, only "\n", "\r" or "\r\n" may be passed as the parameter.
> That is not friendly when you want to send the message "123,pingpong,21:00" to Kafka but Kafka receives "123,pingpong,21:00\r\n".
> I am not sure about the reason for limiting the lineDelimiter to "\n", "\r", "\r\n"; in previous versions and in jackson-databind there is no such limit.
> At the very least it should let the application developer set the line delimiter to "".
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
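A possible relaxation of that check, sketched only; the method body around the if-statement is an approximation of what CsvRowSerializationSchema.Builder#setLineDelimiter looks like, not a verbatim copy:
{code:java}
// Sketch: additionally accept the empty string so that no trailing separator is appended.
public Builder setLineDelimiter(String delimiter) {
	Preconditions.checkNotNull(delimiter, "Delimiter must not be null.");
	if (!delimiter.equals("\n") && !delimiter.equals("\r")
			&& !delimiter.equals("\r\n") && !delimiter.equals("")) {
		throw new IllegalArgumentException(
			"Unsupported new line delimiter. Only \\n, \\r, \\r\\n, or the empty string are supported.");
	}
	this.csvSchema = this.csvSchema.rebuild().setLineSeparator(delimiter).build();
	return this;
}
{code}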
[jira] [Created] (FLINK-12979) Sending data to kafka with CsvRowSerializationSchema always adding a "\n", "\r","\r\n" at the end of the message
chaiyongqiang created FLINK-12979: - Summary: Sending data to kafka with CsvRowSerializationSchema always adding a "\n", "\r","\r\n" at the end of the message Key: FLINK-12979 URL: https://issues.apache.org/jira/browse/FLINK-12979 Project: Flink Issue Type: Bug Components: API / Type Serialization System Affects Versions: 1.8.0, 1.8.1, 1.8.2, 1.9.0, 2.0.0 Reporter: chaiyongqiang
When sending data to Kafka using CsvRowSerializationSchema, the CsvRowSerializationSchema#serialize method generates the value of the Kafka record; it calls CsvEncoder#endRow, which appends _cfgLineSeparator to the end of the record value.
However, when calling the method setLineDelimiter on CsvRowSerializationSchema#Builder, only "\n", "\r" or "\r\n" may be passed as the parameter.
That is not friendly when you want to send the message "123,pingpong,21:00" to Kafka but Kafka receives "123,pingpong,21:00\r\n".
I am not sure about the reason for limiting the lineDelimiter to "\n", "\r", "\r\n"; in previous versions and in jackson-databind there is no such limit.
At the very least it should let the application developer set the line delimiter to "".
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
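To illustrate the point that the underlying Jackson CSV library does not restrict the separator, here is a small stand-alone sketch; the column names and row values are arbitrary and only meant to mirror the example message above:
{code:java}
// Jackson itself accepts any line separator, including the empty string,
// so the restriction exists only in Flink's builder.
CsvMapper mapper = new CsvMapper();
CsvSchema schema = CsvSchema.builder()
		.addColumn("id")
		.addColumn("game")
		.addColumn("time")
		.setLineSeparator("")   // no trailing "\n"/"\r\n" appended
		.build();

Map<String, String> row = new LinkedHashMap<>();
row.put("id", "123");
row.put("game", "pingpong");
row.put("time", "21:00");

String line = mapper.writer(schema).writeValueAsString(row);
// expected: "123,pingpong,21:00" with no line break at the end
{code}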
[jira] [Commented] (FLINK-12622) The process function compiles error in MyProcessWindowFunction in windows.md
[ https://issues.apache.org/jira/browse/FLINK-12622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16848003#comment-16848003 ] chaiyongqiang commented on FLINK-12622: --- [https://github.com/apache/flink/pull/8540/commits]
> The process function compiles error in MyProcessWindowFunction in windows.md
>
> Key: FLINK-12622
> URL: https://issues.apache.org/jira/browse/FLINK-12622
> Project: Flink
> Issue Type: Bug
> Components: Documentation
> Affects Versions: 1.8.0
> Reporter: chaiyongqiang
> Priority: Minor
>
> The process function is defined as below in windows.md:
> {quote}class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {
> def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]): () = {
> var count = 0L
> for (in <- input) {
> count = count + 1
> }
> out.collect(s"Window ${context.window} count: $count")
> }
> }{quote}
> The process function declared in ProcessWindowFunction has a return value of Unit, but the override in MyProcessWindowFunction does not match it.
> When compiling MyProcessWindowFunction, an error like the following occurs:
> {quote}Error:(37, 109) '=>' expected but '=' found.
> def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]) : () = {{quote}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12622) The process function compiles error in MyProcessWindowFunction in windows.md
chaiyongqiang created FLINK-12622: - Summary: The process function compiles error in MyProcessWindowFunction in windows.md Key: FLINK-12622 URL: https://issues.apache.org/jira/browse/FLINK-12622 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.8.0 Reporter: chaiyongqiang
The process function is defined as below in windows.md:
{quote}class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {
def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]): () = {
var count = 0L
for (in <- input) {
count = count + 1
}
out.collect(s"Window ${context.window} count: $count")
}
}{quote}
The process function declared in ProcessWindowFunction has a return value of Unit, but the override in MyProcessWindowFunction does not match it.
When compiling MyProcessWindowFunction, an error like the following occurs:
{quote}Error:(37, 109) '=>' expected but '=' found.
def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]) : () = {{quote}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11926) The Description of Assigners allowing a fixed amount of lateness is not correct
[ https://issues.apache.org/jira/browse/FLINK-11926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16795518#comment-16795518 ] chaiyongqiang commented on FLINK-11926: --- I'm confused. Let's consider the simpler situation without a window. As in the following image, when the watermark is W(11), will the later messages with timestamps 14, 17, 12, ... all be dropped? In my opinion, only the messages with a timestamp less than 11 should be dropped. !screenshot-1.png!
> The Description of Assigners allowing a fixed amount of lateness is not correct
> ---
>
> Key: FLINK-11926
> URL: https://issues.apache.org/jira/browse/FLINK-11926
> Project: Flink
> Issue Type: Bug
> Components: Documentation
> Affects Versions: 1.7.2
> Reporter: chaiyongqiang
> Priority: Minor
> Attachments: screenshot-1.png
>
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
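For reference, the documentation passage under discussion concerns the allowed-lateness setting of event-time windows. The sketch below only shows how it is configured (window size, lateness value and key field are arbitrary), with the dropping rule as I understand it stated in the comments; treat it as an illustration, not as the authoritative wording the documentation should use:
{code:java}
// input: a DataStream<Tuple2<String, Long>> that already has event-time
// timestamps and watermarks assigned.
// As far as I understand the window operator, an element is dropped (or routed
// to the side output below) only when the watermark has already passed
// windowEnd + allowedLateness for every window the element belongs to; an
// element whose timestamp is ahead of the current watermark is never late.
final OutputTag<Tuple2<String, Long>> lateTag = new OutputTag<Tuple2<String, Long>>("late-data") {};

SingleOutputStreamOperator<Tuple2<String, Long>> counts = input
		.keyBy(0)
		.window(TumblingEventTimeWindows.of(Time.seconds(10)))
		.allowedLateness(Time.seconds(5))
		.sideOutputLateData(lateTag)
		.sum(1);

DataStream<Tuple2<String, Long>> lateData = counts.getSideOutput(lateTag);
{code}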
[jira] [Updated] (FLINK-11926) The Description of Assigners allowing a fixed amount of lateness is not correct
[ https://issues.apache.org/jira/browse/FLINK-11926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] chaiyongqiang updated FLINK-11926: -- Attachment: screenshot-1.png > The Description of Assigners allowing a fixed amount of lateness is not > correct > --- > > Key: FLINK-11926 > URL: https://issues.apache.org/jira/browse/FLINK-11926 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.7.2 >Reporter: chaiyongqiang >Priority: Minor > Attachments: screenshot-1.png > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11926) The Description of Assigners allowing a fixed amount of lateness is not correct
[ https://issues.apache.org/jira/browse/FLINK-11926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16794104#comment-16794104 ] chaiyongqiang commented on FLINK-11926: --- Let's suppose we have a watermark with timestamp 100 (t_w), and we now get an event with a timestamp of 105 (t). The lateness is 105 - 100 > 0, so should the new event be dropped?
> The Description of Assigners allowing a fixed amount of lateness is not correct
> ---
>
> Key: FLINK-11926
> URL: https://issues.apache.org/jira/browse/FLINK-11926
> Project: Flink
> Issue Type: Bug
> Components: Documentation
> Affects Versions: 1.7.2
> Reporter: chaiyongqiang
> Priority: Minor
>
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11926) The Description of Assigners allowing a fixed amount of lateness is not correct
chaiyongqiang created FLINK-11926: - Summary: The Description of Assigners allowing a fixed amount of lateness is not correct Key: FLINK-11926 URL: https://issues.apache.org/jira/browse/FLINK-11926 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.7.2 Reporter: chaiyongqiang -- This message was sent by Atlassian JIRA (v7.6.3#76005)