[jira] [Commented] (FLINK-17874) Writing to hbase throws NPE

2020-05-21 Thread chaiyongqiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17113747#comment-17113747
 ] 

chaiyongqiang commented on FLINK-17874:
---

When writing a field to HBase, we serialize it to bytes. For now, 
*_HBaseTypeUtils.serializeFromObject_* only checks for NULL for the STRING and 
BYTE types; for other types, an NPE is thrown.

I think we need a null check at the beginning of 
*_HBaseTypeUtils.serializeFromObject_*.
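
A minimal sketch of the proposed guard (the signature, the EMPTY_BYTES constant, 
and the type indices below are assumptions for illustration, not the module's 
actual mapping; Bytes is org.apache.hadoop.hbase.util.Bytes):
{code:java}
public static byte[] serializeFromObject(Object value, int typeIdx, Charset charset) {
	// Proposed: check NULL up front for every type, not only STRING and BYTE.
	if (value == null) {
		return EMPTY_BYTES; // assumed constant, e.g. new byte[0]
	}
	switch (typeIdx) {
		case 0: // String (illustrative index)
			return ((String) value).getBytes(charset);
		case 1: // byte[] (illustrative index)
			return (byte[]) value;
		case 2: // Long (illustrative index)
			return Bytes.toBytes((Long) value);
		// ... remaining types unchanged ...
		default:
			throw new IllegalArgumentException("unsupported type index: " + typeIdx);
	}
}
{code}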

> Writing to hbase throws NPE
> ---
>
> Key: FLINK-17874
> URL: https://issues.apache.org/jira/browse/FLINK-17874
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HBase
>Affects Versions: 1.10.0
>Reporter: chaiyongqiang
>Priority: Major
> Attachments: NPE.png
>
>
> Writing a table to HBase throws an NPE when a field is NULL; we need to handle it.
> Please refer to NPE.png for the detailed stack. !NPE.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17874) Writing to hbase throws NPE

2020-05-21 Thread chaiyongqiang (Jira)
chaiyongqiang created FLINK-17874:
-

 Summary: Writing to hbase throws NPE
 Key: FLINK-17874
 URL: https://issues.apache.org/jira/browse/FLINK-17874
 Project: Flink
  Issue Type: Bug
  Components: Connectors / HBase
Affects Versions: 1.10.0
Reporter: chaiyongqiang
 Attachments: NPE.png

Writing a table to HBase throws an NPE when a field is NULL; we need to handle it.

Please refer to NPE.png for the detailed stack. !NPE.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16626) Exception encountered when cancelling a job in yarn per-job mode

2020-03-17 Thread chaiyongqiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17061326#comment-17061326
 ] 

chaiyongqiang commented on FLINK-16626:
---

Maybe you're right. As far as I can see, *flink stop* is going to be removed; 
only *flink cancel* comes into this logic. There could also be other situations 
I haven't noticed.

> Exception encountered when cancelling a job in yarn per-job mode
> 
>
> Key: FLINK-16626
> URL: https://issues.apache.org/jira/browse/FLINK-16626
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.10.0
>Reporter: chaiyongqiang
>Priority: Major
>
> When executing the following command to stop a Flink job in YARN per-job 
> mode, the client keeps retrying until it times out (1 minute) and exits with 
> failure, but the job stops successfully.
>  Command :
> {noformat}
> flink cancel $jobId yid appId
> {noformat}
>  The exception on the client side is :
> {quote}
> 2020-03-17 12:32:13,709 DEBUG org.apache.flink.runtime.rest.RestClient - 
> Sending request of class class 
> org.apache.flink.runtime.rest.messages.EmptyRequestBody to 
> ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
> ...
> 2020-03-17 12:33:11,065 DEBUG org.apache.flink.runtime.rest.RestClient - 
> Sending request of class class 
> org.apache.flink.runtime.rest.messages.EmptyRequestBody to 
> ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
> 2020-03-17 12:33:14,070 DEBUG org.apache.flink.runtime.rest.RestClient - 
> Shutting down rest endpoint.
> 2020-03-17 12:33:14,070 DEBUG org.apache.flink.runtime.rest.RestClient - 
> Sending request of class class 
> org.apache.flink.runtime.rest.messages.EmptyRequestBody to 
> ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
> 2020-03-17 12:33:14,077 DEBUG 
> org.apache.flink.shaded.netty4.io.netty.buffer.PoolThreadCache - Freed 2 
> thread-local buffer(s) from thread: flink-rest-client-netty-thread-1
> 2020-03-17 12:33:14,080 DEBUG org.apache.flink.runtime.rest.RestClient - Rest 
> endpoint shutdown complete.
> 2020-03-17 12:33:14,083 ERROR org.apache.flink.client.cli.CliFrontend - Error 
> while running the command.
> org.apache.flink.util.FlinkException: Could not cancel job 
> cc61033484d4c0e7a27a8a2a36f4de7a.
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
>  at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>  at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
>  at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904)
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>  at java.security.AccessController.doPrivileged(Native Method)
>  at javax.security.auth.Subject.doAs(Subject.java:422)
>  at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>  at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.TimeoutException
>  at 
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
>  ... 9 more
> 
>  The program finished with the following exception:
> org.apache.flink.util.FlinkException: Could not cancel job 
> cc61033484d4c0e7a27a8a2a36f4de7a.
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
>  at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>  at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
>  at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904)
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>  at java.security.AccessController.doPrivileged(Native Method)
>  at javax.security.auth.Subject.doAs(Subject.java:422)
>  at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>  at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.TimeoutException
>  at 
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>  at 
> org.apache.flink.client.cli.CliF

[jira] [Commented] (FLINK-16637) Flink per job mode terminates before serving job cancellation result

2020-03-17 Thread chaiyongqiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17061323#comment-17061323
 ] 

chaiyongqiang commented on FLINK-16637:
---

Hi [~tison], I see this problem too. I created another issue, 
[FLINK-16626|https://issues.apache.org/jira/browse/FLINK-16626], and gave some 
explanation for it there. Do you have a plan to fix this? 

> Flink per job mode terminates before serving job cancellation result
> 
>
> Key: FLINK-16637
> URL: https://issues.apache.org/jira/browse/FLINK-16637
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.10.0
>Reporter: Zili Chen
>Priority: Major
> Fix For: 1.10.1, 1.11.0
>
> Attachments: yarn.log
>
>
> The {{MiniDispatcher}} no longer waits until the REST handler has served the 
> cancellation result before shutting down. This behaviour seems to have been 
> introduced with FLINK-15116.
> See also 
> [https://lists.apache.org/x/thread.html/rcadbd6ceede422bac8d4483fd0cdae58659fbff78533a399eb136743@%3Cdev.flink.apache.org%3E]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16626) Exception encountered when cancelling a job in yarn per-job mode

2020-03-16 Thread chaiyongqiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chaiyongqiang updated FLINK-16626:
--
Summary: Exception encountered when cancelling a job in yarn per-job mode  
(was: Exception ncountered when cancelling a job in yarn per-job mode)

> Exception encountered when cancelling a job in yarn per-job mode
> 
>
> Key: FLINK-16626
> URL: https://issues.apache.org/jira/browse/FLINK-16626
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.10.0
>Reporter: chaiyongqiang
>Priority: Major
>
> When executing the following command to stop a Flink job in YARN per-job 
> mode, the client keeps retrying until it times out (1 minute) and exits with 
> failure, but the job stops successfully.
>  Command :
> {noformat}
> flink cancel $jobId yid appId
> {noformat}
>  The exception on the client side is :
> {quote}
> 2020-03-17 12:32:13,709 DEBUG org.apache.flink.runtime.rest.RestClient - 
> Sending request of class class 
> org.apache.flink.runtime.rest.messages.EmptyRequestBody to 
> ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
> ...
> 2020-03-17 12:33:11,065 DEBUG org.apache.flink.runtime.rest.RestClient - 
> Sending request of class class 
> org.apache.flink.runtime.rest.messages.EmptyRequestBody to 
> ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
> 2020-03-17 12:33:14,070 DEBUG org.apache.flink.runtime.rest.RestClient - 
> Shutting down rest endpoint.
> 2020-03-17 12:33:14,070 DEBUG org.apache.flink.runtime.rest.RestClient - 
> Sending request of class class 
> org.apache.flink.runtime.rest.messages.EmptyRequestBody to 
> ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
> 2020-03-17 12:33:14,077 DEBUG 
> org.apache.flink.shaded.netty4.io.netty.buffer.PoolThreadCache - Freed 2 
> thread-local buffer(s) from thread: flink-rest-client-netty-thread-1
> 2020-03-17 12:33:14,080 DEBUG org.apache.flink.runtime.rest.RestClient - Rest 
> endpoint shutdown complete.
> 2020-03-17 12:33:14,083 ERROR org.apache.flink.client.cli.CliFrontend - Error 
> while running the command.
> org.apache.flink.util.FlinkException: Could not cancel job 
> cc61033484d4c0e7a27a8a2a36f4de7a.
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
>  at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>  at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
>  at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904)
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>  at java.security.AccessController.doPrivileged(Native Method)
>  at javax.security.auth.Subject.doAs(Subject.java:422)
>  at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>  at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.TimeoutException
>  at 
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
>  ... 9 more
> 
>  The program finished with the following exception:
> org.apache.flink.util.FlinkException: Could not cancel job 
> cc61033484d4c0e7a27a8a2a36f4de7a.
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
>  at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>  at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
>  at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904)
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>  at java.security.AccessController.doPrivileged(Native Method)
>  at javax.security.auth.Subject.doAs(Subject.java:422)
>  at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>  at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.TimeoutException
>  at 
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
>  ... 9 more
> {quote}
> Actually, the j

[jira] [Updated] (FLINK-16626) Exception ncountered when cancelling a job in yarn per-job mode

2020-03-16 Thread chaiyongqiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chaiyongqiang updated FLINK-16626:
--
Summary: Exception ncountered when cancelling a job in yarn per-job mode  
(was: Exception Encountered when cancelling a job in yarn per-job mode)

> Exception ncountered when cancelling a job in yarn per-job mode
> ---
>
> Key: FLINK-16626
> URL: https://issues.apache.org/jira/browse/FLINK-16626
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.10.0
>Reporter: chaiyongqiang
>Priority: Major
>
> When executing the following command to stop a Flink job in YARN per-job 
> mode, the client keeps retrying until it times out (1 minute) and exits with 
> failure, but the job stops successfully.
>  Command :
> {noformat}
> flink cancel $jobId yid appId
> {noformat}
>  The exception on the client side is :
> {quote}
> 2020-03-17 12:32:13,709 DEBUG org.apache.flink.runtime.rest.RestClient - 
> Sending request of class class 
> org.apache.flink.runtime.rest.messages.EmptyRequestBody to 
> ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
> ...
> 2020-03-17 12:33:11,065 DEBUG org.apache.flink.runtime.rest.RestClient - 
> Sending request of class class 
> org.apache.flink.runtime.rest.messages.EmptyRequestBody to 
> ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
> 2020-03-17 12:33:14,070 DEBUG org.apache.flink.runtime.rest.RestClient - 
> Shutting down rest endpoint.
> 2020-03-17 12:33:14,070 DEBUG org.apache.flink.runtime.rest.RestClient - 
> Sending request of class class 
> org.apache.flink.runtime.rest.messages.EmptyRequestBody to 
> ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
> 2020-03-17 12:33:14,077 DEBUG 
> org.apache.flink.shaded.netty4.io.netty.buffer.PoolThreadCache - Freed 2 
> thread-local buffer(s) from thread: flink-rest-client-netty-thread-1
> 2020-03-17 12:33:14,080 DEBUG org.apache.flink.runtime.rest.RestClient - Rest 
> endpoint shutdown complete.
> 2020-03-17 12:33:14,083 ERROR org.apache.flink.client.cli.CliFrontend - Error 
> while running the command.
> org.apache.flink.util.FlinkException: Could not cancel job 
> cc61033484d4c0e7a27a8a2a36f4de7a.
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
>  at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>  at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
>  at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904)
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>  at java.security.AccessController.doPrivileged(Native Method)
>  at javax.security.auth.Subject.doAs(Subject.java:422)
>  at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>  at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.TimeoutException
>  at 
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
>  ... 9 more
> 
>  The program finished with the following exception:
> org.apache.flink.util.FlinkException: Could not cancel job 
> cc61033484d4c0e7a27a8a2a36f4de7a.
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
>  at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>  at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
>  at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904)
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>  at java.security.AccessController.doPrivileged(Native Method)
>  at javax.security.auth.Subject.doAs(Subject.java:422)
>  at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>  at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.TimeoutException
>  at 
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
>  ... 9 more
> {quote}
> Actually, the job

[jira] [Comment Edited] (FLINK-16626) Exception Encountered when cancelling a job in yarn per-job mode

2020-03-16 Thread chaiyongqiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17060639#comment-17060639
 ] 

chaiyongqiang edited comment on FLINK-16626 at 3/17/20, 5:12 AM:
-

In my opinion, when the client issues a cancel command, the server runs the 
following logic:
 # call the JobCancellationHandler to handle the request
 # respond to the client

Code as follows:
{code:java}
@Override
protected CompletableFuture<Void> respondToRequest(ChannelHandlerContext ctx, 
		HttpRequest httpRequest, HandlerRequest<R, M> handlerRequest, T gateway) {
	CompletableFuture<P> response;

	try {
		response = handleRequest(handlerRequest, gateway);
	} catch (RestHandlerException e) {
		response = FutureUtils.completedExceptionally(e);
	}

	return response.thenAccept(resp -> HandlerUtils.sendResponse(ctx, 
		httpRequest, resp, messageHeaders.getResponseStatusCode(), responseHeaders));
}
{code}
But in the handleRequest method of `JobCancellationHandler`, the cluster is 
stopped, which shuts down the RestServerEndpoint and with it the EventLoopGroup.

Then, when `HandlerUtils.sendResponse` is called to respond to the client, an 
exception occurs.
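
To make the ordering problem concrete, here is a small standalone sketch (the 
class and method shapes are assumptions for illustration, not Flink's actual 
code): if handling the request shuts the server down as a side effect, the 
callback that sends the response can be rejected by the already-terminated 
event loop, so shutdown should instead be gated on the response future.
{code:java}
import java.util.concurrent.CompletableFuture;

class CancellationFlow {
	// Problematic ordering: if handling the request triggers shutdown
	// synchronously, this callback may run after the event loop has terminated.
	CompletableFuture<Void> respond(CompletableFuture<String> handlerResult) {
		return handlerResult.thenAccept(this::send);
	}

	// Safer ordering: trigger shutdown only after the response has been sent.
	CompletableFuture<Void> respondThenShutdown(
			CompletableFuture<String> handlerResult, Runnable shutdown) {
		return handlerResult
			.thenAccept(this::send)
			.whenComplete((ignored, error) -> shutdown.run());
	}

	private void send(String response) {
		// stand-in for HandlerUtils.sendResponse(...)
	}
}
{code}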

 


was (Author: chaiyq):
In my opinion, when the client issues a cancel command, the server runs the 
following logic:
 # call the JobCancellationHandler to handle the request
 # respond to the client

Code as follows:
{quote}@Override
protected CompletableFuture<Void> respondToRequest(ChannelHandlerContext ctx, 
HttpRequest httpRequest, HandlerRequest<R, M> handlerRequest, T gateway) {
 CompletableFuture<P> response;

 try {
 response = handleRequest(handlerRequest, gateway);
 } catch (RestHandlerException e) {
 response = FutureUtils.completedExceptionally(e);
 }

 return response.thenAccept(resp -> HandlerUtils.sendResponse(ctx, httpRequest, 
resp, messageHeaders.getResponseStatusCode(), responseHeaders));
}{quote}
But in the handleRequest method of `JobCancellationHandler`, the cluster is 
stopped, which shuts down the RestServerEndpoint and with it the EventLoopGroup.

Then, when `HandlerUtils.sendResponse` is called to respond to the client, an 
exception occurs.

 


 

> Exception Encountered when cancelling a job in yarn per-job mode
> 
>
> Key: FLINK-16626
> URL: https://issues.apache.org/jira/browse/FLINK-16626
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.10.0
>Reporter: chaiyongqiang
>Priority: Major
>
> When executing the following command to stop a Flink job in YARN per-job 
> mode, the client keeps retrying until it times out (1 minute) and exits with 
> failure, but the job stops successfully.
>  Command :
> {noformat}
> flink cancel $jobId yid appId
> {noformat}
>  The exception on the client side is :
> {quote}
> 2020-03-17 12:32:13,709 DEBUG org.apache.flink.runtime.rest.RestClient - 
> Sending request of class class 
> org.apache.flink.runtime.rest.messages.EmptyRequestBody to 
> ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
> ...
> 2020-03-17 12:33:11,065 DEBUG org.apache.flink.runtime.rest.RestClient - 
> Sending request of class class 
> org.apache.flink.runtime.rest.messages.EmptyRequestBody to 
> ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
> 2020-03-17 12:33:14,070 DEBUG org.apache.flink.runtime.rest.RestClient - 
> Shutting down rest endpoint.
> 2020-03-17 12:33:14,070 DEBUG org.apache.flink.runtime.rest.RestClient - 
> Sending request of class class 
> org.apache.flink.runtime.rest.messages.EmptyRequestBody to 
> ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
> 2020-03-17 12:33:14,077 DEBUG 
> org.apache.flink.shaded.netty4.io.netty.buffer.PoolThreadCache - Freed 2 
> thread-local buffer(s) from thread: flink-rest-client-netty-thread-1
> 2020-03-17 12:33:14,080 DEBUG org.apache.flink.runtime.rest.RestClient - Rest 
> endpoint shutdown complete.
> 2020-03-17 12:33:14,083 ERROR org.apache.flink.client.cli.CliFrontend - Error 
> while running the command.
> org.apache.flink.util.FlinkException: Could not cancel job 
> cc61033484d4c0e7a27a8a2a36f4de7a.
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
>  at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>  at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
>  at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904)
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>  at java.security.AccessController.doPrivileged(Native Method)
>  at javax.security.auth.Subject.doAs(Subject.jav

[jira] [Commented] (FLINK-16626) Exception Encountered when cancelling a job in yarn per-job mode

2020-03-16 Thread chaiyongqiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17060639#comment-17060639
 ] 

chaiyongqiang commented on FLINK-16626:
---

In my opinion, when the client issues a cancel command, the server runs the 
following logic:
 # call the JobCancellationHandler to handle the request
 # respond to the client

Code as follows:
{quote}@Override
protected CompletableFuture<Void> respondToRequest(ChannelHandlerContext ctx, 
HttpRequest httpRequest, HandlerRequest<R, M> handlerRequest, T gateway) {
 CompletableFuture<P> response;

 try {
 response = handleRequest(handlerRequest, gateway);
 } catch (RestHandlerException e) {
 response = FutureUtils.completedExceptionally(e);
 }

 return response.thenAccept(resp -> HandlerUtils.sendResponse(ctx, httpRequest, 
resp, messageHeaders.getResponseStatusCode(), responseHeaders));
}{quote}
But in the handleRequest method of `JobCancellationHandler`, the cluster is 
stopped, which shuts down the RestServerEndpoint and with it the EventLoopGroup.

Then, when `HandlerUtils.sendResponse` is called to respond to the client, an 
exception occurs.

 


 

> Exception Encountered when cancelling a job in yarn per-job mode
> 
>
> Key: FLINK-16626
> URL: https://issues.apache.org/jira/browse/FLINK-16626
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.10.0
>Reporter: chaiyongqiang
>Priority: Major
>
> When executing the following command to stop a Flink job in YARN per-job 
> mode, the client keeps retrying until it times out (1 minute) and exits with 
> failure, but the job stops successfully.
>  Command :
> {noformat}
> flink cancel $jobId yid appId
> {noformat}
>  The exception on the client side is :
> {quote}
> 2020-03-17 12:32:13,709 DEBUG org.apache.flink.runtime.rest.RestClient - 
> Sending request of class class 
> org.apache.flink.runtime.rest.messages.EmptyRequestBody to 
> ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
> ...
> 2020-03-17 12:33:11,065 DEBUG org.apache.flink.runtime.rest.RestClient - 
> Sending request of class class 
> org.apache.flink.runtime.rest.messages.EmptyRequestBody to 
> ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
> 2020-03-17 12:33:14,070 DEBUG org.apache.flink.runtime.rest.RestClient - 
> Shutting down rest endpoint.
> 2020-03-17 12:33:14,070 DEBUG org.apache.flink.runtime.rest.RestClient - 
> Sending request of class class 
> org.apache.flink.runtime.rest.messages.EmptyRequestBody to 
> ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
> 2020-03-17 12:33:14,077 DEBUG 
> org.apache.flink.shaded.netty4.io.netty.buffer.PoolThreadCache - Freed 2 
> thread-local buffer(s) from thread: flink-rest-client-netty-thread-1
> 2020-03-17 12:33:14,080 DEBUG org.apache.flink.runtime.rest.RestClient - Rest 
> endpoint shutdown complete.
> 2020-03-17 12:33:14,083 ERROR org.apache.flink.client.cli.CliFrontend - Error 
> while running the command.
> org.apache.flink.util.FlinkException: Could not cancel job 
> cc61033484d4c0e7a27a8a2a36f4de7a.
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
>  at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>  at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
>  at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904)
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>  at java.security.AccessController.doPrivileged(Native Method)
>  at javax.security.auth.Subject.doAs(Subject.java:422)
>  at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>  at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.TimeoutException
>  at 
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
>  ... 9 more
> 
>  The program finished with the following exception:
> org.apache.flink.util.FlinkException: Could not cancel job 
> cc61033484d4c0e7a27a8a2a36f4de7a.
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
>  at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>  at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
>  at 
> org.apache.fli

[jira] [Updated] (FLINK-16626) Exception Encountered when cancelling a job in yarn per-job mode

2020-03-16 Thread chaiyongqiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chaiyongqiang updated FLINK-16626:
--
Description: 
When executing the following command to stop a Flink job in YARN per-job 
mode, the client keeps retrying until it times out (1 minute) and exits with 
failure, but the job stops successfully.

 Command :
{noformat}
flink cancel $jobId yid appId
{noformat}
 The exception on the client side is :
{quote}

2020-03-17 12:32:13,709 DEBUG org.apache.flink.runtime.rest.RestClient - 
Sending request of class class 
org.apache.flink.runtime.rest.messages.EmptyRequestBody to 
ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
...
2020-03-17 12:33:11,065 DEBUG org.apache.flink.runtime.rest.RestClient - 
Sending request of class class 
org.apache.flink.runtime.rest.messages.EmptyRequestBody to 
ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
2020-03-17 12:33:14,070 DEBUG org.apache.flink.runtime.rest.RestClient - 
Shutting down rest endpoint.
2020-03-17 12:33:14,070 DEBUG org.apache.flink.runtime.rest.RestClient - 
Sending request of class class 
org.apache.flink.runtime.rest.messages.EmptyRequestBody to 
ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
2020-03-17 12:33:14,077 DEBUG 
org.apache.flink.shaded.netty4.io.netty.buffer.PoolThreadCache - Freed 2 
thread-local buffer(s) from thread: flink-rest-client-netty-thread-1
2020-03-17 12:33:14,080 DEBUG org.apache.flink.runtime.rest.RestClient - Rest 
endpoint shutdown complete.
2020-03-17 12:33:14,083 ERROR org.apache.flink.client.cli.CliFrontend - Error 
while running the command.
org.apache.flink.util.FlinkException: Could not cancel job 
cc61033484d4c0e7a27a8a2a36f4de7a.
 at 
org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
 at 
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
 at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
 at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904)
 at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
 at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.util.concurrent.TimeoutException
 at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
 at 
org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
 ... 9 more


 The program finished with the following exception:

org.apache.flink.util.FlinkException: Could not cancel job 
cc61033484d4c0e7a27a8a2a36f4de7a.
 at 
org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
 at 
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
 at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
 at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904)
 at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
 at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.util.concurrent.TimeoutException
 at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
 at 
org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
 ... 9 more
{quote}
Actually, the job was cancelled, but the server also prints an exception:

 
{quote}2020-03-17 12:25:13,754 ERROR [flink-akka.actor.default-dispatcher-17] 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:766)
 - Failed to submit a listener notification task. Event loop shut down? 
java.util.concurrent.RejectedExecutionException: event executor terminated at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:855)
 at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:340)
 at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExec

[jira] [Created] (FLINK-16626) Exception Encountered when cancelling a job in yarn per-job mode

2020-03-16 Thread chaiyongqiang (Jira)
chaiyongqiang created FLINK-16626:
-

 Summary: Exception Encountered when cancelling a job in yarn 
per-job mode
 Key: FLINK-16626
 URL: https://issues.apache.org/jira/browse/FLINK-16626
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN
Affects Versions: 1.10.0
Reporter: chaiyongqiang


When executing the following command to stop a Flink job in YARN per-job 
mode, the client keeps retrying until it times out (1 minute) and exits with 
failure, but the job stops successfully.

 Command :
{noformat}
flink cancel $jobId yid appId
{noformat}
 The exception on the client side is :
{quote}

2020-03-17 12:32:13,709 DEBUG org.apache.flink.runtime.rest.RestClient - 
Sending request of class class 
org.apache.flink.runtime.rest.messages.EmptyRequestBody to 
ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
2020-03-17 12:32:13,749 DEBUG 
org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector - 
-Dorg.apache.flink.shaded.netty4.io.netty.leakDetection.level: simple
2020-03-17 12:32:13,749 DEBUG 
org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector - 
-Dorg.apache.flink.shaded.netty4.io.netty.leakDetection.targetRecords: 4
2020-03-17 12:32:13,759 DEBUG 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf - 
-Dorg.apache.flink.shaded.netty4.io.netty.buffer.checkAccessible: true
2020-03-17 12:32:13,759 DEBUG 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf - 
-Dorg.apache.flink.shaded.netty4.io.netty.buffer.checkBounds: true
2020-03-17 12:32:13,760 DEBUG 
org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetectorFactory - 
Loaded default ResourceLeakDetector: 
org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector@7cb68452
2020-03-17 12:32:13,786 DEBUG 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelId - 
-Dio.netty.processId: 156764 (auto-detected)
2020-03-17 12:32:13,788 DEBUG 
org.apache.flink.shaded.netty4.io.netty.util.NetUtil - 
-Djava.net.preferIPv4Stack: false
2020-03-17 12:32:13,788 DEBUG 
org.apache.flink.shaded.netty4.io.netty.util.NetUtil - 
-Djava.net.preferIPv6Addresses: false
2020-03-17 12:32:13,791 DEBUG 
org.apache.flink.shaded.netty4.io.netty.util.NetUtil - Loopback interface: lo 
(lo, 0:0:0:0:0:0:0:1%lo)
2020-03-17 12:32:13,791 DEBUG 
org.apache.flink.shaded.netty4.io.netty.util.NetUtil - 
/proc/sys/net/core/somaxconn: 128
2020-03-17 12:32:13,793 DEBUG 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelId - 
-Dio.netty.machineId: 02:42:dd:ff:fe:50:e5:91 (auto-detected)
2020-03-17 12:32:13,820 DEBUG 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator - 
-Dio.netty.allocator.numHeapArenas: 16
2020-03-17 12:32:13,821 DEBUG 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator - 
-Dio.netty.allocator.numDirectArenas: 16
2020-03-17 12:32:13,821 DEBUG 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator - 
-Dio.netty.allocator.pageSize: 8192
2020-03-17 12:32:13,821 DEBUG 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator - 
-Dio.netty.allocator.maxOrder: 11
2020-03-17 12:32:13,821 DEBUG 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator - 
-Dio.netty.allocator.chunkSize: 16777216
2020-03-17 12:32:13,821 DEBUG 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator - 
-Dio.netty.allocator.tinyCacheSize: 512
2020-03-17 12:32:13,821 DEBUG 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator - 
-Dio.netty.allocator.smallCacheSize: 256
2020-03-17 12:32:13,821 DEBUG 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator - 
-Dio.netty.allocator.normalCacheSize: 64
2020-03-17 12:32:13,821 DEBUG 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator - 
-Dio.netty.allocator.maxCachedBufferCapacity: 32768
2020-03-17 12:32:13,822 DEBUG 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator - 
-Dio.netty.allocator.cacheTrimInterval: 8192
2020-03-17 12:32:13,822 DEBUG 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator - 
-Dio.netty.allocator.cacheTrimIntervalMillis: 0
2020-03-17 12:32:13,822 DEBUG 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator - 
-Dio.netty.allocator.useCacheForAllThreads: true
2020-03-17 12:32:13,822 DEBUG 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator - 
-Dio.netty.allocator.maxCachedByteBuffersPerChunk: 1023
2020-03-17 12:32:13,830 DEBUG 
org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufUtil - 
-Dio.netty.allocator.type: pooled
2020-03-17 12:32:13,830 DEBUG 
org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufUtil - 
-Dio.netty.threadLocalDirectBufferSize: 0
2020-03-17 12:32:13,830 DEBUG 
org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufUtil - 
-Dio.netty.maxThreadLocalCharBufferSize: 16384

[jira] [Commented] (FLINK-14719) Making Semantic configurable in Flinkkafkaproducer to support exactly-once semantic in Table API

2019-12-12 Thread chaiyongqiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16995396#comment-16995396
 ] 

chaiyongqiang commented on FLINK-14719:
---

[~jark], thanks. I'll handle this myself on the Flink 1.8 branch and will 
create a PR for the development branch. I hope I can catch Flink 1.10, which 
will be released next month. 
Please close this issue. 

> Making  Semantic configurable in Flinkkafkaproducer to support exactly-once 
> semantic in Table API
> -
>
> Key: FLINK-14719
> URL: https://issues.apache.org/jira/browse/FLINK-14719
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.8.0
>Reporter: chaiyongqiang
>Priority: Major
>
> Flink supports Kafka transactions with FlinkKafkaProducer and 
> FlinkKafkaProducer011. When we use the DataStream API, it's possible to 
> achieve exactly-once semantics, but when we use the Table API, things are 
> different. 
> The createKafkaProducer method in KafkaTableSink is used to create the 
> FlinkKafkaProducer that sends messages to the Kafka server. It looks like:
> {code:java}
> protected SinkFunction<Row> createKafkaProducer(
> 		String topic,
> 		Properties properties,
> 		SerializationSchema<Row> serializationSchema,
> 		Optional<FlinkKafkaPartitioner<Row>> partitioner) {
> 	return new FlinkKafkaProducer<>(
> 		topic,
> 		new KeyedSerializationSchemaWrapper<>(serializationSchema),
> 		properties,
> 		partitioner);
> }
> {code}
> When we step into the FlinkKafkaProducer constructor, we can see that this 
> leads to an AT_LEAST_ONCE-semantic producer:
> {code:java}
> public FlinkKafkaProducer(
> 		String defaultTopicId,
> 		KeyedSerializationSchema<IN> serializationSchema,
> 		Properties producerConfig,
> 		Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
> 	this(
> 		defaultTopicId,
> 		serializationSchema,
> 		producerConfig,
> 		customPartitioner,
> 		FlinkKafkaProducer.Semantic.AT_LEAST_ONCE,
> 		DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
> }
> {code}
> This prevents users from achieving exactly-once semantics when using the Table API. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15221) supporting Exactly-once for kafka table APi

2019-12-12 Thread chaiyongqiang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chaiyongqiang updated FLINK-15221:
--
Summary: supporting Exactly-once for kafka table APi  (was: supporting 
Exactly-once for table APi)

> supporting Exactly-once for kafka table APi
> ---
>
> Key: FLINK-15221
> URL: https://issues.apache.org/jira/browse/FLINK-15221
> Project: Flink
>  Issue Type: Wish
>  Components: Table SQL / API
>Reporter: chaiyongqiang
>Priority: Major
>
> The Table API doesn't support end-to-end exactly-once semantics like the 
> DataStream API. Does Flink have a plan for this?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15221) supporting Exactly-once for table APi

2019-12-12 Thread chaiyongqiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16995247#comment-16995247
 ] 

chaiyongqiang commented on FLINK-15221:
---

[~jark], thanks. You're right, but I've checked the KafkaTableSink, and I don't 
think overriding 'KafkaTableSink' and 'KafkaTableSourceSinkFactory' is a 
perfect way to gain end-to-end exactly-once semantics when a user develops an 
app. It's too complex for a Flink app developer and makes them focus too much 
on the details of Flink. I think we'd better expose a semantic configuration 
for app users to set when they want exactly-once semantics. 

 
You can have a look at the following issue I opened. What do you think about it?

[FLINK-14719|https://issues.apache.org/jira/projects/FLINK/issues/FLINK-14719?filter=allopenissues]

> supporting Exactly-once for table APi
> -
>
> Key: FLINK-15221
> URL: https://issues.apache.org/jira/browse/FLINK-15221
> Project: Flink
>  Issue Type: Wish
>  Components: Table SQL / API
>Reporter: chaiyongqiang
>Priority: Major
>
> The Table API doesn't support end-to-end exactly-once semantics like the 
> DataStream API. Does Flink have a plan for this?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15221) supporting Exactly-once for table APi

2019-12-12 Thread chaiyongqiang (Jira)
chaiyongqiang created FLINK-15221:
-

 Summary: supporting Exactly-once for table APi
 Key: FLINK-15221
 URL: https://issues.apache.org/jira/browse/FLINK-15221
 Project: Flink
  Issue Type: Wish
  Components: Table SQL / API
Affects Versions: 1.9.0
Reporter: chaiyongqiang


The Table API doesn't support end-to-end exactly-once semantics like the 
DataStream API. Does Flink have a plan for this?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14871) Better formatter of toString method for StateTransition

2019-11-19 Thread chaiyongqiang (Jira)
chaiyongqiang created FLINK-14871:
-

 Summary: Better formatter of toString method for StateTransition
 Key: FLINK-14871
 URL: https://issues.apache.org/jira/browse/FLINK-14871
 Project: Flink
  Issue Type: Improvement
  Components: Library / CEP
Affects Versions: 1.9.1
Reporter: chaiyongqiang
 Attachments: 屏幕快照 2019-11-20 下午3.27.43.png

The toString method in StateTransition does not have a good format. 
{code:java}
@Override
public String toString() {
	return new StringBuilder()
		.append("StateTransition(")
		.append(action).append(", ")
		.append("from ").append(sourceState.getName())
		.append("to ").append(targetState.getName())
		.append(condition != null ? ", with condition)" : ")")
		.toString();
}
{code}
The problem is that there's no separator between `sourceState.getName()` and 
"to ", which leads to the bad format shown in the attachment.

A blank space is more friendly:
{code:java}
@Override
public String toString() {
	return new StringBuilder()
		.append("StateTransition(")
		.append(action).append(", ")
		.append("from ").append(sourceState.getName())
		.append(" to ").append(targetState.getName())
		.append(condition != null ? ", with condition)" : ")")
		.toString();
}
{code}
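
For illustration, with example states named "start" and "middle" and an 
example action TAKE, the two variants would print roughly as follows 
(hand-derived from the code above):
{noformat}
StateTransition(TAKE, from startto middle)    <- current: names run together
StateTransition(TAKE, from start to middle)   <- with the added space
{noformat}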



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-13052) Supporting multi-topic when using kafkaTableSourceSinkFactoryBase.createStreamTableSource

2019-11-12 Thread chaiyongqiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16972243#comment-16972243
 ] 

chaiyongqiang edited comment on FLINK-13052 at 11/12/19 10:08 AM:
--

There are some limits when we support multiple source topics in 
KafkaTableSourceSinkFactoryBase:

When the startup mode is specific-offsets, it's difficult to assign the start 
offset for each topic partition because of the way the connector configs are 
set. As the following code shows, when we assign several topics such as 
MyTable, MyTable1, MyTable2, it's difficult to tell which topic a partition 
and offset belong to. 

{code:java}
final Map<String, String> props2 = new HashMap<>();
props2.put("connector.property-version", "1");
props2.put("connector.type", "kafka");
props2.put("connector.version", "0.11");
props2.put("connector.topic", "MyTable");
props2.put("connector.startup-mode", "specific-offsets");
props2.put("connector.specific-offsets.0.partition", "0");
props2.put("connector.specific-offsets.0.offset", "42");
props2.put("connector.specific-offsets.1.partition", "1");
{code}

Fortunately, setting a fixed topic-partition start offset when starting an app 
with the Table API is rare. 
My solution is to support multiple topics for the other startup modes 
(GROUP_OFFSETS, EARLIEST, LATEST); for SPECIFIC_OFFSETS mode, when the Table 
API (KafkaTableSourceSinkFactoryBase) is used with several topics, we fall 
back to GROUP_OFFSETS instead. 
Does someone have a better idea? Please let me know. Many thanks.
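
A hypothetical sketch of a topic-qualified key layout that would remove the 
ambiguity (these qualified keys illustrate the idea only; they are not an 
existing Flink connector option):
{code:java}
final Map<String, String> props = new HashMap<>();
props.put("connector.type", "kafka");
props.put("connector.topic", "MyTable,MyTable1,MyTable2"); // hypothetical multi-topic value
props.put("connector.startup-mode", "specific-offsets");
// Hypothetical: qualify each offset entry with the topic it belongs to.
props.put("connector.specific-offsets.MyTable.0.partition", "0");
props.put("connector.specific-offsets.MyTable.0.offset", "42");
props.put("connector.specific-offsets.MyTable1.0.partition", "0");
props.put("connector.specific-offsets.MyTable1.0.offset", "7");
{code}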


was (Author: chaiyq):
There are some limits when we support multiple source topics in 
KafkaTableSourceSinkFactoryBase:

When the startup mode is specific-offsets, it's difficult to assign the start 
offset for each topic partition because of the way the connector configs are 
set. As the following code shows, when we assign several topics such as 
MyTable, MyTable1, MyTable2, it's difficult to tell which topic a partition 
and offset belong to. 

{code:java}
final Map<String, String> props2 = new HashMap<>();
props2.put("connector.property-version", "1");
props2.put("connector.type", "kafka");
props2.put("connector.version", "0.11");
props2.put("connector.topic", "MyTable");
props2.put("connector.startup-mode", "specific-offsets");
props2.put("connector.specific-offsets.0.partition", "0");
props2.put("connector.specific-offsets.0.offset", "42");
props2.put("connector.specific-offsets.1.partition", "1");
{code}

Fortunately, setting a fixed topic-partition start offset when starting an app 
with the Table API is rare. For now, I suggest ignoring this situation. My 
solution is to support multiple topics for the other startup modes 
(GROUP_OFFSETS, EARLIEST, LATEST); for SPECIFIC_OFFSETS mode, when the Table 
API (KafkaTableSourceSinkFactoryBase) is used with several topics, we fall 
back to GROUP_OFFSETS instead. 

> Supporting multi-topic when using 
> kafkaTableSourceSinkFactoryBase.createStreamTableSource
> -
>
> Key: FLINK-13052
> URL: https://issues.apache.org/jira/browse/FLINK-13052
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka, Table SQL / API
>Affects Versions: 1.8.0
>Reporter: chaiyongqiang
>Priority: Critical
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-13052) Supporting multi-topic when using kafkaTableSourceSinkFactoryBase.createStreamTableSource

2019-11-12 Thread chaiyongqiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16972243#comment-16972243
 ] 

chaiyongqiang commented on FLINK-13052:
---

There are some limits when we support multiple source topics in 
KafkaTableSourceSinkFactoryBase:

When the startup mode is specific-offsets, it's difficult to assign the start 
offset for each topic partition because of the way the connector configs are 
set. As the following code shows, when we assign several topics such as 
MyTable, MyTable1, MyTable2, it's difficult to tell which topic a partition 
and offset belong to. 

{code:java}
final Map<String, String> props2 = new HashMap<>();
props2.put("connector.property-version", "1");
props2.put("connector.type", "kafka");
props2.put("connector.version", "0.11");
props2.put("connector.topic", "MyTable");
props2.put("connector.startup-mode", "specific-offsets");
props2.put("connector.specific-offsets.0.partition", "0");
props2.put("connector.specific-offsets.0.offset", "42");
props2.put("connector.specific-offsets.1.partition", "1");
{code}

Fortunately, setting a fixed topic-partition start offset when starting an app 
with the Table API is rare. For now, I suggest ignoring this situation. My 
solution is to support multiple topics for the other startup modes 
(GROUP_OFFSETS, EARLIEST, LATEST); for SPECIFIC_OFFSETS mode, when the Table 
API (KafkaTableSourceSinkFactoryBase) is used with several topics, we fall 
back to GROUP_OFFSETS instead. 

> Supporting multi-topic when using 
> kafkaTableSourceSinkFactoryBase.createStreamTableSource
> -
>
> Key: FLINK-13052
> URL: https://issues.apache.org/jira/browse/FLINK-13052
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka, Table SQL / API
>Affects Versions: 1.8.0
>Reporter: chaiyongqiang
>Priority: Critical
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-14719) Making Semantic configurable in Flinkkafkaproducer to support exactly-once semantic in Table API

2019-11-12 Thread chaiyongqiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16972191#comment-16972191
 ] 

chaiyongqiang edited comment on FLINK-14719 at 11/12/19 8:55 AM:
-

When I check the code on the Flink 1.9 branch and on master, the Flink 1.8 
FlinkKafkaProducer constructor has become @deprecated. In the newer versions 
of Flink, we could modify the createKafkaProducer method in KafkaTableSinkBase 
and all the classes which extend KafkaTableSinkBase to support an exactly-once 
semantic API in Flink.  +I could open a new issue to track this.+

But for the Flink 1.8 branch, a lightweight method would help. We could read 
the semantic from the producer config and set it in the constructor in the 
following way:
{code:java}
/**
 * Configuration key for disabling the metrics reporting.
 */
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";

/**
 * Configuration key for setting the producer semantic.
 */
public static final String KEY_SEMANTIC = "flink.semantic";

public FlinkKafkaProducer(
		String defaultTopicId,
		KeyedSerializationSchema<IN> serializationSchema,
		Properties producerConfig,
		Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
	this(
		defaultTopicId,
		serializationSchema,
		producerConfig,
		customPartitioner,
		Semantic.valueOf(producerConfig.getProperty(KEY_SEMANTIC, 
			Semantic.AT_LEAST_ONCE.name()).toUpperCase()),
		DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
}
{code}

Could someone offer me some advice? Many thanks.
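
A usage sketch under the proposal above ({{flink.semantic}} is the key 
proposed here, not an existing option, and the broker address is a 
placeholder):
{code:java}
Properties producerConfig = new Properties();
producerConfig.setProperty("bootstrap.servers", "broker:9092"); // placeholder address
// Proposed key; the constructor falls back to AT_LEAST_ONCE when it is absent.
producerConfig.setProperty("flink.semantic", "exactly_once");
{code}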


was (Author: chaiyq):
When I check the code on the Flink 1.9 branch and on master, the Flink 1.8 
FlinkKafkaProducer constructor has become @deprecated. In the newer versions 
of Flink, we could modify the createKafkaProducer method in KafkaTableSinkBase 
and all the classes which extend KafkaTableSinkBase to support an exactly-once 
semantic API in Flink.  +I could open a new issue to track this.+

But for the Flink 1.8 branch, a lightweight method would help. We could read 
the semantic from the producer config and set it in the constructor in the 
following way:
{code:java}
/**
 * Configuration key for disabling the metrics reporting.
 */
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";

/**
 * Configuration key for setting the producer semantic.
 */
public static final String KEY_SEMANTIC = "flink.semantic";

public FlinkKafkaProducer(
		String defaultTopicId,
		KeyedSerializationSchema<IN> serializationSchema,
		Properties producerConfig,
		Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
	this(
		defaultTopicId,
		serializationSchema,
		producerConfig,
		customPartitioner,
		Semantic.valueOf(producerConfig.getProperty(KEY_SEMANTIC, 
			Semantic.AT_LEAST_ONCE.name()).toUpperCase()),
		DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
}
{code}

Could someone offer me some advice? Many thanks.

> Making  Semantic configurable in Flinkkafkaproducer to support exactly-once 
> semantic in Table API
> -
>
> Key: FLINK-14719
> URL: https://issues.apache.org/jira/browse/FLINK-14719
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.8.0
>Reporter: chaiyongqiang
>Priority: Major
>
> Flink supports Kafka transactions with FlinkKafkaProducer and 
> FlinkKafkaProducer011. When we use the DataStream API, it's possible to 
> achieve exactly-once semantics, but when we use the Table API, things are 
> different. 
> The createKafkaProducer method in KafkaTableSink is used to create the 
> FlinkKafkaProducer that sends messages to the Kafka server. It looks like:
> {code:java}
> protected SinkFunction<Row> createKafkaProducer(
> 		String topic,
> 		Properties properties,
> 		SerializationSchema<Row> serializationSchema,
> 		Optional<FlinkKafkaPartitioner<Row>> partitioner) {
> 	return new FlinkKafkaProducer<>(
> 		topic,
> 		new KeyedSerializationSchemaWrapper<>(serializationSchema),
> 		properties,
> 		partitioner);
> }
> {code}
> When we step into the FlinkKafkaProducer constructor, we can see that this 
> leads to an AT_LEAST_ONCE-semantic producer:
> {code:java}
> public FlinkKafkaProducer(
> 		String defaultTopicId,
> 		KeyedSerializationSchema<IN> serializationSchema,
> 		Properties producerConfig,
> 		Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
>

[jira] [Commented] (FLINK-14719) Making Semantic configurable in Flinkkafkaproducer to support exactly-once semantic in Table API

2019-11-12 Thread chaiyongqiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16972191#comment-16972191
 ] 

chaiyongqiang commented on FLINK-14719:
---

When I check the code on the Flink 1.9 branch and on master, the Flink 1.8 
FlinkKafkaProducer constructor has become @deprecated. In the newer versions 
of Flink, we could modify the createKafkaProducer method in KafkaTableSinkBase 
and all the classes which extend KafkaTableSinkBase to support an exactly-once 
semantic API in Flink.  +I could open a new issue to track this.+

But for the Flink 1.8 branch, a lightweight method would help. We could read 
the semantic from the producer config and set it in the constructor in the 
following way:
{code:java}
/**
 * Configuration key for disabling the metrics reporting.
 */
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";

/**
 * Configuration key for setting the producer semantic.
 */
public static final String KEY_SEMANTIC = "flink.semantic";

public FlinkKafkaProducer(
		String defaultTopicId,
		KeyedSerializationSchema<IN> serializationSchema,
		Properties producerConfig,
		Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
	this(
		defaultTopicId,
		serializationSchema,
		producerConfig,
		customPartitioner,
		Semantic.valueOf(producerConfig.getProperty(KEY_SEMANTIC, 
			Semantic.AT_LEAST_ONCE.name()).toUpperCase()),
		DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
}
{code}

Could someone offer me some advice? Many thanks.

> Making  Semantic configurable in Flinkkafkaproducer to support exactly-once 
> semantic in Table API
> -
>
> Key: FLINK-14719
> URL: https://issues.apache.org/jira/browse/FLINK-14719
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.8.0
>Reporter: chaiyongqiang
>Priority: Major
>
> Flink supports kafka transaction with FlinkKafkaProducer and 
> FlinkKafkaProducer011 . When we use Datastream API , it's able to realize 
> exactly once semantic .  But when we use Table API, things are different. 
> The createKafkaProducer method in KafkaTableSink is used to create 
> FlinkKafkaProducer to sending messages to Kafka server.  It's like :
> {code:java}
> protected SinkFunction createKafkaProducer(
>   String topic,
>   Properties properties,
>   SerializationSchema serializationSchema,
>   Optional> partitioner) {
>   return new FlinkKafkaProducer<>(
>   topic,
>   new 
> KeyedSerializationSchemaWrapper<>(serializationSchema),
>   properties,
>   partitioner);
>   }
> {code}
> when we get into the constructor of FlinkKafkaProducer we can see this will 
> lead to an at_least_once semantic producer :
> {code:java}
> public FlinkKafkaProducer(
> 		String defaultTopicId,
> 		KeyedSerializationSchema<IN> serializationSchema,
> 		Properties producerConfig,
> 		Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
> 	this(
> 		defaultTopicId,
> 		serializationSchema,
> 		producerConfig,
> 		customPartitioner,
> 		FlinkKafkaProducer.Semantic.AT_LEAST_ONCE,
> 		DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
> }
> {code}
> This means users cannot achieve exactly-once semantics when using the Table API.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14719) Making Semantic configurable in Flinkkafkaproducer to support exactly-once semantic in Table API

2019-11-12 Thread chaiyongqiang (Jira)
chaiyongqiang created FLINK-14719:
-

 Summary: Making  Semantic configurable in Flinkkafkaproducer to 
support exactly-once semantic in Table API
 Key: FLINK-14719
 URL: https://issues.apache.org/jira/browse/FLINK-14719
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.8.0
Reporter: chaiyongqiang


Flink supports Kafka transactions with FlinkKafkaProducer and
FlinkKafkaProducer011. When we use the DataStream API, it's possible to achieve
exactly-once semantics. But when we use the Table API, things are different.

The createKafkaProducer method in KafkaTableSink is used to create the
FlinkKafkaProducer that sends messages to the Kafka server. It looks like:


{code:java}
protected SinkFunction<Row> createKafkaProducer(
		String topic,
		Properties properties,
		SerializationSchema<Row> serializationSchema,
		Optional<FlinkKafkaPartitioner<Row>> partitioner) {
	return new FlinkKafkaProducer<>(
		topic,
		new KeyedSerializationSchemaWrapper<>(serializationSchema),
		properties,
		partitioner);
}
{code}

When we step into the constructor of FlinkKafkaProducer we can see this will
lead to an at-least-once producer:


{code:java}
public FlinkKafkaProducer(
		String defaultTopicId,
		KeyedSerializationSchema<IN> serializationSchema,
		Properties producerConfig,
		Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
	this(
		defaultTopicId,
		serializationSchema,
		producerConfig,
		customPartitioner,
		FlinkKafkaProducer.Semantic.AT_LEAST_ONCE,
		DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
}
{code}

This means users cannot achieve exactly-once semantics when using the Table API.
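
For comparison, the DataStream API already lets the caller choose the semantic
explicitly. A minimal sketch, assuming the 1.8-era constructor that accepts a
Semantic argument and a DataStream<String> named stream:

{code:java}
Properties producerConfig = new Properties();
producerConfig.setProperty("bootstrap.servers", "localhost:9092");

// The semantic is an explicit constructor argument here, which is exactly
// what KafkaTableSink does not expose.
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
	"my-topic",
	new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
	producerConfig,
	FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

stream.addSink(producer);
{code}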




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-12979) Sending data to kafka with CsvRowSerializationSchema always adding a "\n", "\r","\r\n" at the end of the message

2019-08-25 Thread chaiyongqiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16915176#comment-16915176
 ] 

chaiyongqiang commented on FLINK-12979:
---

#9529

> Sending data to kafka with CsvRowSerializationSchema always adding a "\n", 
> "\r","\r\n" at the end of the message
> 
>
> Key: FLINK-12979
> URL: https://issues.apache.org/jira/browse/FLINK-12979
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.8.0, 1.8.1, 1.8.2, 1.9.0, 2.0.0
>Reporter: chaiyongqiang
>Assignee: Hugo Louro
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> When sending data to Kafka using CsvRowSerializationSchema, the
> CsvRowSerializationSchema#serialize method generates the value for the Kafka
> record; it calls CsvEncoder#endRow, which appends _cfgLineSeparator to the end
> of the record value.
> But in CsvRowSerializationSchema#Builder, the method setLineDelimiter only
> accepts "\n", "\r", or "\r\n" as its parameter.
> This is unfriendly when you want to send the message "123,pingpong,21:00" to
> Kafka but Kafka receives "123,pingpong,21:00\r\n".
> I'm not sure of the reason for limiting the line delimiter to "\n", "\r", or
> "\r\n"; in previous versions, and in jackson-databind itself, there is no such
> limit.
> But at least it should let the application developer set the line delimiter
> to "".



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13052) Supporting multi-topic when using kafkaTableSourceSinkFactoryBase.createStreamTableSource

2019-07-02 Thread chaiyongqiang (JIRA)
chaiyongqiang created FLINK-13052:
-

 Summary: Supporting multi-topic when using 
kafkaTableSourceSinkFactoryBase.createStreamTableSource
 Key: FLINK-13052
 URL: https://issues.apache.org/jira/browse/FLINK-13052
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.8.0
Reporter: chaiyongqiang






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12979) Sending data to kafka with CsvRowSerializationSchema always adding a "\n", "\r","\r\n" at the end of the message

2019-07-02 Thread chaiyongqiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16876735#comment-16876735
 ] 

chaiyongqiang commented on FLINK-12979:
---

When I check com.fasterxml.jackson.dataformat.csv.CsvSchema, there is no limit
on setLineSeparator; the default is '\n'. So, for Flink, we need to be able to
set the line separator to ''.

Even if we introduced a custom property, we would still have to set the line
separator to '' when handling it. So simply making '' an accepted value is a
simple but effective way to solve this.

The relevant check is the following; we could allow the user to set ''. It has
the same result.

{code:java}
if (!delimiter.equals("\n") && !delimiter.equals("\r") && !delimiter.equals("\r\n")) {
	throw new IllegalArgumentException(
		"Unsupported new line delimiter. Only \\n, \\r, or \\r\\n are supported.");
}
{code}
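
For concreteness, the relaxed check could look like the following (a sketch
reusing the names from the snippet above; the message wording is mine):

{code:java}
// Additionally accept the empty string, meaning "append no separator".
if (!delimiter.equals("\n") && !delimiter.equals("\r")
		&& !delimiter.equals("\r\n") && !delimiter.isEmpty()) {
	throw new IllegalArgumentException(
		"Unsupported new line delimiter. Only \\n, \\r, \\r\\n, or \"\" are supported.");
}
{code}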

Do you think relaxing the check to also allow '' is OK?

> Sending data to kafka with CsvRowSerializationSchema always adding a "\n", 
> "\r","\r\n" at the end of the message
> 
>
> Key: FLINK-12979
> URL: https://issues.apache.org/jira/browse/FLINK-12979
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.8.0, 1.8.1, 1.8.2, 1.9.0, 2.0.0
>Reporter: chaiyongqiang
>Assignee: Hugo Louro
>Priority: Major
>
> When sending data to Kafka using CsvRowSerializationSchema, the
> CsvRowSerializationSchema#serialize method generates the value for the Kafka
> record; it calls CsvEncoder#endRow, which appends _cfgLineSeparator to the end
> of the record value.
> But in CsvRowSerializationSchema#Builder, the method setLineDelimiter only
> accepts "\n", "\r", or "\r\n" as its parameter.
> This is unfriendly when you want to send the message "123,pingpong,21:00" to
> Kafka but Kafka receives "123,pingpong,21:00\r\n".
> I'm not sure of the reason for limiting the line delimiter to "\n", "\r", or
> "\r\n"; in previous versions, and in jackson-databind itself, there is no such
> limit.
> But at least it should let the application developer set the line delimiter
> to "".



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12979) Sending data to kafka with CsvRowSerializationSchema always adding a "\n", "\r","\r\n" at the end of the message

2019-06-25 Thread chaiyongqiang (JIRA)
chaiyongqiang created FLINK-12979:
-

 Summary: Sending data to kafka with CsvRowSerializationSchema 
always adding a "\n", "\r","\r\n" at the end of the message
 Key: FLINK-12979
 URL: https://issues.apache.org/jira/browse/FLINK-12979
 Project: Flink
  Issue Type: Bug
  Components: API / Type Serialization System
Affects Versions: 1.8.0, 1.8.1, 1.8.2, 1.9.0, 2.0.0
Reporter: chaiyongqiang


When sending data to Kafka using CsvRowSerializationSchema, the
CsvRowSerializationSchema#serialize method generates the value for the Kafka
record; it calls CsvEncoder#endRow, which appends _cfgLineSeparator to the end
of the record value.

But in CsvRowSerializationSchema#Builder, the method setLineDelimiter only
accepts "\n", "\r", or "\r\n" as its parameter.

This is unfriendly when you want to send the message "123,pingpong,21:00" to
Kafka but Kafka receives "123,pingpong,21:00\r\n".

I'm not sure of the reason for limiting the line delimiter to "\n", "\r", or
"\r\n"; in previous versions, and in jackson-databind itself, there is no such
limit.

But at least it should let the application developer set the line delimiter
to "".




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12622) The process function compiles error in MyProcessWindowFunction in windows.md

2019-05-24 Thread chaiyongqiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16848003#comment-16848003
 ] 

chaiyongqiang commented on FLINK-12622:
---

[https://github.com/apache/flink/pull/8540/commits]

> The process function compiles error in MyProcessWindowFunction in windows.md
> 
>
> Key: FLINK-12622
> URL: https://issues.apache.org/jira/browse/FLINK-12622
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.8.0
>Reporter: chaiyongqiang
>Priority: Minor
>
> The process function defined as below in windows.md : 
> {quote}class MyProcessWindowFunction extends ProcessWindowFunction[(String, 
> Long), String, String, TimeWindow] {
>   def process(key: String, context: Context, input: Iterable[(String, Long)], 
> out: Collector[String]): () = {
> var count = 0L
> for (in <- input) {
>   count = count + 1
> }
> out.collect(s"Window ${context.window} count: $count")
>   }
> }{quote}
> The process function defined in ProcessWindowFunction has a return value of
> Unit, but the override in MyProcessWindowFunction doesn't match it (the
> annotation should be ": Unit", not ": ()"). Compiling MyProcessWindowFunction
> produces an error like the following:
> {quote}Error:(37, 109) '=>' expected but '=' found.
>   def process(key: String, context: Context, input: Iterable[(String, Long)], 
> out: Collector[String]) : ()  = {{quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12622) The process function compiles error in MyProcessWindowFunction in windows.md

2019-05-24 Thread chaiyongqiang (JIRA)
chaiyongqiang created FLINK-12622:
-

 Summary: The process function compiles error in 
MyProcessWindowFunction in windows.md
 Key: FLINK-12622
 URL: https://issues.apache.org/jira/browse/FLINK-12622
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.8.0
Reporter: chaiyongqiang


The process function defined as below in windows.md : 

{quote}class MyProcessWindowFunction extends ProcessWindowFunction[(String, 
Long), String, String, TimeWindow] {

  def process(key: String, context: Context, input: Iterable[(String, Long)], 
out: Collector[String]): () = {
var count = 0L
for (in <- input) {
  count = count + 1
}
out.collect(s"Window ${context.window} count: $count")
  }
}{quote}

The process function defined in ProcessWindowFunction has a return value of
Unit, but the override in MyProcessWindowFunction doesn't match it (the
annotation should be ": Unit", not ": ()"). Compiling MyProcessWindowFunction
produces an error like the following:

{quote}Error:(37, 109) '=>' expected but '=' found.
  def process(key: String, context: Context, input: Iterable[(String, Long)], 
out: Collector[String]) : ()  = {{quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11926) The Description of Assigners allowing a fixed amount of lateness is not correct

2019-03-18 Thread chaiyongqiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16795518#comment-16795518
 ] 

chaiyongqiang commented on FLINK-11926:
---

I'm confused. Let's consider the simpler situation without a window. As in
the following image, when the watermark is W(11), will the later messages with
timestamps 14, 17, 12, ... all be dropped? In my opinion, only the messages
with a timestamp less than 11 should be dropped.
!screenshot-1.png!
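
For reference, the lateness test in Flink's window operator has roughly this
shape (a sketch of the WindowOperator drop condition, not the literal source):

{code:java}
// An element is dropped only when its timestamp, plus the allowed lateness,
// is at or behind the current watermark. Elements ahead of the watermark
// (e.g. timestamps 14 or 17 against watermark 11) are not late.
static boolean isElementLate(long elementTimestamp, long allowedLateness, long currentWatermark) {
	return elementTimestamp + allowedLateness <= currentWatermark;
}
{code}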

> The Description of Assigners allowing a fixed amount of lateness is not 
> correct
> ---
>
> Key: FLINK-11926
> URL: https://issues.apache.org/jira/browse/FLINK-11926
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.7.2
>Reporter: chaiyongqiang
>Priority: Minor
> Attachments: screenshot-1.png
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11926) The Description of Assigners allowing a fixed amount of lateness is not correct

2019-03-18 Thread chaiyongqiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chaiyongqiang updated FLINK-11926:
--
Attachment: screenshot-1.png

> The Description of Assigners allowing a fixed amount of lateness is not 
> correct
> ---
>
> Key: FLINK-11926
> URL: https://issues.apache.org/jira/browse/FLINK-11926
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.7.2
>Reporter: chaiyongqiang
>Priority: Minor
> Attachments: screenshot-1.png
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11926) The Description of Assigners allowing a fixed amount of lateness is not correct

2019-03-15 Thread chaiyongqiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16794104#comment-16794104
 ] 

chaiyongqiang commented on FLINK-11926:
---

Let's suppose we have a watermark with timestamp 100 (t_w), and we then receive
an event with timestamp 105 (t). By that description the lateness would be
t - t_w = 105 - 100 > 0, so the new event should be dropped?

> The Description of Assigners allowing a fixed amount of lateness is not 
> correct
> ---
>
> Key: FLINK-11926
> URL: https://issues.apache.org/jira/browse/FLINK-11926
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.7.2
>Reporter: chaiyongqiang
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11926) The Description of Assigners allowing a fixed amount of lateness is not correct

2019-03-15 Thread chaiyongqiang (JIRA)
chaiyongqiang created FLINK-11926:
-

 Summary: The Description of Assigners allowing a fixed amount of 
lateness is not correct
 Key: FLINK-11926
 URL: https://issues.apache.org/jira/browse/FLINK-11926
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.7.2
Reporter: chaiyongqiang






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)