[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16594502#comment-16594502
 ] 

ASF GitHub Bot commented on FLINK-10074:


tweise commented on a change in pull request #6567: [FLINK-10074] Allowable 
number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#discussion_r213180551
 
 

 ##
 File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java
 ##
 @@ -49,6 +49,59 @@ public void testRethrowingHandler() {
 		Assert.assertNull(environment.getLastDeclinedCheckpointCause());
 	}
 
+	@Test
+	public void testRethrowingHandlerWithTolerableNumberTriggered() {
+		DeclineDummyEnvironment environment = new DeclineDummyEnvironment();
+		environment.getExecutionConfig().setTaskTolerableCheckpointFailuresNumber(3);
+		CheckpointExceptionHandlerFactory checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
+		CheckpointExceptionHandler exceptionHandler =
+			checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, environment);
+
+		CheckpointMetaData failedCheckpointMetaData = new CheckpointMetaData(42L, 4711L);
+		Exception testException = new Exception("test");
+		try {
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
+			failedCheckpointMetaData = new CheckpointMetaData(43L, 4711L);
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
+			failedCheckpointMetaData = new CheckpointMetaData(44L, 4711L);
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
+			failedCheckpointMetaData = new CheckpointMetaData(45L, 4711L);
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
+
+			Assert.fail("Exception not rethrown.");
+		} catch (Exception e) {
+			Assert.assertEquals(testException, e);
+		}
+
+		Assert.assertNull(environment.getLastDeclinedCheckpointCause());
+	}
+
+	@Test
+	public void testRethrowingHandlerWithTolerableNumberNotTriggered() {
+		DeclineDummyEnvironment environment = new DeclineDummyEnvironment();
+		environment.getExecutionConfig().setTaskTolerableCheckpointFailuresNumber(3);
+		CheckpointExceptionHandlerFactory checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
+		CheckpointExceptionHandler exceptionHandler =
+			checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, environment);
+
+		CheckpointMetaData failedCheckpointMetaData = new CheckpointMetaData(42L, 4711L);
+		Exception testException = new Exception("test");
+
+		try {
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
+			failedCheckpointMetaData = new CheckpointMetaData(43L, 4711L);
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
+			failedCheckpointMetaData = new CheckpointMetaData(44L, 4711L);
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
+			failedCheckpointMetaData = new CheckpointMetaData(46L, 4711L);
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
+		} catch (Exception e) {
+			Assert.assertNotEquals(testException, e);
 
 Review comment:
   Shouldn't this fail since we don't expect an exception?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16594497#comment-16594497
 ] 

ASF GitHub Bot commented on FLINK-10074:


tweise commented on a change in pull request #6567: [FLINK-10074] Allowable 
number of checkpoint failures
URL: https://github.com/apache/flink/pull/6567#discussion_r213179903
 
 

 ##
 File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java
 ##
 @@ -49,6 +49,59 @@ public void testRethrowingHandler() {
 		Assert.assertNull(environment.getLastDeclinedCheckpointCause());
 	}
 
+	@Test
+	public void testRethrowingHandlerWithTolerableNumberTriggered() {
+		DeclineDummyEnvironment environment = new DeclineDummyEnvironment();
+		environment.getExecutionConfig().setTaskTolerableCheckpointFailuresNumber(3);
+		CheckpointExceptionHandlerFactory checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
+		CheckpointExceptionHandler exceptionHandler =
+			checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, environment);
+
+		CheckpointMetaData failedCheckpointMetaData = new CheckpointMetaData(42L, 4711L);
+		Exception testException = new Exception("test");
+		try {
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
+			failedCheckpointMetaData = new CheckpointMetaData(43L, 4711L);
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
+			failedCheckpointMetaData = new CheckpointMetaData(44L, 4711L);
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
+			failedCheckpointMetaData = new CheckpointMetaData(45L, 4711L);
+			exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData, testException);
 
 Review comment:
   Shouldn't it verify that this attempt generated the exception and not 
previous?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  
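A minimal sketch of the counting idea behind such a threshold (the class and method names below are hypothetical illustrations, not the actual Flink API added by PR #6567): failures are tolerated up to a configured count, the exception is only rethrown once the count is exceeded, and a completed checkpoint resets the counter.

{code:java}
// Hypothetical sketch of the threshold idea; not the actual Flink classes from PR #6567.
public class TolerantCheckpointFailureHandler {

	private final int tolerableFailures;
	private int consecutiveFailures;

	public TolerantCheckpointFailureHandler(int tolerableFailures) {
		this.tolerableFailures = tolerableFailures;
	}

	/** Swallow the failure while under the threshold; rethrow once it is exceeded. */
	public void onCheckpointFailure(long checkpointId, Exception cause) throws Exception {
		consecutiveFailures++;
		if (consecutiveFailures > tolerableFailures) {
			throw cause;
		}
		// otherwise: decline this checkpoint but keep the task running
	}

	/** A completed checkpoint resets the consecutive-failure counter. */
	public void onCheckpointSuccess(long checkpointId) {
		consecutiveFailures = 0;
	}
}
{code}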



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Comment Edited] (FLINK-9150) Prepare for Java 10

2018-08-27 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16473198#comment-16473198
 ] 

Ted Yu edited comment on FLINK-9150 at 8/28/18 3:29 AM:


Similar error is encountered when building against jdk 11.


was (Author: yuzhih...@gmail.com):
Similar error is encountered when building against jdk 11 .

> Prepare for Java 10
> ---
>
> Key: FLINK-9150
> URL: https://issues.apache.org/jira/browse/FLINK-9150
> Project: Flink
>  Issue Type: Task
>  Components: Build System
>Reporter: Ted Yu
>Priority: Major
>
> Java 9 is not an LTS release.
> When compiling with Java 10, I see the following compilation error:
> {code}
> [ERROR] Failed to execute goal on project flink-shaded-hadoop2: Could not 
> resolve dependencies for project 
> org.apache.flink:flink-shaded-hadoop2:jar:1.6-SNAPSHOT: Could not find 
> artifact jdk.tools:jdk.tools:jar:1.6 at specified path 
> /a/jdk-10/../lib/tools.jar -> [Help 1]
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Commented] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16594261#comment-16594261
 ] 

ASF GitHub Bot commented on FLINK-10203:


art4ul commented on issue #6608: [FLINK-10203]Support truncate method for old 
Hadoop versions in HadoopRecoverableFsDataOutputStream
URL: https://github.com/apache/flink/pull/6608#issuecomment-416379325
 
 
   @StephanEwen I've attached the [PDF 
file](https://issues.apache.org/jira/secure/attachment/12937334/legacy%20truncate%20logic.pdf)
 with the description of legacy truncater logic to the [JIRA 
ticket](https://issues.apache.org/jira/browse/FLINK-10203).
   Please take a look :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support truncate method for old Hadoop versions in 
> HadoopRecoverableFsDataOutputStream
> --
>
> Key: FLINK-10203
> URL: https://issues.apache.org/jira/browse/FLINK-10203
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, filesystem-connector
>Affects Versions: 1.6.0, 1.6.1, 1.7.0
>Reporter: Artsem Semianenka
>Assignee: Artsem Semianenka
>Priority: Major
>  Labels: pull-request-available
> Attachments: legacy truncate logic.pdf
>
>
> The new StreamingFileSink (introduced in Flink 1.6) uses the 
> HadoopRecoverableFsDataOutputStream wrapper to write data to HDFS.
> HadoopRecoverableFsDataOutputStream wraps FSDataOutputStream so that writing can 
> be resumed from a certain point of the file after a failure. To achieve this 
> recovery functionality, HadoopRecoverableFsDataOutputStream uses the "truncate" 
> method, which was introduced only in Hadoop 2.7.
> Unfortunately, a few official Hadoop distributions (e.g. Cloudera, Pivotal HD) 
> still ship Hadoop 2.6 in their latest versions. As a result, Flink's Hadoop 
> connector can't work with these distributions.
> Flink declares that it supports Hadoop from version 2.4.0 upwards 
> ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions]).
> I guess we should emulate the functionality of the "truncate" method for older 
> Hadoop versions.
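For illustration only, one way such an emulation could look: copy the first {{targetLength}} bytes to a temporary file and rename it over the original, using the plain Hadoop FileSystem API. This is an assumption about the general approach, not the logic from the attached PDF or the pull request.

{code:java}
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class LegacyTruncate {

	/** Emulates truncate(file, targetLength) by copying the prefix to a temp file and renaming it back. */
	public static void truncate(FileSystem fs, Path file, long targetLength) throws Exception {
		Path tmp = new Path(file.getParent(), file.getName() + ".truncated");
		try (InputStream in = fs.open(file); OutputStream out = fs.create(tmp, true)) {
			byte[] buffer = new byte[4096];
			long remaining = targetLength;
			while (remaining > 0) {
				int read = in.read(buffer, 0, (int) Math.min(buffer.length, remaining));
				if (read < 0) {
					break;
				}
				out.write(buffer, 0, read);
				remaining -= read;
			}
		}
		fs.delete(file, false);
		fs.rename(tmp, file);
	}

	public static void main(String[] args) throws Exception {
		FileSystem fs = FileSystem.get(new Configuration());
		truncate(fs, new Path(args[0]), Long.parseLong(args[1]));
	}
}
{code}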



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream

2018-08-27 Thread Artsem Semianenka (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Artsem Semianenka updated FLINK-10203:
--
Attachment: legacy truncate logic.pdf

> Support truncate method for old Hadoop versions in 
> HadoopRecoverableFsDataOutputStream
> --
>
> Key: FLINK-10203
> URL: https://issues.apache.org/jira/browse/FLINK-10203
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, filesystem-connector
>Affects Versions: 1.6.0, 1.6.1, 1.7.0
>Reporter: Artsem Semianenka
>Assignee: Artsem Semianenka
>Priority: Major
>  Labels: pull-request-available
> Attachments: legacy truncate logic.pdf
>
>
> The new StreamingFileSink (introduced in Flink 1.6) uses the 
> HadoopRecoverableFsDataOutputStream wrapper to write data to HDFS.
> HadoopRecoverableFsDataOutputStream wraps FSDataOutputStream so that writing can 
> be resumed from a certain point of the file after a failure. To achieve this 
> recovery functionality, HadoopRecoverableFsDataOutputStream uses the "truncate" 
> method, which was introduced only in Hadoop 2.7.
> Unfortunately, a few official Hadoop distributions (e.g. Cloudera, Pivotal HD) 
> still ship Hadoop 2.6 in their latest versions. As a result, Flink's Hadoop 
> connector can't work with these distributions.
> Flink declares that it supports Hadoop from version 2.4.0 upwards 
> ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions]).
> I guess we should emulate the functionality of the "truncate" method for older 
> Hadoop versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10195) RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly

2018-08-27 Thread Stephan Ewen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16594098#comment-16594098
 ] 

Stephan Ewen commented on FLINK-10195:
--

You can assign the issue to yourself, once we add you to the contributors JIRA 
group.

Usually waiting for consensus on an issue is a good idea. For connectors we 
assume that the users can judge these issues quite well.
If you make this an optional add-on to the existing consumer (flag to turn it 
on), it is less sensitive (does not break existing setups) and should be fairly 
easy to merge. 

> RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly
> -
>
> Key: FLINK-10195
> URL: https://issues.apache.org/jira/browse/FLINK-10195
> Project: Flink
>  Issue Type: Bug
>  Components: RabbitMQ Connector
>Affects Versions: 1.4.0, 1.5.0, 1.5.1, 1.6.0
>Reporter: Luka Jurukovski
>Priority: Major
>
> The connection between the RabbitMQ server and the client does not 
> back pressure appropriately when auto-acking is disabled. This becomes very 
> problematic when a downstream process throttles data processing to slower 
> than the rate at which RabbitMQ sends data to the client.
> The difference in records ends up being stored in Flink's heap space, 
> which grows indefinitely (or technically up to "Integer Max" deliveries). 
> Looking at RabbitMQ's metrics, the number of unacked messages looks like a 
> steadily rising sawtooth shape.
> Upon further investigation it looks like this is due to how the 
> QueueingConsumer works: messages are added to the BlockingQueue faster than 
> they are being removed and processed, resulting in the previously described 
> behavior.
> This may be intended behavior; however, it isn't explicitly obvious in the 
> documentation or any of the examples I have seen.
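For reference, the usual RabbitMQ-side knob for bounding unacked deliveries is the channel prefetch count ({{basicQos}}). The sketch below shows the raw client API outside of Flink; wiring it into RMQSource would be exactly the kind of optional change discussed above.

{code:java}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class PrefetchExample {

	public static void main(String[] args) throws Exception {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();

		// With manual acks, the broker stops sending once this many deliveries are
		// outstanding, so unprocessed messages cannot pile up on the client heap.
		channel.basicQos(100);

		// ... register a consumer here, ack after processing, then close channel and connection ...
		connection.close();
	}
}
{code}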



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10214) why is job applying as many TMs as default parallelism when starting, each parallelism is 1

2018-08-27 Thread Stephan Ewen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16594087#comment-16594087
 ] 

Stephan Ewen commented on FLINK-10214:
--

This is currently a bug, being fixed. As mentioned above, this issue here 
tracks that bug: FLINK-9455

> why is job applying as many TMs as default parallelism when starting, each 
> parallelism is 1
> ---
>
> Key: FLINK-10214
> URL: https://issues.apache.org/jira/browse/FLINK-10214
> Project: Flink
>  Issue Type: Task
>  Components: Cluster Management, TaskManager, YARN
>Affects Versions: 1.5.0
>Reporter: lzh9
>Priority: Major
>
> If I set TM number=6, slot num per TM=8, memory per TM=4G, the job will 
> request 48 TMs and use only one slot in each, so total memory usage is 192G. A few 
> minutes later, it will release 40 TMs and everything will be back to normal. 
> Is there any reason for this? 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol closed pull request #6620: [hotfix][flink-streaming-java] modify CreateStreamOutput() in OperatorChain.java

2018-08-27 Thread GitBox
zentol closed pull request #6620: [hotfix][flink-streaming-java] modify 
CreateStreamOutput() in OperatorChain.java
URL: https://github.com/apache/flink/pull/6620
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 015b7db9436..d33d792751d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -397,7 +397,7 @@ public int getChainLength() {
 
 		TypeSerializer outSerializer = null;
 
-		if (edge.getOutputTag() != null) {
+		if (sideOutputTag != null) {
 			// side output
 			outSerializer = upStreamConfig.getTypeSerializerSideOut(
 				edge.getOutputTag(), taskEnvironment.getUserClassLoader());


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on issue #6620: [hotfix][flink-streaming-java] modify CreateStreamOutput() in OperatorChain.java

2018-08-27 Thread GitBox
zentol commented on issue #6620: [hotfix][flink-streaming-java] modify 
CreateStreamOutput() in OperatorChain.java
URL: https://github.com/apache/flink/pull/6620#issuecomment-416313047
 
 
   Please provide a description of what your change is supposed to accomplish 
when opening a PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-8354) Flink Kafka connector ignores Kafka message headers

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16594037#comment-16594037
 ] 

ASF GitHub Bot commented on FLINK-8354:
---

alexeyt820 commented on a change in pull request #6615: [FLINK-8354] 
[flink-connectors] Add ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r213060361
 
 

 ##
 File path: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
 ##
 @@ -350,4 +363,201 @@ public boolean isEndOfStream(Long nextElement) {
 	}
 }
 
+	/**
+	 * Kafka 0.11 specific test, ensuring Kafka Headers are properly written to and read from Kafka.
+	 */
+	@Test(timeout = 6)
+	public void testHeaders() throws Exception {
+		final String topic = "headers-topic";
+		final long testSequenceLength = 127L;
+		createTestTopic(topic, 3, 1);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+		env.getConfig().disableSysoutLogging();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		DataStream testSequence = env.addSource(new SourceFunction() {
+			private static final long serialVersionUID = 1L;
+			boolean running = true;
+
+			@Override
+			public void run(SourceContext ctx) throws Exception {
+				long i = 0;
+				while (running) {
+					ctx.collectWithTimestamp(i, i * 2);
+					if (i++ == testSequenceLength) {
+						running = false;
+					}
+				}
+			}
+
+			@Override
+			public void cancel() {
+				running = false;
+			}
+		});
+
+		FlinkKafkaProducer011 producer = new FlinkKafkaProducer011<>(topic,
+			new TestHeadersKeyedSerializationSchema(topic), standardProps, Optional.empty());
+		testSequence.addSink(producer).setParallelism(3);
+		env.execute("Produce some data");
+
+		// Now let's consume data and check that headers deserialized correctly
+		env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+		env.getConfig().disableSysoutLogging();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		FlinkKafkaConsumer011 kafkaSource = new FlinkKafkaConsumer011<>(topic, new TestHeadersKeyedDeserializationSchema(testSequenceLength), standardProps);
+
+		env.addSource(kafkaSource).addSink(new TestHeadersElementValid());
+		env.execute("Consume again");
+
+		deleteTestTopic(topic);
+	}
+
+	/**
+	 * Element consisting of key, value and headers represented as list of tuples: key, list of Bytes.
+	 */
+	public static class TestHeadersElement extends Tuple3>>> {
+
+	}
+
+	/**
+	 * Generate "headers" for given element.
+	 * @param element - sequence element
+	 * @return headers
+	 */
+	private static Iterable> headersFor(Long element) {
+		final long x = element;
+		return Arrays.asList(
+			new AbstractMap.SimpleImmutableEntry<>("low", new byte[]{
+				(byte) ((x >>> 8) & 0xFF),
+				(byte) ((x) & 0xFF)
+			}),
+			new AbstractMap.SimpleImmutableEntry<>("low", new byte[]{
+				(byte) ((x >>> 24) & 0xFF),
+				(byte) ((x >>> 16) & 0xFF)
+			}),
+			new AbstractMap.SimpleImmutableEntry<>("high", new byte[]{
+				(byte) ((x >>> 40) & 0xFF),
+				(byte) ((x >>> 32) & 0xFF)
+			}),
+			new AbstractMap.SimpleImmutableEntry<>("high", new byte[]{
+				(byte) ((x >>> 56) & 0xFF),
+				(byte) ((x >>> 48) & 0xFF)
+			})
+		);
+	}
+
+	/**
+	 * Convert headers into list of tuples representation. List of tuples is more convenient to use in
+	 * assert expressions, because they have equals
+	 * @param headers - headers
+	 * @return list of tuples(string, list of Bytes)
+	 */
+	private static List>> headersAsList(Iterable> headers) {

[jira] [Created] (FLINK-10228) Add metrics for netty direct memory consumption

2018-08-27 Thread Ted Yu (JIRA)
Ted Yu created FLINK-10228:
--

 Summary: Add metrics for netty direct memory consumption
 Key: FLINK-10228
 URL: https://issues.apache.org/jira/browse/FLINK-10228
 Project: Flink
  Issue Type: Improvement
Reporter: Ted Yu


Netty direct memory usage can be exposed via metrics so that operators can keep 
track of memory consumption.
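The ticket does not say how; as one possible approximation, a task could register a gauge that reports JVM-wide direct buffer usage via the JDK's {{BufferPoolMXBean}} (a sketch only, and not netty's internal pool statistics):

{code:java}
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;

public class DirectMemoryGaugeExample extends RichMapFunction<String, String> {

	@Override
	public void open(Configuration parameters) {
		for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
			if ("direct".equals(pool.getName())) {
				// Expose the bytes currently used by JVM direct buffers as a Flink gauge.
				getRuntimeContext().getMetricGroup()
					.gauge("directMemoryUsed", (Gauge<Long>) pool::getMemoryUsed);
			}
		}
	}

	@Override
	public String map(String value) {
		return value;
	}
}
{code}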



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10222) Table scalar function expression parses error when function name equals the exists keyword suffix

2018-08-27 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593914#comment-16593914
 ] 

vinoyang commented on FLINK-10222:
--

[~fhueske] Agreed. The exact match I mentioned refers to the regular expression that 
comes from the keyword2Parser method:
{code:java}
implicit def keyword2Parser(kw: Keyword): Parser[String] = {
  ("""(?i)\Q""" + kw.key + """\E""").r
}
{code}
This method will convert the keyword into a Parser object. If I change it, then 
we will make an exact match for all the keywords. I feel that there should be 
no problem with this. What do you think?
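As a plain-Java illustration of the prefix problem and of the exact-match idea (the real change would be to the Scala regex built in {{keyword2Parser}} above; the lookahead character class below is only an example):

{code:java}
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class KeywordMatchDemo {

	public static void main(String[] args) {
		String input = "function";

		// Current style: just the quoted keyword. As a prefix match it also accepts "fun" inside "function".
		Matcher loose = Pattern.compile("(?i)\\Qfun\\E").matcher(input);
		System.out.println(loose.lookingAt());   // true -> "fun" is consumed, "ction" is left over

		// Exact-match style: forbid a following identifier character.
		Matcher exact = Pattern.compile("(?i)\\Qfun\\E(?![A-Za-z0-9_])").matcher(input);
		System.out.println(exact.lookingAt());   // false -> "function" is no longer mistaken for "fun"
	}
}
{code}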

 

> Table scalar function expression parses error when function name equals the 
> exists keyword suffix
> -
>
> Key: FLINK-10222
> URL: https://issues.apache.org/jira/browse/FLINK-10222
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Reporter: vinoyang
>Priority: Major
>
> Suffix extraction in ExpressionParser.scala does not actually support 
> extraction of keywords with the same prefix. For example: Adding suffix 
> parsing rules for {{a.fun}} and {{a.function}} simultaneously will causes 
> exceptions. 
> some discussion : 
> [https://github.com/apache/flink/pull/6432#issuecomment-416127815]
> https://github.com/apache/flink/pull/6585#discussion_r212797015



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Commented] (FLINK-8354) Flink Kafka connector ignores Kafka message headers

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593905#comment-16593905
 ] 

ASF GitHub Bot commented on FLINK-8354:
---

yanghua commented on a change in pull request #6615: [FLINK-8354] 
[flink-connectors] Add ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r213034632
 
 

 ##
 File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
 ##
 @@ -54,4 +55,20 @@
 	 * @return True, if the element signals end of stream, false otherwise.
 	 */
 	boolean isEndOfStream(T nextElement);
+
+	/**
+	 * Deserializes the byte message.
+	 *
+	 * @param messageKey the key as a byte array (null if no key has been set).
+	 * @param message The message, as a byte array (null if the message was empty or deleted).
 
 Review comment:
   missed topic parameter description.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink Kafka connector ignores Kafka message  headers 
> -
>
> Key: FLINK-8354
> URL: https://issues.apache.org/jira/browse/FLINK-8354
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
> Environment: Kafka 0.11.0.0
> Flink 1.4.0
> flink-connector-kafka-0.11_2.11 
>Reporter: Mohammad Abareghi
>Assignee: Aegeaner
>Priority: Major
>  Labels: pull-request-available
>
> Kafka introduced the notion of headers for messages in version 0.11.0.0 
> (https://issues.apache.org/jira/browse/KAFKA-4208).
> But flink-connector-kafka-0.11_2.11, which supports Kafka 0.11.0.0, ignores 
> headers when consuming Kafka messages. 
> It would be useful in some scenarios, such as distributed log tracing, to 
> support message headers to FlinkKafkaConsumer011 and FlinkKafkaProducer011. 
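For context, with the 0.11+ {{kafka-clients}} API the headers are already carried on every {{ConsumerRecord}}; a minimal sketch of reading them with the plain consumer API (not the Flink connector changes from the pull request):

{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

public final class HeaderAccessExample {

	/** Collects the headers of a record into a map (last value wins for duplicate keys). */
	public static Map<String, byte[]> headersAsMap(ConsumerRecord<byte[], byte[]> record) {
		Map<String, byte[]> result = new HashMap<>();
		for (Header header : record.headers()) {
			result.put(header.key(), header.value());
		}
		return result;
	}
}
{code}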



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8354) Flink Kafka connector ignores Kafka message headers

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593903#comment-16593903
 ] 

ASF GitHub Bot commented on FLINK-8354:
---

yanghua commented on a change in pull request #6615: [FLINK-8354] 
[flink-connectors] Add ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r213034601
 
 

 ##
 File path: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 ##
 @@ -206,6 +208,9 @@ protected KafkaConsumerCallBridge createCallBridge() {
 		return new KafkaConsumerCallBridge();
 	}
 
+	protected Iterable> headersOf(ConsumerRecord record) {
+		return Collections.emptyList();
+	}
 
 Review comment:
   insert a new line after this looks better


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink Kafka connector ignores Kafka message  headers 
> -
>
> Key: FLINK-8354
> URL: https://issues.apache.org/jira/browse/FLINK-8354
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
> Environment: Kafka 0.11.0.0
> Flink 1.4.0
> flink-connector-kafka-0.11_2.11 
>Reporter: Mohammad Abareghi
>Assignee: Aegeaner
>Priority: Major
>  Labels: pull-request-available
>
> Kafka introduced the notion of headers for messages in version 0.11.0.0 
> (https://issues.apache.org/jira/browse/KAFKA-4208).
> But flink-connector-kafka-0.11_2.11, which supports Kafka 0.11.0.0, ignores 
> headers when consuming Kafka messages. 
> It would be useful in some scenarios, such as distributed log tracing, to 
> support message headers to FlinkKafkaConsumer011 and FlinkKafkaProducer011. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)





[jira] [Commented] (FLINK-10222) Table scalar function expression parses error when function name equals the exists keyword suffix

2018-08-27 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593899#comment-16593899
 ] 

Fabian Hueske commented on FLINK-10222:
---

I think we should try to use the built-in tooling of the parser framework, such 
as `Keyword` if possible.

> Table scalar function expression parses error when function name equals the 
> exists keyword suffix
> -
>
> Key: FLINK-10222
> URL: https://issues.apache.org/jira/browse/FLINK-10222
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Reporter: vinoyang
>Priority: Major
>
> Suffix extraction in ExpressionParser.scala does not actually support 
> extraction of keywords with the same prefix. For example: Adding suffix 
> parsing rules for {{a.fun}} and {{a.function}} simultaneously will causes 
> exceptions. 
> some discussion : 
> [https://github.com/apache/flink/pull/6432#issuecomment-416127815]
> https://github.com/apache/flink/pull/6585#discussion_r212797015



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10207) Bump checkstyle-plugin to 8.9

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593891#comment-16593891
 ] 

ASF GitHub Bot commented on FLINK-10207:


yanghua commented on issue #6618: [FLINK-10207][build] Bump checkstyle to 8.9
URL: https://github.com/apache/flink/pull/6618#issuecomment-416280812
 
 
   +1


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Bump checkstyle-plugin to 8.9
> -
>
> Key: FLINK-10207
> URL: https://issues.apache.org/jira/browse/FLINK-10207
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Affects Versions: 1.7.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Our current checkstyle version (8.4) is incompatible with java 9, the 
> earliest version to work properly is 8.9.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on issue #6618: [FLINK-10207][build] Bump checkstyle to 8.9

2018-08-27 Thread GitBox
yanghua commented on issue #6618: [FLINK-10207][build] Bump checkstyle to 8.9
URL: https://github.com/apache/flink/pull/6618#issuecomment-416280812
 
 
   +1


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on issue #6619: [hotfix][flink-streaming-java] modify typo in the OperatorChain.java

2018-08-27 Thread GitBox
yanghua commented on issue #6619: [hotfix][flink-streaming-java] modify typo in 
the OperatorChain.java
URL: https://github.com/apache/flink/pull/6619#issuecomment-416280110
 
 
   +1


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on issue #6620: [hotfix][flink-streaming-java] modify CreateStreamOutput() in OperatorChain.java

2018-08-27 Thread GitBox
yanghua commented on issue #6620: [hotfix][flink-streaming-java] modify 
CreateStreamOutput() in OperatorChain.java
URL: https://github.com/apache/flink/pull/6620#issuecomment-416279789
 
 
   hi @maqingxiang thanks for your contribution. But I personally feel that it 
is not meaningful to simply modify it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10222) Table scalar function expression parses error when function name equals the exists keyword suffix

2018-08-27 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593881#comment-16593881
 ] 

vinoyang commented on FLINK-10222:
--

[~walterddr] you are right. I tried a lot of test cases built on keywords in 
the suffix list and had always wondered why some keywords do not trigger the problem: 
with the *log* keyword, for example, *log2*, *log10* and *logabc* can all be resolved 
normally. I later found that this is because *log* requires parameters. Well, if 
[~fhueske] has no objections, I can fix this in the PR, in the form of an exact 
regular-expression match.

> Table scalar function expression parses error when function name equals the 
> exists keyword suffix
> -
>
> Key: FLINK-10222
> URL: https://issues.apache.org/jira/browse/FLINK-10222
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Reporter: vinoyang
>Priority: Major
>
> Suffix extraction in ExpressionParser.scala does not actually support 
> extraction of keywords with the same prefix. For example: Adding suffix 
> parsing rules for {{a.fun}} and {{a.function}} simultaneously will causes 
> exceptions. 
> some discussion : 
> [https://github.com/apache/flink/pull/6432#issuecomment-416127815]
> https://github.com/apache/flink/pull/6585#discussion_r212797015



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10216) Add REGEXP_MATCH in TableAPI and SQL

2018-08-27 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593867#comment-16593867
 ] 

vinoyang commented on FLINK-10216:
--

If regexp_match (or whatever name is chosen) keeps the same semantics as SIMILAR TO, 
then I support providing a function with the same implementation as SIMILAR TO, 
because SIMILAR TO satisfies the database semantics, which purely Java-based regular 
expression matching cannot.

> Add REGEXP_MATCH in TableAPI and SQL
> 
>
> Key: FLINK-10216
> URL: https://issues.apache.org/jira/browse/FLINK-10216
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Juho Autio
>Priority: Major
>
> Here's a naive implementation:
> {code:java}
> public class RegexpMatchFunction extends ScalarFunction {
> // NOTE! Flink calls eval() by reflection
> public boolean eval(String value, String pattern) {
> return value != null && pattern != null && value.matches(pattern);
> }
> }
> {code}
> I wonder if there would be a way to optimize this to use 
> {{Pattern.compile(value)}} and use the compiled Pattern for multiple calls 
> (possibly different values, but same pattern).
> h3. Naming
> Should regex functions be prefixed with {{regexp_}} or {{regex_}}? See also: 
> [https://github.com/apache/flink/pull/6448#issuecomment-415972833]
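A sketch of how the compiled {{Pattern}} could be reused across calls, keyed by the pattern string (an illustration building on the naive implementation above, not a final design):

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.flink.table.functions.ScalarFunction;

public class RegexpMatchFunction extends ScalarFunction {

	// Cache of compiled patterns, local to each function instance (so no sharing across threads).
	private transient Map<String, Pattern> patternCache;

	// NOTE! Flink calls eval() by reflection
	public boolean eval(String value, String pattern) {
		if (value == null || pattern == null) {
			return false;
		}
		if (patternCache == null) {
			patternCache = new HashMap<>();
		}
		// Compile each distinct pattern only once; repeated calls reuse the cached Pattern.
		Pattern compiled = patternCache.computeIfAbsent(pattern, Pattern::compile);
		return compiled.matcher(value).matches();
	}
}
{code}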



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10222) Table scalar function expression parses error when function name equals the exists keyword suffix

2018-08-27 Thread Rong Rong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593861#comment-16593861
 ] 

Rong Rong commented on FLINK-10222:
---

I think making it an exact match is probably the best solution here. I briefly 
considered making the {{opt("()")}} mandatory, but that breaks backward 
compatibility. Most of the reason why this did not surface previously is that 
many of the short keywords require arguments, such as {{as}}.  

> Table scalar function expression parses error when function name equals the 
> exists keyword suffix
> -
>
> Key: FLINK-10222
> URL: https://issues.apache.org/jira/browse/FLINK-10222
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: vinoyang
>Priority: Major
>
> Suffix extraction in ExpressionParser.scala does not actually support 
> extraction of keywords with the same prefix. For example: Adding suffix 
> parsing rules for {{a.fun}} and {{a.function}} simultaneously will cause 
> exceptions. 
> some discussion : 
> [https://github.com/apache/flink/pull/6432#issuecomment-416127815]
> https://github.com/apache/flink/pull/6585#discussion_r212797015



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10222) Table scalar function expression parses error when function name equals the exists keyword suffix

2018-08-27 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593852#comment-16593852
 ] 

vinoyang commented on FLINK-10222:
--

So, what do you think about matching a keyword exactly as a suffix? The suffix 
keywords are deterministic and semantically clear. Or is there any better 
solution? [~walterddr] [~fhueske]

> Table scalar function expression parses error when function name equals the 
> exists keyword suffix
> -
>
> Key: FLINK-10222
> URL: https://issues.apache.org/jira/browse/FLINK-10222
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: vinoyang
>Priority: Major
>
> Suffix extraction in ExpressionParser.scala does not actually support 
> extraction of keywords with the same prefix. For example: Adding suffix 
> parsing rules for {{a.fun}} and {{a.function}} simultaneously will cause 
> exceptions. 
> some discussion : 
> [https://github.com/apache/flink/pull/6432#issuecomment-416127815]
> https://github.com/apache/flink/pull/6585#discussion_r212797015



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8686) Improve basic embedded SQL client

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593843#comment-16593843
 ] 

ASF GitHub Bot commented on FLINK-8686:
---

fhueske commented on a change in pull request #6621:  [FLINK-8686] [sql-client] 
Limit result size for prototyping modes
URL: https://github.com/apache/flink/pull/6621#discussion_r213021336
 
 

 ##
 File path: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
 ##
 @@ -112,30 +129,43 @@ else if (!isRetrieving()) {
@Override
protected void processRecord(Tuple2 change) {
synchronized (resultLock) {
-   final Row row = change.f1;
// insert
if (change.f0) {
-   materializedTable.add(row);
-   rowPositionCache.put(row, 
materializedTable.size() - 1);
+   processInsert(change.f1);
}
// delete
else {
-   // delete the newest record first to minimize 
per-page changes
-   final Integer cachedPos = 
rowPositionCache.get(row);
-   final int startSearchPos;
-   if (cachedPos != null) {
-   startSearchPos = Math.min(cachedPos, 
materializedTable.size() - 1);
-   } else {
-   startSearchPos = 
materializedTable.size() - 1;
-   }
-
-   for (int i = startSearchPos; i >= 0; i--) {
-   if 
(materializedTable.get(i).equals(row)) {
-   materializedTable.remove(i);
-   rowPositionCache.remove(row);
-   break;
-   }
-   }
+   processDelete(change.f1);
+   }
+   }
+   }
+
+   // 

+
+   private void processInsert(Row row) {
+   // limit the materialized table
+   if (materializedTable.size() >= maxRowCount) {
+   processDelete(materializedTable.get(0));
 
 Review comment:
   Or we just set evicted elements to `null` and keep a `startOffset` counter that we 
increase.
   Once the counter reaches `1000` (magic number!), we shrink the `ArrayList`.
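
A hedged sketch of that idea (class and field names such as {{LazilyCompactedBuffer}} 
and {{SHRINK_THRESHOLD}} are illustrative only and do not come from the PR):

{code:java}
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; not the MaterializedCollectStreamResult code.
class LazilyCompactedBuffer<T> {

	private static final int SHRINK_THRESHOLD = 1000; // the "magic number"

	private final List<T> rows = new ArrayList<>();
	private int startOffset = 0; // index of the first live element

	void add(T row) {
		rows.add(row);
	}

	// Evict the oldest live element in O(1) by nulling it out; physically
	// shrink the list only once SHRINK_THRESHOLD entries have been evicted.
	void evictOldest() {
		if (startOffset >= rows.size()) {
			return; // nothing left to evict
		}
		rows.set(startOffset, null);
		startOffset++;
		if (startOffset >= SHRINK_THRESHOLD) {
			rows.subList(0, startOffset).clear(); // single compaction pass
			startOffset = 0;
		}
	}

	int liveSize() {
		return rows.size() - startOffset;
	}
}
{code}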


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve basic embedded SQL client 
> --
>
> Key: FLINK-8686
> URL: https://issues.apache.org/jira/browse/FLINK-8686
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> This issue describes follow-up issues that should be fixed in order to make 
> the SQL client more stable:
>  - -Add more tests for executor-
>  - Configure JVM heap size
>  - Limit changelog and table buffers
>  - -"The input is invalid please check it again." => add allowed range-
>  - Load dependencies recursively
>  - Clean up results in result store
>  - -Improve error message for unsupported batch queries-
>  - -Add more logging instead swallowing exceptions-
>  - -List properties in error message about missing TS factory sorted by name-
>  - Add command to show loaded TS factories and their required properties
>  - Add command to reload configuration from files (no need to restart client)
>  - Improve error message in case of invalid json-schema (right now: 
> {{java.lang.IllegalArgumentException: No type could be found in node:}})
>  - -Add switch to show full stacktraces of exceptions- solved by logging
>  - Give error message when setting unknown parameters 
> {{result-mode=changelog}} does not give an error but should be 
> {{execution.result-mode=changelog}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] fhueske commented on a change in pull request #6621: [FLINK-8686] [sql-client] Limit result size for prototyping modes

2018-08-27 Thread GitBox
fhueske commented on a change in pull request #6621:  [FLINK-8686] [sql-client] 
Limit result size for prototyping modes
URL: https://github.com/apache/flink/pull/6621#discussion_r213021336
 
 

 ##
 File path: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
 ##
 @@ -112,30 +129,43 @@ else if (!isRetrieving()) {
@Override
protected void processRecord(Tuple2 change) {
synchronized (resultLock) {
-   final Row row = change.f1;
// insert
if (change.f0) {
-   materializedTable.add(row);
-   rowPositionCache.put(row, 
materializedTable.size() - 1);
+   processInsert(change.f1);
}
// delete
else {
-   // delete the newest record first to minimize 
per-page changes
-   final Integer cachedPos = 
rowPositionCache.get(row);
-   final int startSearchPos;
-   if (cachedPos != null) {
-   startSearchPos = Math.min(cachedPos, 
materializedTable.size() - 1);
-   } else {
-   startSearchPos = 
materializedTable.size() - 1;
-   }
-
-   for (int i = startSearchPos; i >= 0; i--) {
-   if 
(materializedTable.get(i).equals(row)) {
-   materializedTable.remove(i);
-   rowPositionCache.remove(row);
-   break;
-   }
-   }
+   processDelete(change.f1);
+   }
+   }
+   }
+
+   // 

+
+   private void processInsert(Row row) {
+   // limit the materialized table
+   if (materializedTable.size() >= maxRowCount) {
+   processDelete(materializedTable.get(0));
 
 Review comment:
   Or we just set evicted elements to `null` and keep a `startOffset` counter that we 
increase.
   Once the counter reaches `1000` (magic number!), we shrink the `ArrayList`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-8686) Improve basic embedded SQL client

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593835#comment-16593835
 ] 

ASF GitHub Bot commented on FLINK-8686:
---

fhueske commented on a change in pull request #6621:  [FLINK-8686] [sql-client] 
Limit result size for prototyping modes
URL: https://github.com/apache/flink/pull/6621#discussion_r213009599
 
 

 ##
 File path: docs/dev/table/sqlClient.md
 ##
 @@ -200,14 +201,16 @@ functions:
 execution:
   type: streaming   # required: execution mode either 'batch' 
or 'streaming'
   result-mode: table# required: either 'table' or 'changelog'
+  max-table-result-rows: 100# optional: maximum number of maintained 
rows in
 
 Review comment:
   Allow to disable by setting to `-1`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve basic embedded SQL client 
> --
>
> Key: FLINK-8686
> URL: https://issues.apache.org/jira/browse/FLINK-8686
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> This issue describes follow-up issues that should be fixed in order to make 
> the SQL client more stable:
>  - -Add more tests for executor-
>  - Configure JVM heap size
>  - Limit changelog and table buffers
>  - -"The input is invalid please check it again." => add allowed range-
>  - Load dependencies recursively
>  - Clean up results in result store
>  - -Improve error message for unsupported batch queries-
>  - -Add more logging instead swallowing exceptions-
>  - -List properties in error message about missing TS factory sorted by name-
>  - Add command to show loaded TS factories and their required properties
>  - Add command to reload configuration from files (no need to restart client)
>  - Improve error message in case of invalid json-schema (right now: 
> {{java.lang.IllegalArgumentException: No type could be found in node:}})
>  - -Add switch to show full stacktraces of exceptions- solved by logging
>  - Give error message when setting unknown parameters 
> {{result-mode=changelog}} does not give an error but should be 
> {{execution.result-mode=changelog}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8686) Improve basic embedded SQL client

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593839#comment-16593839
 ] 

ASF GitHub Bot commented on FLINK-8686:
---

fhueske commented on a change in pull request #6621:  [FLINK-8686] [sql-client] 
Limit result size for prototyping modes
URL: https://github.com/apache/flink/pull/6621#discussion_r213012860
 
 

 ##
 File path: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
 ##
 @@ -55,10 +61,21 @@
 
private boolean isLastSnapshot;
 
-   public MaterializedCollectStreamResult(TypeInformation outputType, 
ExecutionConfig config,
-   InetAddress gatewayAddress, int gatewayPort) {
+   public MaterializedCollectStreamResult(
+   TypeInformation outputType,
+   ExecutionConfig config,
+   InetAddress gatewayAddress,
+   int gatewayPort,
+   long maxRowCount) {
super(outputType, config, gatewayAddress, gatewayPort);
 
+   if (maxRowCount < 1) {
 
 Review comment:
   If disabled (i.e., `< 0`), we could set the parameter to `Integer.MAX_VALUE`
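
A small sketch of that suggestion (a hypothetical helper, not the actual constructor 
code): map a negative configured value to an effectively unlimited bound.

{code:java}
// Hypothetical helper illustrating the suggestion; not from the PR.
final class MaxRowCounts {

	static int effectiveMaxRowCount(long configuredMaxRowCount) {
		if (configuredMaxRowCount < 0) {
			return Integer.MAX_VALUE; // disabled -> effectively unlimited
		}
		return (int) Math.min(configuredMaxRowCount, Integer.MAX_VALUE);
	}
}
{code}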


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve basic embedded SQL client 
> --
>
> Key: FLINK-8686
> URL: https://issues.apache.org/jira/browse/FLINK-8686
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> This issue describes follow-up issues that should be fixed in order to make 
> the SQL client more stable:
>  - -Add more tests for executor-
>  - Configure JVM heap size
>  - Limit changelog and table buffers
>  - -"The input is invalid please check it again." => add allowed range-
>  - Load dependencies recursively
>  - Clean up results in result store
>  - -Improve error message for unsupported batch queries-
>  - -Add more logging instead swallowing exceptions-
>  - -List properties in error message about missing TS factory sorted by name-
>  - Add command to show loaded TS factories and their required properties
>  - Add command to reload configuration from files (no need to restart client)
>  - Improve error message in case of invalid json-schema (right now: 
> {{java.lang.IllegalArgumentException: No type could be found in node:}})
>  - -Add switch to show full stacktraces of exceptions- solved by logging
>  - Give error message when setting unknown parameters 
> {{result-mode=changelog}} does not give an error but should be 
> {{execution.result-mode=changelog}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8686) Improve basic embedded SQL client

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593837#comment-16593837
 ] 

ASF GitHub Bot commented on FLINK-8686:
---

fhueske commented on a change in pull request #6621:  [FLINK-8686] [sql-client] 
Limit result size for prototyping modes
URL: https://github.com/apache/flink/pull/6621#discussion_r213014246
 
 

 ##
 File path: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
 ##
 @@ -112,30 +129,43 @@ else if (!isRetrieving()) {
@Override
protected void processRecord(Tuple2 change) {
synchronized (resultLock) {
-   final Row row = change.f1;
// insert
if (change.f0) {
-   materializedTable.add(row);
-   rowPositionCache.put(row, 
materializedTable.size() - 1);
+   processInsert(change.f1);
}
// delete
else {
-   // delete the newest record first to minimize 
per-page changes
-   final Integer cachedPos = 
rowPositionCache.get(row);
-   final int startSearchPos;
-   if (cachedPos != null) {
-   startSearchPos = Math.min(cachedPos, 
materializedTable.size() - 1);
-   } else {
-   startSearchPos = 
materializedTable.size() - 1;
-   }
-
-   for (int i = startSearchPos; i >= 0; i--) {
-   if 
(materializedTable.get(i).equals(row)) {
-   materializedTable.remove(i);
-   rowPositionCache.remove(row);
-   break;
-   }
-   }
+   processDelete(change.f1);
+   }
+   }
+   }
+
+   // 

+
+   private void processInsert(Row row) {
+   // limit the materialized table
+   if (materializedTable.size() >= maxRowCount) {
+   processDelete(materializedTable.get(0));
 
 Review comment:
   Why should we search for the row if we know that we want to remove it from 
the first position?
   Couldn't we just do `materializedTable.remove(0)`? This will be expensive 
because `materializedTable` is an `ArrayList` but what's the chance that there 
is another row that is equal to the one that is removed?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve basic embedded SQL client 
> --
>
> Key: FLINK-8686
> URL: https://issues.apache.org/jira/browse/FLINK-8686
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> This issue describes follow-up issues that should be fixed in order to make 
> the SQL client more stable:
>  - -Add more tests for executor-
>  - Configure JVM heap size
>  - Limit changelog and table buffers
>  - -"The input is invalid please check it again." => add allowed range-
>  - Load dependencies recursively
>  - Clean up results in result store
>  - -Improve error message for unsupported batch queries-
>  - -Add more logging instead swallowing exceptions-
>  - -List properties in error message about missing TS factory sorted by name-
>  - Add command to show loaded TS factories and their required properties
>  - Add command to reload configuration from files (no need to restart client)
>  - Improve error message in case of invalid json-schema (right now: 
> {{java.lang.IllegalArgumentException: No type could be found in node:}})
>  - -Add switch to show full stacktraces of exceptions- solved by logging
>  - Give error message when setting unknown parameters 
> {{result-mode=changelog}} does not give an error but should be 
> {{execution.result-mode=changelog}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8686) Improve basic embedded SQL client

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593838#comment-16593838
 ] 

ASF GitHub Bot commented on FLINK-8686:
---

fhueske commented on a change in pull request #6621:  [FLINK-8686] [sql-client] 
Limit result size for prototyping modes
URL: https://github.com/apache/flink/pull/6621#discussion_r213014505
 
 

 ##
 File path: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
 ##
 @@ -112,30 +129,43 @@ else if (!isRetrieving()) {
@Override
protected void processRecord(Tuple2 change) {
synchronized (resultLock) {
-   final Row row = change.f1;
// insert
if (change.f0) {
-   materializedTable.add(row);
-   rowPositionCache.put(row, 
materializedTable.size() - 1);
+   processInsert(change.f1);
}
// delete
else {
-   // delete the newest record first to minimize 
per-page changes
-   final Integer cachedPos = 
rowPositionCache.get(row);
-   final int startSearchPos;
-   if (cachedPos != null) {
-   startSearchPos = Math.min(cachedPos, 
materializedTable.size() - 1);
-   } else {
-   startSearchPos = 
materializedTable.size() - 1;
-   }
-
-   for (int i = startSearchPos; i >= 0; i--) {
-   if 
(materializedTable.get(i).equals(row)) {
-   materializedTable.remove(i);
-   rowPositionCache.remove(row);
-   break;
-   }
-   }
+   processDelete(change.f1);
+   }
+   }
+   }
+
+   // 

+
+   private void processInsert(Row row) {
+   // limit the materialized table
+   if (materializedTable.size() >= maxRowCount) {
+   processDelete(materializedTable.get(0));
 
 Review comment:
   Maybe we should use a `LinkedList` instead?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve basic embedded SQL client 
> --
>
> Key: FLINK-8686
> URL: https://issues.apache.org/jira/browse/FLINK-8686
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> This issue describes follow-up issues that should be fixed in order to make 
> the SQL client more stable:
>  - -Add more tests for executor-
>  - Configure JVM heap size
>  - Limit changelog and table buffers
>  - -"The input is invalid please check it again." => add allowed range-
>  - Load dependencies recursively
>  - Clean up results in result store
>  - -Improve error message for unsupported batch queries-
>  - -Add more logging instead swallowing exceptions-
>  - -List properties in error message about missing TS factory sorted by name-
>  - Add command to show loaded TS factories and their required properties
>  - Add command to reload configuration from files (no need to restart client)
>  - Improve error message in case of invalid json-schema (right now: 
> {{java.lang.IllegalArgumentException: No type could be found in node:}})
>  - -Add switch to show full stacktraces of exceptions- solved by logging
>  - Give error message when setting unknown parameters 
> {{result-mode=changelog}} does not give an error but should be 
> {{execution.result-mode=changelog}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8686) Improve basic embedded SQL client

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593834#comment-16593834
 ] 

ASF GitHub Bot commented on FLINK-8686:
---

fhueske commented on a change in pull request #6621:  [FLINK-8686] [sql-client] 
Limit result size for prototyping modes
URL: https://github.com/apache/flink/pull/6621#discussion_r213012121
 
 

 ##
 File path: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliChangelogResultView.java
 ##
 @@ -133,6 +134,13 @@ protected void refresh() {
}
 
// update results
+
+   // formatting and printing of rows is 
expensive in the current implementation,
+   // therefore we limit the maximum 
number of lines shown in changelog mode to
+   // keep the CLI responsive
+   if (results.size() >= 
DEFAULT_MAX_ROW_COUNT) {
+   results.remove(0);
 
 Review comment:
   Alternatively, we could delete all entries at once with 
`ArrayList.removeRange()`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve basic embedded SQL client 
> --
>
> Key: FLINK-8686
> URL: https://issues.apache.org/jira/browse/FLINK-8686
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> This issue describes follow-up issues that should be fixed in order to make 
> the SQL client more stable:
>  - -Add more tests for executor-
>  - Configure JVM heap size
>  - Limit changelog and table buffers
>  - -"The input is invalid please check it again." => add allowed range-
>  - Load dependencies recursively
>  - Clean up results in result store
>  - -Improve error message for unsupported batch queries-
>  - -Add more logging instead swallowing exceptions-
>  - -List properties in error message about missing TS factory sorted by name-
>  - Add command to show loaded TS factories and their required properties
>  - Add command to reload configuration from files (no need to restart client)
>  - Improve error message in case of invalid json-schema (right now: 
> {{java.lang.IllegalArgumentException: No type could be found in node:}})
>  - -Add switch to show full stacktraces of exceptions- solved by logging
>  - Give error message when setting unknown parameters 
> {{result-mode=changelog}} does not give an error but should be 
> {{execution.result-mode=changelog}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8686) Improve basic embedded SQL client

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593836#comment-16593836
 ] 

ASF GitHub Bot commented on FLINK-8686:
---

fhueske commented on a change in pull request #6621:  [FLINK-8686] [sql-client] 
Limit result size for prototyping modes
URL: https://github.com/apache/flink/pull/6621#discussion_r213010987
 
 

 ##
 File path: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliChangelogResultView.java
 ##
 @@ -133,6 +134,13 @@ protected void refresh() {
}
 
// update results
+
+   // formatting and printing of rows is 
expensive in the current implementation,
+   // therefore we limit the maximum 
number of lines shown in changelog mode to
+   // keep the CLI responsive
+   if (results.size() >= 
DEFAULT_MAX_ROW_COUNT) {
+   results.remove(0);
 
 Review comment:
   Since `results` is an `ArrayList`, removing the first element means that the 
whole array is shifted by copying. `LinkedList` might be more efficient. 
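
To illustrate the trade-off, a hedged sketch with illustrative names (not the 
CliChangelogResultView code; it uses an {{ArrayDeque}}, which gives the same O(1) 
head removal as the {{LinkedList}} mentioned above but with less per-node overhead):

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative bounded FIFO for a changelog view; not the CLI implementation.
final class BoundedChangelog<T> {

	private final int maxRows;
	private final Deque<T> rows = new ArrayDeque<>();

	BoundedChangelog(int maxRows) {
		this.maxRows = maxRows;
	}

	void append(T row) {
		if (rows.size() >= maxRows) {
			rows.removeFirst(); // O(1); ArrayList.remove(0) would shift the whole array
		}
		rows.addLast(row);
	}

	int size() {
		return rows.size();
	}
}
{code}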


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve basic embedded SQL client 
> --
>
> Key: FLINK-8686
> URL: https://issues.apache.org/jira/browse/FLINK-8686
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> This issue describes follow-up issues that should be fixed in order to make 
> the SQL client more stable:
>  - -Add more tests for executor-
>  - Configure JVM heap size
>  - Limit changelog and table buffers
>  - -"The input is invalid please check it again." => add allowed range-
>  - Load dependencies recursively
>  - Clean up results in result store
>  - -Improve error message for unsupported batch queries-
>  - -Add more logging instead swallowing exceptions-
>  - -List properties in error message about missing TS factory sorted by name-
>  - Add command to show loaded TS factories and their required properties
>  - Add command to reload configuration from files (no need to restart client)
>  - Improve error message in case of invalid json-schema (right now: 
> {{java.lang.IllegalArgumentException: No type could be found in node:}})
>  - -Add switch to show full stacktraces of exceptions- solved by logging
>  - Give error message when setting unknown parameters 
> {{result-mode=changelog}} does not give an error but should be 
> {{execution.result-mode=changelog}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] fhueske commented on a change in pull request #6621: [FLINK-8686] [sql-client] Limit result size for prototyping modes

2018-08-27 Thread GitBox
fhueske commented on a change in pull request #6621:  [FLINK-8686] [sql-client] 
Limit result size for prototyping modes
URL: https://github.com/apache/flink/pull/6621#discussion_r213009599
 
 

 ##
 File path: docs/dev/table/sqlClient.md
 ##
 @@ -200,14 +201,16 @@ functions:
 execution:
   type: streaming   # required: execution mode either 'batch' 
or 'streaming'
   result-mode: table# required: either 'table' or 'changelog'
+  max-table-result-rows: 100# optional: maximum number of maintained 
rows in
 
 Review comment:
   Allow to disable by setting to `-1`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on a change in pull request #6621: [FLINK-8686] [sql-client] Limit result size for prototyping modes

2018-08-27 Thread GitBox
fhueske commented on a change in pull request #6621:  [FLINK-8686] [sql-client] 
Limit result size for prototyping modes
URL: https://github.com/apache/flink/pull/6621#discussion_r213014505
 
 

 ##
 File path: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
 ##
 @@ -112,30 +129,43 @@ else if (!isRetrieving()) {
@Override
protected void processRecord(Tuple2 change) {
synchronized (resultLock) {
-   final Row row = change.f1;
// insert
if (change.f0) {
-   materializedTable.add(row);
-   rowPositionCache.put(row, 
materializedTable.size() - 1);
+   processInsert(change.f1);
}
// delete
else {
-   // delete the newest record first to minimize 
per-page changes
-   final Integer cachedPos = 
rowPositionCache.get(row);
-   final int startSearchPos;
-   if (cachedPos != null) {
-   startSearchPos = Math.min(cachedPos, 
materializedTable.size() - 1);
-   } else {
-   startSearchPos = 
materializedTable.size() - 1;
-   }
-
-   for (int i = startSearchPos; i >= 0; i--) {
-   if 
(materializedTable.get(i).equals(row)) {
-   materializedTable.remove(i);
-   rowPositionCache.remove(row);
-   break;
-   }
-   }
+   processDelete(change.f1);
+   }
+   }
+   }
+
+   // 

+
+   private void processInsert(Row row) {
+   // limit the materialized table
+   if (materializedTable.size() >= maxRowCount) {
+   processDelete(materializedTable.get(0));
 
 Review comment:
   Maybe we should use a `LinkedList` instead?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on a change in pull request #6621: [FLINK-8686] [sql-client] Limit result size for prototyping modes

2018-08-27 Thread GitBox
fhueske commented on a change in pull request #6621:  [FLINK-8686] [sql-client] 
Limit result size for prototyping modes
URL: https://github.com/apache/flink/pull/6621#discussion_r213010987
 
 

 ##
 File path: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliChangelogResultView.java
 ##
 @@ -133,6 +134,13 @@ protected void refresh() {
}
 
// update results
+
+   // formatting and printing of rows is 
expensive in the current implementation,
+   // therefore we limit the maximum 
number of lines shown in changelog mode to
+   // keep the CLI responsive
+   if (results.size() >= 
DEFAULT_MAX_ROW_COUNT) {
+   results.remove(0);
 
 Review comment:
   Since `results` is an `ArrayList`, removing the first element means that the 
whole array is shifted by copying. `LinkedList` might be more efficient. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on a change in pull request #6621: [FLINK-8686] [sql-client] Limit result size for prototyping modes

2018-08-27 Thread GitBox
fhueske commented on a change in pull request #6621:  [FLINK-8686] [sql-client] 
Limit result size for prototyping modes
URL: https://github.com/apache/flink/pull/6621#discussion_r213012860
 
 

 ##
 File path: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
 ##
 @@ -55,10 +61,21 @@
 
private boolean isLastSnapshot;
 
-   public MaterializedCollectStreamResult(TypeInformation outputType, 
ExecutionConfig config,
-   InetAddress gatewayAddress, int gatewayPort) {
+   public MaterializedCollectStreamResult(
+   TypeInformation outputType,
+   ExecutionConfig config,
+   InetAddress gatewayAddress,
+   int gatewayPort,
+   long maxRowCount) {
super(outputType, config, gatewayAddress, gatewayPort);
 
+   if (maxRowCount < 1) {
 
 Review comment:
   If disabled (i.e., `< 0`), we could set the parameter to `Integer.MAX_VALUE`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on a change in pull request #6621: [FLINK-8686] [sql-client] Limit result size for prototyping modes

2018-08-27 Thread GitBox
fhueske commented on a change in pull request #6621:  [FLINK-8686] [sql-client] 
Limit result size for prototyping modes
URL: https://github.com/apache/flink/pull/6621#discussion_r213014246
 
 

 ##
 File path: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
 ##
 @@ -112,30 +129,43 @@ else if (!isRetrieving()) {
@Override
protected void processRecord(Tuple2 change) {
synchronized (resultLock) {
-   final Row row = change.f1;
// insert
if (change.f0) {
-   materializedTable.add(row);
-   rowPositionCache.put(row, 
materializedTable.size() - 1);
+   processInsert(change.f1);
}
// delete
else {
-   // delete the newest record first to minimize 
per-page changes
-   final Integer cachedPos = 
rowPositionCache.get(row);
-   final int startSearchPos;
-   if (cachedPos != null) {
-   startSearchPos = Math.min(cachedPos, 
materializedTable.size() - 1);
-   } else {
-   startSearchPos = 
materializedTable.size() - 1;
-   }
-
-   for (int i = startSearchPos; i >= 0; i--) {
-   if 
(materializedTable.get(i).equals(row)) {
-   materializedTable.remove(i);
-   rowPositionCache.remove(row);
-   break;
-   }
-   }
+   processDelete(change.f1);
+   }
+   }
+   }
+
+   // 

+
+   private void processInsert(Row row) {
+   // limit the materialized table
+   if (materializedTable.size() >= maxRowCount) {
+   processDelete(materializedTable.get(0));
 
 Review comment:
   Why should we search for the row if we know that we want to remove it from 
the first position?
   Couldn't we just do `materializedTable.remove(0)`? This will be expensive 
because `materializedTable` is an `ArrayList` but what's the chance that there 
is another row that is equal to the one that is removed?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on a change in pull request #6621: [FLINK-8686] [sql-client] Limit result size for prototyping modes

2018-08-27 Thread GitBox
fhueske commented on a change in pull request #6621:  [FLINK-8686] [sql-client] 
Limit result size for prototyping modes
URL: https://github.com/apache/flink/pull/6621#discussion_r213012121
 
 

 ##
 File path: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliChangelogResultView.java
 ##
 @@ -133,6 +134,13 @@ protected void refresh() {
}
 
// update results
+
+   // formatting and printing of rows is 
expensive in the current implementation,
+   // therefore we limit the maximum 
number of lines shown in changelog mode to
+   // keep the CLI responsive
+   if (results.size() >= 
DEFAULT_MAX_ROW_COUNT) {
+   results.remove(0);
 
 Review comment:
   Alternatively, we could delete all entries at once with 
`ArrayList.removeRange()`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10222) Table scalar function expression parses error when function name equals the exists keyword suffix

2018-08-27 Thread Rong Rong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593824#comment-16593824
 ] 

Rong Rong commented on FLINK-10222:
---

Agree with [~fhueske]'s observation. This will cause many collisions, especially 
since many of the Table API's keywords are as short as 3-4 characters. 

> Table scalar function expression parses error when function name equals the 
> exists keyword suffix
> -
>
> Key: FLINK-10222
> URL: https://issues.apache.org/jira/browse/FLINK-10222
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: vinoyang
>Priority: Major
>
> Suffix extraction in ExpressionParser.scala does not actually support 
> extraction of keywords with the same prefix. For example: Adding suffix 
> parsing rules for {{a.fun}} and {{a.function}} simultaneously will cause 
> exceptions. 
> some discussion : 
> [https://github.com/apache/flink/pull/6432#issuecomment-416127815]
> https://github.com/apache/flink/pull/6585#discussion_r212797015



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10222) Table scalar function expression parses error when function name equals the exists keyword suffix

2018-08-27 Thread Rong Rong (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong updated FLINK-10222:
--
Description: 
Suffix extraction in ExpressionParser.scala does not actually support 
extraction of keywords with the same prefix. For example: Adding suffix parsing 
rules for {{a.fun}} and {{a.function}} simultaneously will cause exceptions. 

some discussion : 

[https://github.com/apache/flink/pull/6432#issuecomment-416127815]

https://github.com/apache/flink/pull/6585#discussion_r212797015

  was:
Suffix extraction in ExpressionParser.scala does not actually support 
extraction of keywords with the same prefix. For example: Adding suffix parsing 
rules for {{a.fun}} and {{a.function}} simultaneously will cause exceptions. 

The suggestion is to make the suffix extraction an exact match.

some discussion : 

[https://github.com/apache/flink/pull/6432#issuecomment-416127815]

https://github.com/apache/flink/pull/6585#discussion_r212797015


> Table scalar function expression parses error when function name equals the 
> exists keyword suffix
> -
>
> Key: FLINK-10222
> URL: https://issues.apache.org/jira/browse/FLINK-10222
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: vinoyang
>Priority: Major
>
> Suffix extraction in ExpressionParser.scala does not actually support 
> extraction of keywords with the same prefix. For example: Adding suffix 
> parsing rules for {{a.fun}} and {{a.function}} simultaneously will cause 
> exceptions. 
> some discussion : 
> [https://github.com/apache/flink/pull/6432#issuecomment-416127815]
> https://github.com/apache/flink/pull/6585#discussion_r212797015



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10222) Table scalar function expression parses error when function name equals the exists keyword suffix

2018-08-27 Thread Rong Rong (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong updated FLINK-10222:
--
Description: 
Suffix extraction in ExpressionParser.scala does not actually support 
extraction of keywords with the same prefix. For example: Adding suffix parsing 
rules for {{a.fun}} and {{a.function}} simultaneously will cause exceptions. 

The suggestion is to make the suffix extraction an exact match.

some discussion : 

[https://github.com/apache/flink/pull/6432#issuecomment-416127815]

https://github.com/apache/flink/pull/6585#discussion_r212797015

  was:
some discussion : 

[https://github.com/apache/flink/pull/6432#issuecomment-416127815]

https://github.com/apache/flink/pull/6585#discussion_r212797015


> Table scalar function expression parses error when function name equals the 
> exists keyword suffix
> -
>
> Key: FLINK-10222
> URL: https://issues.apache.org/jira/browse/FLINK-10222
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: vinoyang
>Priority: Major
>
> Suffix extraction in ExpressionParser.scala does not actually support 
> extraction of keywords with the same prefix. For example: Adding suffix 
> parsing rules for {{a.fun}} and {{a.function}} simultaneously will cause 
> exceptions. 
> The suggestion is to make the suffix extraction an exact match.
> some discussion : 
> [https://github.com/apache/flink/pull/6432#issuecomment-416127815]
> https://github.com/apache/flink/pull/6585#discussion_r212797015



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7594) Add a SQL client

2018-08-27 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593802#comment-16593802
 ] 

Timo Walther commented on FLINK-7594:
-

We have made a lot of progress in the last months. FLIP-24 part 1 is done, and 
half of part 2 as well. The SQL Client is already a very powerful tool. I think 
it is time to stop adding more subtasks to this issue. I created a new "SQL Client" 
component in JIRA. If there are no objections, I will port the remaining subtasks 
of this issue to that component and mark this issue as resolved.

> Add a SQL client
> 
>
> Key: FLINK-7594
> URL: https://issues.apache.org/jira/browse/FLINK-7594
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment a user can only specify queries within a Java/Scala program, 
> which is nice for integrating table programs or parts of them with the DataSet or 
> DataStream API. With more connectors coming up, it is time to also provide a 
> programming-free SQL client. The SQL client should consist of a CLI interface 
> and maybe also a REST API. The concrete design is still up for discussion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] twalthr opened a new pull request #6621: [FLINK-8686] [sql-client] Limit result size for prototyping modes

2018-08-27 Thread GitBox
twalthr opened a new pull request #6621:  [FLINK-8686] [sql-client] Limit 
result size for prototyping modes
URL: https://github.com/apache/flink/pull/6621
 
 
   ## What is the purpose of the change
   
   This PR aims to make the SQL Client more robust by limiting the result size 
for both changelog and table result modes. In changelog mode the responsiveness 
of the CLI is the limiting factor. In table mode the limiting factors are the 
main memory and the configurable maximum row count.
   
   This PR is built on top of FLINK-10192.
   
   ## Brief change log
   
   - Add a hard limit for changelog mode (1000 rows)
   - Add a configurable limit for table mode
   
   ## Verifying this change
   
   - Added test `LocalExecutorITCase#testStreamQueryExecutionLimitedTable`
   - Added test `MaterializedCollectStreamResultTest#testLimitedSnapshot`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? docs
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-8686) Improve basic embedded SQL client

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593793#comment-16593793
 ] 

ASF GitHub Bot commented on FLINK-8686:
---

twalthr opened a new pull request #6621:  [FLINK-8686] [sql-client] Limit 
result size for prototyping modes
URL: https://github.com/apache/flink/pull/6621
 
 
   ## What is the purpose of the change
   
   This PR aims to make the SQL Client more robust by limiting the result size 
for both changelog and table result modes. In changelog mode the responsiveness 
of the CLI is the limiting factor. In table mode the limiting factors are the 
main memory and the configurable maximum row count.
   
   This PR is built on top of FLINK-10192.
   
   ## Brief change log
   
   - Add a hard limit for changelog mode (1000 rows)
   - Add a configurable limit for table mode
   
   ## Verifying this change
   
   - Added test `LocalExecutorITCase#testStreamQueryExecutionLimitedTable`
   - Added test `MaterializedCollectStreamResultTest#testLimitedSnapshot`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? docs
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve basic embedded SQL client 
> --
>
> Key: FLINK-8686
> URL: https://issues.apache.org/jira/browse/FLINK-8686
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> This issue describes follow-up issues that should be fixed in order to make 
> the SQL client more stable:
>  - -Add more tests for executor-
>  - Configure JVM heap size
>  - Limit changelog and table buffers
>  - -"The input is invalid please check it again." => add allowed range-
>  - Load dependencies recursively
>  - Clean up results in result store
>  - -Improve error message for unsupported batch queries-
>  - -Add more logging instead swallowing exceptions-
>  - -List properties in error message about missing TS factory sorted by name-
>  - Add command to show loaded TS factories and their required properties
>  - Add command to reload configuration from files (no need to restart client)
>  - Improve error message in case of invalid json-schema (right now: 
> {{java.lang.IllegalArgumentException: No type could be found in node:}})
>  - -Add switch to show full stacktraces of exceptions- solved by logging
>  - Give error message when setting unknown parameters 
> {{result-mode=changelog}} does not give an error but should be 
> {{execution.result-mode=changelog}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8686) Improve basic embedded SQL client

2018-08-27 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-8686:
--
Labels: pull-request-available  (was: )

> Improve basic embedded SQL client 
> --
>
> Key: FLINK-8686
> URL: https://issues.apache.org/jira/browse/FLINK-8686
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> This issue describes follow-up issues that should be fixed in order to make 
> the SQL client more stable:
>  - -Add more tests for executor-
>  - Configure JVM heap size
>  - Limit changelog and table buffers
>  - -"The input is invalid please check it again." => add allowed range-
>  - Load dependencies recursively
>  - Clean up results in result store
>  - -Improve error message for unsupported batch queries-
>  - -Add more logging instead swallowing exceptions-
>  - -List properties in error message about missing TS factory sorted by name-
>  - Add command to show loaded TS factories and their required properties
>  - Add command to reload configuration from files (no need to restart client)
>  - Improve error message in case of invalid json-schema (right now: 
> {{java.lang.IllegalArgumentException: No type could be found in node:}})
>  - -Add switch to show full stacktraces of exceptions- solved by logging
>  - Give error message when setting unknown parameters 
> {{result-mode=changelog}} does not give an error but should be 
> {{execution.result-mode=changelog}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10163) Support CREATE VIEW in SQL Client

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593768#comment-16593768
 ] 

ASF GitHub Bot commented on FLINK-10163:


fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] 
Support views in SQL Client
URL: https://github.com/apache/flink/pull/6606#discussion_r213001675
 
 

 ##
 File path: docs/dev/table/sqlClient.md
 ##
 @@ -459,6 +466,50 @@ Web interface: http://localhost:8081
 
 {% top %}
 
+SQL Views
+-
+
+Views allow to define virtual tables from SQL queries. The view definition is 
parsed and validated immediately. However, the actual execution happens when 
the view is accessed during the submission of a general `INSERT INTO` or 
`SELECT` statement.
+
+Views can either be defined in [environment 
files](sqlClient.html#environment-files) or within the CLI session.
+
+The following example shows how to define multiple views in a file:
+
+{% highlight yaml %}
+views:
+  - name: MyRestrictedView
+query: "SELECT MyField2 FROM MyTableSource"
+  - name: MyComplexView
+query: >
+  SELECT MyField2 + 42, CAST(MyField1 AS VARCHAR)
+  FROM MyTableSource
+  WHERE MyField2 > 200
+{% endhighlight %}
+
+Similar to table sources and sinks, views defined in a session environment 
file have highest precedence.
+
+Views can also be created within a CLI session using the `CREATE VIEW` 
statement:
+
+{% highlight text %}
+CREATE VIEW MyNewView AS SELECT MyField2 FROM MyTableSource
+{% endhighlight %}
+
+The `SHOW VIEW` statement allows for printing a previously created view again:
 
 Review comment:
   OK, let's remove it for now and figure that out later.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support CREATE VIEW in SQL Client
> -
>
> Key: FLINK-10163
> URL: https://issues.apache.org/jira/browse/FLINK-10163
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> The possibility to define a name for a subquery would improve the usability 
> of the SQL Client. The SQL standard defines {{CREATE VIEW}} for defining a 
> virtual table.
>  
> Example:
> {code}
>  CREATE VIEW viewName
>  [ '(' columnName [, columnName ]* ')' ]
>  AS Query
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] Support views in SQL Client

2018-08-27 Thread GitBox
fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] 
Support views in SQL Client
URL: https://github.com/apache/flink/pull/6606#discussion_r213001675
 
 

 ##
 File path: docs/dev/table/sqlClient.md
 ##
 @@ -459,6 +466,50 @@ Web interface: http://localhost:8081
 
 {% top %}
 
+SQL Views
+-
+
+Views allow to define virtual tables from SQL queries. The view definition is 
parsed and validated immediately. However, the actual execution happens when 
the view is accessed during the submission of a general `INSERT INTO` or 
`SELECT` statement.
+
+Views can either be defined in [environment 
files](sqlClient.html#environment-files) or within the CLI session.
+
+The following example shows how to define multiple views in a file:
+
+{% highlight yaml %}
+views:
+  - name: MyRestrictedView
+query: "SELECT MyField2 FROM MyTableSource"
+  - name: MyComplexView
+query: >
+  SELECT MyField2 + 42, CAST(MyField1 AS VARCHAR)
+  FROM MyTableSource
+  WHERE MyField2 > 200
+{% endhighlight %}
+
+Similar to table sources and sinks, views defined in a session environment 
file have highest precedence.
+
+Views can also be created within a CLI session using the `CREATE VIEW` 
statement:
+
+{% highlight text %}
+CREATE VIEW MyNewView AS SELECT MyField2 FROM MyTableSource
+{% endhighlight %}
+
+The `SHOW VIEW` statement allows for printing a previously created view again:
 
 Review comment:
   OK, let's remove it for now and figure that out later.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10216) Add REGEXP_MATCH in TableAPI and SQL

2018-08-27 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593726#comment-16593726
 ] 

Hequn Cheng commented on FLINK-10216:
-

I agree with [~twalthr]. Besides, I think it would be good to follow Hive, 
Oracle, or other standard databases.
Hive:
REGEXP/REGEXP_EXTRACT/REGEXP_REPLACE
Oracle:
REGEXP_LIKE/REGEXP_REPLACE/REGEXP_INSTR/REGEXP_SUBSTR
So, maybe we should not use REGEXP_MATCH. Use REGEXP or REGEXP_LIKE?

> Add REGEXP_MATCH in TableAPI and SQL
> 
>
> Key: FLINK-10216
> URL: https://issues.apache.org/jira/browse/FLINK-10216
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Juho Autio
>Priority: Major
>
> Here's a naive implementation:
> {code:java}
> public class RegexpMatchFunction extends ScalarFunction {
> // NOTE! Flink calls eval() by reflection
> public boolean eval(String value, String pattern) {
> return value != null && pattern != null && value.matches(pattern);
> }
> }
> {code}
> I wonder if there would be a way to optimize this to use 
> {{Pattern.compile(value)}} and use the compiled Pattern for multiple calls 
> (possibly different values, but same pattern).
> h3. Naming
> Should regex functions be prefixed with {{regexp_}} or {{regex_}}? See also: 
> [https://github.com/apache/flink/pull/6448#issuecomment-415972833]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-27 Thread Nico Kruber (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-9913:
---
Description: 
Currently the {{RecordWriter}} emits output into multiple channels via a 
{{ChannelSelector}} or broadcasts output to all channels directly. Each 
channel has a separate {{RecordSerializer}} for serializing outputs, which means 
the output will be serialized as many times as the number of selected channels.

As we know, data serialization is a high-cost operation, so we can gain good 
benefits by performing the serialization only once.

I would suggest the following changes for realizing it.
 # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
channels.
 # The output is serialized into the intermediate data buffer only once for 
different channels.
 # The intermediate serialization results are copied into different 
{{BufferBuilder}}s for different channels.

An additional benefit of using a single serializer for all channels is that we 
get a potentially significant reduction in heap space overhead from fewer 
intermediate serialization buffers (only once we got over 5MiB, these buffers 
were pruned back to 128B!).

  was:
Currently the {{RecordWriter}} emits output into multiple channels via a 
{{ChannelSelector}} or broadcasts output to all channels directly. Each 
channel has a separate {{RecordSerializer}} for serializing outputs, which means 
the output will be serialized as many times as the number of selected channels.

As we know, data serialization is a high-cost operation, so we can gain good 
benefits by performing the serialization only once.

I would suggest the following changes for realizing it.
 # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
channels.
 # The output is serialized into the intermediate data buffer only once for 
different channels.
 # The intermediate serialization results are copied into different 
{{BufferBuilder}}s for different channels.
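
As an illustrative sketch of the proposed flow (the method names {{serializeRecord}}, {{reset}}, {{copyToBufferBuilder}} and {{prune}} follow the pull request, while the surrounding types are simplified stand-ins rather than Flink's actual classes):

{code:java}
import java.io.IOException;

// Stand-in for the single serializer shared by all channels.
interface SketchSerializer<T> {
    void serializeRecord(T record) throws IOException; // serialize into an internal buffer
    void reset();                                       // rewind the read position before each copy
    void copyToBufferBuilder(SketchBufferBuilder builder);
    void prune();                                       // shrink the internal buffer if it grew large
}

// Stand-in for the per-channel target buffer.
interface SketchBufferBuilder {}

class SketchRecordWriter<T> {
    private final SketchSerializer<T> serializer;       // one serializer for all channels
    private final SketchBufferBuilder[] bufferBuilders; // one builder per channel

    SketchRecordWriter(SketchSerializer<T> serializer, SketchBufferBuilder[] bufferBuilders) {
        this.serializer = serializer;
        this.bufferBuilders = bufferBuilders;
    }

    void emit(T record, int[] selectedChannels) throws IOException {
        serializer.serializeRecord(record);                           // 1) serialize exactly once
        for (int channel : selectedChannels) {
            serializer.reset();                                       // 2) rewind the intermediate buffer
            serializer.copyToBufferBuilder(bufferBuilders[channel]);  // 3) copy the bytes per channel
        }
        serializer.prune();                                           // 4) do not hold onto a large buffer
    }
}
{code}

The point is that step 1 happens once per record, while only the cheap copy in steps 2 and 3 is repeated per selected channel.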


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.
> An additional benefit by using a single serializer for all channels is that 
> we get a potentially significant reduction on heap space overhead from fewer 
> intermediate serialization buffers (only once we got over 5MiB, these buffers 
> were pruned back to 128B!).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-27 Thread Nico Kruber (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-9913:
---
Priority: Major  (was: Minor)

> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10163) Support CREATE VIEW in SQL Client

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593709#comment-16593709
 ] 

ASF GitHub Bot commented on FLINK-10163:


fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] 
Support views in SQL Client
URL: https://github.com/apache/flink/pull/6606#discussion_r212979054
 
 

 ##
 File path: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
 ##
 @@ -420,17 +420,17 @@ private void callCreateView(SqlCommandCall cmdCall) {
final String name = cmdCall.operands[0];
final String query = cmdCall.operands[1];
 
+   final String previousQuery = context.getViews().get(name);
 
 Review comment:
   I think we should not allow overriding a view.
   It should be dropped explicitly first.
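
   A minimal sketch of that rule, with a plain map standing in for the session's view registry (class and method names here are illustrative, not the SQL Client's actual API):
   ```
import java.util.HashMap;
import java.util.Map;

// Sketch: creating a view with an existing name fails; the old view has to be
// dropped explicitly before the name can be reused.
public class ViewRegistry {
    private final Map<String, String> views = new HashMap<>();

    public void createView(String name, String query) {
        if (views.containsKey(name)) {
            throw new IllegalStateException(
                "A view named '" + name + "' already exists. Drop it before redefining it.");
        }
        views.put(name, query);
    }

    public void dropView(String name) {
        views.remove(name);
    }
}
   ```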


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support CREATE VIEW in SQL Client
> -
>
> Key: FLINK-10163
> URL: https://issues.apache.org/jira/browse/FLINK-10163
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> The possibility to define a name for a subquery would improve the usability 
> of the SQL Client. The SQL standard defines {{CREATE VIEW}} for defining a 
> virtual table.
>  
> Example:
> {code}
>  CREATE VIEW viewName
>  [ '(' columnName [, columnName ]* ')' ]
>  AS Query
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593710#comment-16593710
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

NicoK edited a comment on issue #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-416236083
 
 
   I don't quite get the argument that the `pruneBuffer()` call has this much 
overhead, at least for small records, since its implementation only prunes if 
the buffer is larger than 5 MiB:
   ```
public void pruneBuffer() {
    if (this.buffer.length > PRUNE_BUFFER_THRESHOLD) {
        ...
   ```
   
   Well, actually, this brings it down to the overhead of 
`org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer#prune`:
   ```
@Override
public void prune() {
    serializationBuffer.pruneBuffer();
    dataBuffer = serializationBuffer.wrapAsByteBuffer();
}
   ```
   But to be honest, `serializationBuffer.wrapAsByteBuffer()` also just sets 
two position values.
   -> If we are that sensitive to small changes, we should indeed think about 
optimising this one call to `tryFinishCurrentBufferBuilder()` which I mentioned 
above(?)
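
   To make the grow-then-prune behaviour concrete, here is a small self-contained sketch (the 5 MiB / 128 B figures follow the issue description; the class itself is illustrative and not Flink's actual serialization buffer):
   ```
import java.util.Arrays;

// Illustrative only: a byte buffer that grows on demand and is pruned back to a
// small initial size only once it exceeds a threshold.
public class PrunableBuffer {
    private static final int PRUNE_BUFFER_THRESHOLD = 5 * 1024 * 1024; // 5 MiB
    private static final int INITIAL_SIZE = 128;                       // 128 B

    private byte[] buffer = new byte[INITIAL_SIZE];
    private int position;

    public void write(byte[] data) {
        if (position + data.length > buffer.length) {
            // grow geometrically, as serialization buffers typically do
            buffer = Arrays.copyOf(buffer, Math.max(buffer.length * 2, position + data.length));
        }
        System.arraycopy(data, 0, buffer, position, data.length);
        position += data.length;
    }

    public void pruneBuffer() {
        // cheap for small records: nothing happens unless the buffer grew past the threshold
        if (buffer.length > PRUNE_BUFFER_THRESHOLD) {
            buffer = new byte[INITIAL_SIZE];
        }
        position = 0;
    }
}
   ```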


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] Support views in SQL Client

2018-08-27 Thread GitBox
fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] 
Support views in SQL Client
URL: https://github.com/apache/flink/pull/6606#discussion_r212979054
 
 

 ##
 File path: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
 ##
 @@ -420,17 +420,17 @@ private void callCreateView(SqlCommandCall cmdCall) {
final String name = cmdCall.operands[0];
final String query = cmdCall.operands[1];
 
+   final String previousQuery = context.getViews().get(name);
 
 Review comment:
   I think we should not allow overriding a view.
   It should be dropped explicitly first.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] NicoK edited a comment on issue #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-08-27 Thread GitBox
NicoK edited a comment on issue #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-416236083
 
 
   I don't quite get the argument that the `pruneBuffer()` call has this much 
overhead, at least for small records, since its implementation only prunes if 
the buffer is larger than 5 MiB:
   ```
public void pruneBuffer() {
    if (this.buffer.length > PRUNE_BUFFER_THRESHOLD) {
        ...
   ```
   
   Well, actually, this brings it down to the overhead of 
`org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer#prune`:
   ```
@Override
public void prune() {
    serializationBuffer.pruneBuffer();
    dataBuffer = serializationBuffer.wrapAsByteBuffer();
}
   ```
   But to be honest, `serializationBuffer.wrapAsByteBuffer()` also just sets 
two position values.
   -> If we are that sensitive to small changes, we should indeed think about 
optimising this one call to `tryFinishCurrentBufferBuilder()` which I mentioned 
above(?)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10163) Support CREATE VIEW in SQL Client

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593705#comment-16593705
 ] 

ASF GitHub Bot commented on FLINK-10163:


twalthr commented on issue #6606: [FLINK-10163] [sql-client] Support views in 
SQL Client
URL: https://github.com/apache/flink/pull/6606#issuecomment-416236908
 
 
   @fhueske I can only see one more comment?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support CREATE VIEW in SQL Client
> -
>
> Key: FLINK-10163
> URL: https://issues.apache.org/jira/browse/FLINK-10163
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> The possibility to define a name for a subquery would improve the usability 
> of the SQL Client. The SQL standard defines {{CREATE VIEW}} for defining a 
> virtual table.
>  
> Example:
> {code}
>  CREATE VIEW viewName
>  [ '(' columnName [, columnName ]* ')' ]
>  AS Query
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] twalthr commented on issue #6606: [FLINK-10163] [sql-client] Support views in SQL Client

2018-08-27 Thread GitBox
twalthr commented on issue #6606: [FLINK-10163] [sql-client] Support views in 
SQL Client
URL: https://github.com/apache/flink/pull/6606#issuecomment-416236908
 
 
   @fhueske I can only see one more comment?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10163) Support CREATE VIEW in SQL Client

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593701#comment-16593701
 ] 

ASF GitHub Bot commented on FLINK-10163:


twalthr commented on a change in pull request #6606: [FLINK-10163] [sql-client] 
Support views in SQL Client
URL: https://github.com/apache/flink/pull/6606#discussion_r212985602
 
 

 ##
 File path: docs/dev/table/sqlClient.md
 ##
 @@ -459,6 +466,50 @@ Web interface: http://localhost:8081
 
 {% top %}
 
+SQL Views
+-
+
+Views allow defining virtual tables from SQL queries. The view definition is 
parsed and validated immediately. However, the actual execution happens when 
the view is accessed during the submission of a general `INSERT INTO` or 
`SELECT` statement.
+
+Views can either be defined in [environment 
files](sqlClient.html#environment-files) or within the CLI session.
+
+The following example shows how to define multiple views in a file:
+
+{% highlight yaml %}
+views:
+  - name: MyRestrictedView
+    query: "SELECT MyField2 FROM MyTableSource"
+  - name: MyComplexView
+    query: >
+      SELECT MyField2 + 42, CAST(MyField1 AS VARCHAR)
+      FROM MyTableSource
+      WHERE MyField2 > 200
+{% endhighlight %}
+
+Similar to table sources and sinks, views defined in a session environment 
file have the highest precedence.
+
+Views can also be created within a CLI session using the `CREATE VIEW` 
statement:
+
+{% highlight text %}
+CREATE VIEW MyNewView AS SELECT MyField2 FROM MyTableSource
+{% endhighlight %}
+
+The `SHOW VIEW` statement allows for printing a previously created view again:
 
 Review comment:
   FYI: In MySQL it is called [SHOW CREATE 
VIEW](https://dev.mysql.com/doc/refman/8.0/en/show-create-view.html) and many 
vendors don't even offer this functionality which I find weird (e.g. Snowflake 
only shows the schema for [DESCRIBE 
VIEW](https://docs.snowflake.net/manuals/sql-reference/sql/desc-view.html)).
   
   I'm fine with removing `SHOW VIEW` for now?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support CREATE VIEW in SQL Client
> -
>
> Key: FLINK-10163
> URL: https://issues.apache.org/jira/browse/FLINK-10163
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> The possibility to define a name for a subquery would improve the usability 
> of the SQL Client. The SQL standard defines {{CREATE VIEW}} for defining a 
> virtual table.
>  
> Example:
> {code}
>  CREATE VIEW viewName
>  [ '(' columnName [, columnName ]* ')' ]
>  AS Query
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] twalthr commented on a change in pull request #6606: [FLINK-10163] [sql-client] Support views in SQL Client

2018-08-27 Thread GitBox
twalthr commented on a change in pull request #6606: [FLINK-10163] [sql-client] 
Support views in SQL Client
URL: https://github.com/apache/flink/pull/6606#discussion_r212985602
 
 

 ##
 File path: docs/dev/table/sqlClient.md
 ##
 @@ -459,6 +466,50 @@ Web interface: http://localhost:8081
 
 {% top %}
 
+SQL Views
+-
+
+Views allow defining virtual tables from SQL queries. The view definition is 
parsed and validated immediately. However, the actual execution happens when 
the view is accessed during the submission of a general `INSERT INTO` or 
`SELECT` statement.
+
+Views can either be defined in [environment 
files](sqlClient.html#environment-files) or within the CLI session.
+
+The following example shows how to define multiple views in a file:
+
+{% highlight yaml %}
+views:
+  - name: MyRestrictedView
+    query: "SELECT MyField2 FROM MyTableSource"
+  - name: MyComplexView
+    query: >
+      SELECT MyField2 + 42, CAST(MyField1 AS VARCHAR)
+      FROM MyTableSource
+      WHERE MyField2 > 200
+{% endhighlight %}
+
+Similar to table sources and sinks, views defined in a session environment 
file have the highest precedence.
+
+Views can also be created within a CLI session using the `CREATE VIEW` 
statement:
+
+{% highlight text %}
+CREATE VIEW MyNewView AS SELECT MyField2 FROM MyTableSource
+{% endhighlight %}
+
+The `SHOW VIEW` statement allows for printing a previously created view again:
 
 Review comment:
   FYI: In MySQL it is called [SHOW CREATE 
VIEW](https://dev.mysql.com/doc/refman/8.0/en/show-create-view.html) and many 
vendors don't even offer this functionality which I find weird (e.g. Snowflake 
only shows the schema for [DESCRIBE 
VIEW](https://docs.snowflake.net/manuals/sql-reference/sql/desc-view.html)).
   
   I'm fine with removing `SHOW VIEW` for now?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593696#comment-16593696
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on issue #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-416236083
 
 
   I don't quite get the argument that the `pruneBuffer()` call has this much 
overhead, at least for small records, since its implementation only prunes if 
the buffer is larger than 5 MiB:
   ```
public void pruneBuffer() {
    if (this.buffer.length > PRUNE_BUFFER_THRESHOLD) {
        ...
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10163) Support CREATE VIEW in SQL Client

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593694#comment-16593694
 ] 

ASF GitHub Bot commented on FLINK-10163:


fhueske commented on issue #6606: [FLINK-10163] [sql-client] Support views in 
SQL Client
URL: https://github.com/apache/flink/pull/6606#issuecomment-416235894
 
 
   Thanks for the update @twalthr. 
   I left two more comments.
   
   Thanks, Fabian


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support CREATE VIEW in SQL Client
> -
>
> Key: FLINK-10163
> URL: https://issues.apache.org/jira/browse/FLINK-10163
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> The possibility to define a name for a subquery would improve the usability 
> of the SQL Client. The SQL standard defines {{CREATE VIEW}} for defining a 
> virtual table.
>  
> Example:
> {code}
>  CREATE VIEW viewName
>  [ '(' columnName [, columnName ]* ')' ]
>  AS Query
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] NicoK commented on issue #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-08-27 Thread GitBox
NicoK commented on issue #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-416236083
 
 
   I don't quite get the argument that the `pruneBuffer()` call has this much 
overhead, at least for small records, since its implementation only prunes if 
the buffer is larger than 5 MiB:
   ```
public void pruneBuffer() {
    if (this.buffer.length > PRUNE_BUFFER_THRESHOLD) {
        ...
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on issue #6606: [FLINK-10163] [sql-client] Support views in SQL Client

2018-08-27 Thread GitBox
fhueske commented on issue #6606: [FLINK-10163] [sql-client] Support views in 
SQL Client
URL: https://github.com/apache/flink/pull/6606#issuecomment-416235894
 
 
   Thanks for the update @twalthr. 
   I left two more comments.
   
   Thanks, Fabian


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593688#comment-16593688
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r212970682
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
 ##
 @@ -106,17 +102,19 @@ public boolean equals(Object obj) {
}
};
 
-   RecordSerializer.SerializationResult result = 
serializer.addRecord(emptyRecord);
-   
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
+   serializer.serializeRecord(emptyRecord);
+   
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, 
serializer.copyToBufferBuilder(bufferBuilder1));
 
-   result = serializer.addRecord(emptyRecord);
-   
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
+   serializer.serializeRecord(emptyRecord);
+   
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, 
serializer.copyToBufferBuilder(bufferBuilder1));
 
-   result = serializer.addRecord(emptyRecord);
-   
Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL,
 result);
+   serializer.serializeRecord(emptyRecord);
 
 Review comment:
   why don't you use `serializer.reset()` here? (serialize only once as in 
production code)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593681#comment-16593681
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r212951771
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -87,62 +86,71 @@ public RecordWriter(ResultPartitionWriter writer, 
ChannelSelector channelSele
 
this.numChannels = writer.getNumberOfSubpartitions();
 
-   /*
-* The runtime exposes a channel abstraction for the produced 
results
-* (see {@link ChannelSelector}). Every channel has an 
independent
-* serializer.
-*/
-   this.serializers = new SpanningRecordSerializer[numChannels];
+   this.serializer = new SpanningRecordSerializer();
this.bufferBuilders = new Optional[numChannels];
for (int i = 0; i < numChannels; i++) {
-   serializers[i] = new SpanningRecordSerializer();
bufferBuilders[i] = Optional.empty();
}
}
 
public void emit(T record) throws IOException, InterruptedException {
+   serializer.serializeRecord(record);
+
for (int targetChannel : channelSelector.selectChannels(record, 
numChannels)) {
-   sendToTarget(record, targetChannel);
+   copyToTarget(targetChannel);
}
+
+   // Make sure we don't hold onto the large intermediate 
serialization buffer for too long
+   serializer.prune();
}
 
/**
 * This is used to broadcast Streaming Watermarks in-band with records. 
This ignores
 * the {@link ChannelSelector}.
 */
public void broadcastEmit(T record) throws IOException, 
InterruptedException {
+   serializer.serializeRecord(record);
+
for (int targetChannel = 0; targetChannel < numChannels; 
targetChannel++) {
-   sendToTarget(record, targetChannel);
+   copyToTarget(targetChannel);
}
+
+   serializer.prune();
}
 
/**
 * This is used to send LatencyMarks to a random target channel.
 */
public void randomEmit(T record) throws IOException, 
InterruptedException {
-   sendToTarget(record, rng.nextInt(numChannels));
-   }
+   serializer.serializeRecord(record);
 
-   private void sendToTarget(T record, int targetChannel) throws 
IOException, InterruptedException {
-   RecordSerializer serializer = serializers[targetChannel];
+   copyToTarget(rng.nextInt(numChannels));
 
-   SerializationResult result = serializer.addRecord(record);
+   serializer.prune();
+   }
 
+   private void copyToTarget(int targetChannel) throws IOException, 
InterruptedException {
+   // We should reset the initial position of the intermediate 
serialization buffer before
+   // copying, so the serialization results can be copied to 
multiple target buffers.
+   serializer.reset();
+
+   BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
+   SerializationResult result = 
serializer.copyToBufferBuilder(bufferBuilder);
while (result.isFullBuffer()) {
-   if (tryFinishCurrentBufferBuilder(targetChannel, 
serializer)) {
-   // If this was a full record, we are done. Not 
breaking
-   // out of the loop at this point will lead to 
another
-   // buffer request before breaking out (that 
would not be
-   // a problem per se, but it can lead to stalls 
in the
-   // pipeline).
-   if (result.isFullRecord()) {
-   break;
-   }
+   tryFinishCurrentBufferBuilder(targetChannel);
 
 Review comment:
   Actually, here we not only know that the buffer builder is present, we also 
already have its reference (in contrast to `tryFinishCurrentBufferBuilder()`), 
and we don't need to update the `bufferBuilders` field until after the `while` 
loop. I'm not sure whether this is worth optimising, though (@pnowojski?)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific 

[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593686#comment-16593686
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r212969997
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
 ##
 @@ -40,25 +41,25 @@
 
@Test
public void testHasSerializedData() throws IOException {
-   final int segmentSize = 16;
-
final SpanningRecordSerializer 
serializer = new SpanningRecordSerializer<>();
final SerializationTestType randomIntRecord = 
Util.randomRecord(SerializationTestTypeFactory.INT);
 
Assert.assertFalse(serializer.hasSerializedData());
 
-   serializer.addRecord(randomIntRecord);
+   serializer.serializeRecord(randomIntRecord);
Assert.assertTrue(serializer.hasSerializedData());
 
-   
serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
+   final BufferBuilder bufferBuilder1 = createBufferBuilder(16);
+   serializer.copyToBufferBuilder(bufferBuilder1);
Assert.assertFalse(serializer.hasSerializedData());
 
-   
serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(8));
-
-   serializer.addRecord(randomIntRecord);
+   final BufferBuilder bufferBuilder2 = createBufferBuilder(8);
+   serializer.serializeRecord(randomIntRecord);
+   serializer.copyToBufferBuilder(bufferBuilder2);
Assert.assertFalse(serializer.hasSerializedData());
 
-   serializer.addRecord(randomIntRecord);
+   serializer.serializeRecord(randomIntRecord);
 
 Review comment:
   why don't you use `serializer.reset()` here? (serialize only once as in 
production code)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593683#comment-16593683
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r212980692
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 ##
 @@ -524,6 +615,31 @@ public void read(DataInputView in) throws IOException {
}
}
 
+   /**
+* Broadcast channel selector that selects all the output channels.
+*/
+   private static class Broadcast implements 
ChannelSelector {
+
+   private int[] returnChannel;
+   boolean set;
 
 Review comment:
   Actually, this is a copy of 
`org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner` which is 
in the `flink-streaming-java` submodule, though.
   And in general it is good to cache this rather than building a new array for 
every record...
   
   Using `returnChannel.length == numberOfOutputChannels` makes sense though - 
@zhijiangW can you also create a hotfix commit changing this in 
`org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner`?
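
   For reference, a cached broadcast selector along these lines could look as follows (a sketch only; the selector interface here is a stand-in mirroring the `selectChannels(record, numChannels)` shape used in this PR, not Flink's actual class):
   ```
// Stand-in for the channel selector interface used by the RecordWriter.
interface SketchChannelSelector<T> {
    int[] selectChannels(T record, int numberOfOutputChannels);
}

// Broadcast selector that caches the channel index array instead of rebuilding
// it for every record; the array is only rebuilt when the channel count changes.
class CachingBroadcastSelector<T> implements SketchChannelSelector<T> {

    private int[] returnChannel = new int[0];

    @Override
    public int[] selectChannels(T record, int numberOfOutputChannels) {
        if (returnChannel.length != numberOfOutputChannels) {
            returnChannel = new int[numberOfOutputChannels];
            for (int i = 0; i < numberOfOutputChannels; i++) {
                returnChannel[i] = i;
            }
        }
        return returnChannel;
    }
}
   ```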


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593687#comment-16593687
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r212970648
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
 ##
 @@ -106,17 +102,19 @@ public boolean equals(Object obj) {
}
};
 
-   RecordSerializer.SerializationResult result = 
serializer.addRecord(emptyRecord);
-   
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
+   serializer.serializeRecord(emptyRecord);
+   
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, 
serializer.copyToBufferBuilder(bufferBuilder1));
 
-   result = serializer.addRecord(emptyRecord);
-   
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
+   serializer.serializeRecord(emptyRecord);
 
 Review comment:
   why don't you use `serializer.reset()` here? (serialize only once as in 
production code)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593680#comment-16593680
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r212974745
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 ##
 @@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws 
Exception {
assertEquals("Buffer 2 shares the same reader index as buffer 
1", 0, buffer2.getReaderIndex());
}
 
+   /**
+* Tests that records are broadcast via {@link ChannelSelector} and
+* {@link RecordWriter#emit(IOReadableWritable)}.
+*/
+   @Test
+   public void testEmitRecordWithBroadcastPartitioner() throws Exception {
+   emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false);
+   }
+
+   /**
+* Tests that records are broadcast via {@link 
RecordWriter#broadcastEmit(IOReadableWritable)}.
+*/
+   @Test
+   public void testBroadcastEmitRecord() throws Exception {
+   emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true);
+   }
+
+   /**
+* The results of emitting records via BroadcastPartitioner or 
broadcasting records directly are the same,
+* that is all the target channels can receive the whole outputs.
+*
+* @param isBroadcastEmit whether using {@link 
RecordWriter#broadcastEmit(IOReadableWritable)} or not
+*/
+   private void 
emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean 
isBroadcastEmit) throws Exception {
+   final int numChannels = 4;
+   final int bufferSize = 32;
+   final int numValues = 8;
+   final int serializationLength = 4;
+
+   @SuppressWarnings("unchecked")
+   final Queue[] queues = new Queue[numChannels];
+   for (int i = 0; i < numChannels; i++) {
+   queues[i] = new ArrayDeque<>();
+   }
+
+   final TestPooledBufferProvider bufferProvider = new 
TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
+   final ResultPartitionWriter partitionWriter = new 
CollectingPartitionWriter(queues, bufferProvider);
+   final RecordWriter writer = new 
RecordWriter<>(partitionWriter, new Broadcast<>());
 
 Review comment:
   The `RecordWriter` instance should be different depending on 
`isBroadcastEmit` to really separate these two cases?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593682#comment-16593682
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r212969980
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
 ##
 @@ -40,25 +41,25 @@
 
@Test
public void testHasSerializedData() throws IOException {
-   final int segmentSize = 16;
-
final SpanningRecordSerializer 
serializer = new SpanningRecordSerializer<>();
final SerializationTestType randomIntRecord = 
Util.randomRecord(SerializationTestTypeFactory.INT);
 
Assert.assertFalse(serializer.hasSerializedData());
 
-   serializer.addRecord(randomIntRecord);
+   serializer.serializeRecord(randomIntRecord);
Assert.assertTrue(serializer.hasSerializedData());
 
-   
serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
+   final BufferBuilder bufferBuilder1 = createBufferBuilder(16);
+   serializer.copyToBufferBuilder(bufferBuilder1);
Assert.assertFalse(serializer.hasSerializedData());
 
-   
serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(8));
-
-   serializer.addRecord(randomIntRecord);
+   final BufferBuilder bufferBuilder2 = createBufferBuilder(8);
+   serializer.serializeRecord(randomIntRecord);
 
 Review comment:
   why don't you use `serializer.reset()` here? (serialize only once as in 
production code)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593685#comment-16593685
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r212972706
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
 ##
 @@ -169,25 +167,26 @@ private void test(Util.MockRecords records, int 
segmentSize) throws Exception {
 
// 
-
 
-   
serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
-
+   BufferBuilder bufferBuilder = createBufferBuilder(segmentSize);
int numBytes = 0;
for (SerializationTestType record : records) {
-   RecordSerializer.SerializationResult result = 
serializer.addRecord(record);
+   serializer.serializeRecord(record);
+   RecordSerializer.SerializationResult result = 
serializer.copyToBufferBuilder(bufferBuilder);
numBytes += record.length() + serializationOverhead;
 
if (numBytes < segmentSize) {

Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
} else if (numBytes == segmentSize) {

Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL,
 result);
-   
serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
+   bufferBuilder = 
createBufferBuilder(segmentSize);
numBytes = 0;
} else {

Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL,
 result);
 
while (result.isFullBuffer()) {
numBytes -= segmentSize;
-   result = 
serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
+   bufferBuilder = 
createBufferBuilder(segmentSize);
+   result = 
serializer.copyToBufferBuilder(bufferBuilder);
 
 Review comment:
   I know, this wasn't checked before, but should we actually also check for a 
full record after this `while` loop?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593684#comment-16593684
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r212952611
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -87,62 +86,71 @@ public RecordWriter(ResultPartitionWriter writer, 
ChannelSelector channelSele
 
this.numChannels = writer.getNumberOfSubpartitions();
 
-   /*
-* The runtime exposes a channel abstraction for the produced 
results
-* (see {@link ChannelSelector}). Every channel has an 
independent
-* serializer.
-*/
-   this.serializers = new SpanningRecordSerializer[numChannels];
+   this.serializer = new SpanningRecordSerializer();
this.bufferBuilders = new Optional[numChannels];
for (int i = 0; i < numChannels; i++) {
-   serializers[i] = new SpanningRecordSerializer();
bufferBuilders[i] = Optional.empty();
}
}
 
public void emit(T record) throws IOException, InterruptedException {
+   serializer.serializeRecord(record);
+
for (int targetChannel : channelSelector.selectChannels(record, 
numChannels)) {
-   sendToTarget(record, targetChannel);
+   copyToTarget(targetChannel);
}
+
+   // Make sure we don't hold onto the large intermediate 
serialization buffer for too long
+   serializer.prune();
}
 
/**
 * This is used to broadcast Streaming Watermarks in-band with records. 
This ignores
 * the {@link ChannelSelector}.
 */
public void broadcastEmit(T record) throws IOException, 
InterruptedException {
+   serializer.serializeRecord(record);
+
for (int targetChannel = 0; targetChannel < numChannels; 
targetChannel++) {
-   sendToTarget(record, targetChannel);
+   copyToTarget(targetChannel);
}
+
+   serializer.prune();
}
 
/**
 * This is used to send LatencyMarks to a random target channel.
 */
public void randomEmit(T record) throws IOException, 
InterruptedException {
-   sendToTarget(record, rng.nextInt(numChannels));
-   }
+   serializer.serializeRecord(record);
 
-   private void sendToTarget(T record, int targetChannel) throws 
IOException, InterruptedException {
-   RecordSerializer serializer = serializers[targetChannel];
+   copyToTarget(rng.nextInt(numChannels));
 
-   SerializationResult result = serializer.addRecord(record);
+   serializer.prune();
+   }
 
+   private void copyToTarget(int targetChannel) throws IOException, 
InterruptedException {
+   // We should reset the initial position of the intermediate 
serialization buffer before
+   // copying, so the serialization results can be copied to 
multiple target buffers.
+   serializer.reset();
+
+   BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
+   SerializationResult result = 
serializer.copyToBufferBuilder(bufferBuilder);
while (result.isFullBuffer()) {
-   if (tryFinishCurrentBufferBuilder(targetChannel, 
serializer)) {
-   // If this was a full record, we are done. Not 
breaking
-   // out of the loop at this point will lead to 
another
-   // buffer request before breaking out (that 
would not be
-   // a problem per se, but it can lead to stalls 
in the
-   // pipeline).
-   if (result.isFullRecord()) {
-   break;
-   }
+   tryFinishCurrentBufferBuilder(targetChannel);
 
 Review comment:
   I was a bit skeptical about the removal of the return value of 
`tryFinishCurrentBufferBuilder()` at first, but I don't see a reason to tie 
breaking out of the loop on a full record to actually having a buffer builder 
present - once the complete record has been written, we can break out.
   -> therefore it is ok, and probably better than before


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use 

[jira] [Commented] (FLINK-10216) Add REGEXP_MATCH in TableAPI and SQL

2018-08-27 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593677#comment-16593677
 ] 

Timo Walther commented on FLINK-10216:
--

The question is whether {{SIMILAR TO}} and {{REGEXP_MATCH}} would have 
exactly the same semantics. Simply forwarding a regex call to Java's 
implementation does not ensure any compatibility with the implementations 
commonly used by database vendors.
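
As one concrete example of such a gap (hedged, since vendor behaviour varies): Java's {{String.matches}} anchors the pattern to the entire input, whereas regex predicates such as Oracle's REGEXP_LIKE or MySQL's REGEXP typically report a match if the pattern matches any substring:

{code:java}
import java.util.regex.Pattern;

// Illustration only: naively forwarding to String.matches() gives whole-string
// semantics, which is not what substring-matching database predicates return.
public class RegexpSemanticsDemo {
    public static void main(String[] args) {
        String value = "flink-10216";

        System.out.println(value.matches("\\d+"));                          // false: whole string must match
        System.out.println(Pattern.compile("\\d+").matcher(value).find());  // true: a matching substring exists
    }
}
{code}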

> Add REGEXP_MATCH in TableAPI and SQL
> 
>
> Key: FLINK-10216
> URL: https://issues.apache.org/jira/browse/FLINK-10216
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Juho Autio
>Priority: Major
>
> Here's a naive implementation:
> {code:java}
> public class RegexpMatchFunction extends ScalarFunction {
>     // NOTE! Flink calls eval() by reflection
>     public boolean eval(String value, String pattern) {
>         return value != null && pattern != null && value.matches(pattern);
>     }
> }
> {code}
> I wonder if there would be a way to optimize this to use 
> {{Pattern.compile(value)}} and use the compiled Pattern for multiple calls 
> (possibly different values, but same pattern).
> h3. Naming
> Should regex functions be prefixed with {{regexp_}} or {{regex_}}? See also: 
> [https://github.com/apache/flink/pull/6448#issuecomment-415972833]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-08-27 Thread GitBox
NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r212970648
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
 ##
 @@ -106,17 +102,19 @@ public boolean equals(Object obj) {
}
};
 
-   RecordSerializer.SerializationResult result = 
serializer.addRecord(emptyRecord);
-   
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
+   serializer.serializeRecord(emptyRecord);
+   
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, 
serializer.copyToBufferBuilder(bufferBuilder1));
 
-   result = serializer.addRecord(emptyRecord);
-   
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
+   serializer.serializeRecord(emptyRecord);
 
 Review comment:
   why don't you use `serializer.reset()` here? (serialize only once as in 
production code)
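
A rough sketch of what that suggestion might look like, reusing the identifiers from the diff above (so not standalone), and only assuming `reset()` behaves as in the production `copyToTarget()` path:

```java
// Serialize the record once ...
serializer.serializeRecord(emptyRecord);
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD,
	serializer.copyToBufferBuilder(bufferBuilder1));

// ... then reset the read position and copy the same serialized bytes again,
// mirroring how the production RecordWriter copies one record to several channels.
serializer.reset();
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD,
	serializer.copyToBufferBuilder(bufferBuilder1));
```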


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-08-27 Thread GitBox
NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r212972706
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
 ##
 @@ -169,25 +167,26 @@ private void test(Util.MockRecords records, int segmentSize) throws Exception {
 
// ---------------------------------------------------------------
 
-   serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
-
+   BufferBuilder bufferBuilder = createBufferBuilder(segmentSize);
int numBytes = 0;
for (SerializationTestType record : records) {
-   RecordSerializer.SerializationResult result = serializer.addRecord(record);
+   serializer.serializeRecord(record);
+   RecordSerializer.SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
numBytes += record.length() + serializationOverhead;
 
if (numBytes < segmentSize) {
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
} else if (numBytes == segmentSize) {
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL, result);
-   serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
+   bufferBuilder = createBufferBuilder(segmentSize);
numBytes = 0;
} else {
Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, result);
 
while (result.isFullBuffer()) {
numBytes -= segmentSize;
-   result = serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
+   bufferBuilder = createBufferBuilder(segmentSize);
+   result = serializer.copyToBufferBuilder(bufferBuilder);
 
 Review comment:
   I know, this wasn't checked before, but should we actually also check for a 
full record after this `while` loop?
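
If such a check were added, it would presumably be a single extra assertion after the loop (a sketch reusing the test's local `result` variable, not part of the patch):

```java
// After draining all full buffers, the final copy should have completed the record.
Assert.assertTrue(result.isFullRecord());
```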


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-08-27 Thread GitBox
NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r212970682
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
 ##
 @@ -106,17 +102,19 @@ public boolean equals(Object obj) {
}
};
 
-   RecordSerializer.SerializationResult result = serializer.addRecord(emptyRecord);
-   Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
+   serializer.serializeRecord(emptyRecord);
+   Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, serializer.copyToBufferBuilder(bufferBuilder1));
 
-   result = serializer.addRecord(emptyRecord);
-   Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
+   serializer.serializeRecord(emptyRecord);
+   Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, serializer.copyToBufferBuilder(bufferBuilder1));
 
-   result = serializer.addRecord(emptyRecord);
-   Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, result);
+   serializer.serializeRecord(emptyRecord);
 
 Review comment:
   why don't you use `serializer.reset()` here? (serialize only once as in 
production code)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-08-27 Thread GitBox
NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r212952611
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -87,62 +86,71 @@ public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSele
 
this.numChannels = writer.getNumberOfSubpartitions();
 
-   /*
-* The runtime exposes a channel abstraction for the produced results
-* (see {@link ChannelSelector}). Every channel has an independent
-* serializer.
-*/
-   this.serializers = new SpanningRecordSerializer[numChannels];
+   this.serializer = new SpanningRecordSerializer();
this.bufferBuilders = new Optional[numChannels];
for (int i = 0; i < numChannels; i++) {
-   serializers[i] = new SpanningRecordSerializer();
bufferBuilders[i] = Optional.empty();
}
}
 
public void emit(T record) throws IOException, InterruptedException {
+   serializer.serializeRecord(record);
+
for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
-   sendToTarget(record, targetChannel);
+   copyToTarget(targetChannel);
}
+
+   // Make sure we don't hold onto the large intermediate serialization buffer for too long
+   serializer.prune();
}
 
/**
 * This is used to broadcast Streaming Watermarks in-band with records. This ignores
 * the {@link ChannelSelector}.
 */
public void broadcastEmit(T record) throws IOException, InterruptedException {
+   serializer.serializeRecord(record);
+
for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
-   sendToTarget(record, targetChannel);
+   copyToTarget(targetChannel);
}
+
+   serializer.prune();
}
 
/**
 * This is used to send LatencyMarks to a random target channel.
 */
public void randomEmit(T record) throws IOException, InterruptedException {
-   sendToTarget(record, rng.nextInt(numChannels));
-   }
+   serializer.serializeRecord(record);
 
-   private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
-   RecordSerializer serializer = serializers[targetChannel];
+   copyToTarget(rng.nextInt(numChannels));
 
-   SerializationResult result = serializer.addRecord(record);
+   serializer.prune();
+   }
 
+   private void copyToTarget(int targetChannel) throws IOException, InterruptedException {
+   // We should reset the initial position of the intermediate serialization buffer before
+   // copying, so the serialization results can be copied to multiple target buffers.
+   serializer.reset();
+
+   BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
+   SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
while (result.isFullBuffer()) {
-   if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
-   // If this was a full record, we are done. Not breaking
-   // out of the loop at this point will lead to another
-   // buffer request before breaking out (that would not be
-   // a problem per se, but it can lead to stalls in the
-   // pipeline).
-   if (result.isFullRecord()) {
-   break;
-   }
+   tryFinishCurrentBufferBuilder(targetChannel);
 
 Review comment:
   I was a bit skeptical about the removal of the return value of `tryFinishCurrentBufferBuilder()` at first, but I don't see a reason to tie breaking out of the loop for full records to actually having a buffer builder present - once we have completed writing the whole record, we can break out.
   -> therefore it is ok and probably better than before


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-08-27 Thread GitBox
NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r212974745
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 ##
 @@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws Exception {
assertEquals("Buffer 2 shares the same reader index as buffer 1", 0, buffer2.getReaderIndex());
}
 
+   /**
+* Tests that records are broadcast via {@link ChannelSelector} and
+* {@link RecordWriter#emit(IOReadableWritable)}.
+*/
+   @Test
+   public void testEmitRecordWithBroadcastPartitioner() throws Exception {
+   emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false);
+   }
+
+   /**
+* Tests that records are broadcast via {@link 
RecordWriter#broadcastEmit(IOReadableWritable)}.
+*/
+   @Test
+   public void testBroadcastEmitRecord() throws Exception {
+   emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true);
+   }
+
+   /**
+* The results of emitting records via BroadcastPartitioner or broadcasting records directly are the same,
+* that is, all the target channels receive the whole output.
+*
+* @param isBroadcastEmit whether using {@link RecordWriter#broadcastEmit(IOReadableWritable)} or not
+*/
+   private void emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean isBroadcastEmit) throws Exception {
+   final int numChannels = 4;
+   final int bufferSize = 32;
+   final int numValues = 8;
+   final int serializationLength = 4;
+
+   @SuppressWarnings("unchecked")
+   final Queue[] queues = new Queue[numChannels];
+   for (int i = 0; i < numChannels; i++) {
+   queues[i] = new ArrayDeque<>();
+   }
+
+   final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
+   final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
+   final RecordWriter writer = new RecordWriter<>(partitionWriter, new Broadcast<>());
 
 Review comment:
   The `RecordWriter` instance should be different depending on 
`isBroadcastEmit` to really separate these two cases?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-08-27 Thread GitBox
NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r212969980
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
 ##
 @@ -40,25 +41,25 @@
 
@Test
public void testHasSerializedData() throws IOException {
-   final int segmentSize = 16;
-
final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
final SerializationTestType randomIntRecord = Util.randomRecord(SerializationTestTypeFactory.INT);
 
Assert.assertFalse(serializer.hasSerializedData());
 
-   serializer.addRecord(randomIntRecord);
+   serializer.serializeRecord(randomIntRecord);
Assert.assertTrue(serializer.hasSerializedData());
 
-   serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
+   final BufferBuilder bufferBuilder1 = createBufferBuilder(16);
+   serializer.copyToBufferBuilder(bufferBuilder1);
Assert.assertFalse(serializer.hasSerializedData());
 
-   serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(8));
-
-   serializer.addRecord(randomIntRecord);
+   final BufferBuilder bufferBuilder2 = createBufferBuilder(8);
+   serializer.serializeRecord(randomIntRecord);
 
 Review comment:
   why don't you use `serializer.reset()` here? (serialize only once as in 
production code)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-08-27 Thread GitBox
NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r212980692
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 ##
 @@ -524,6 +615,31 @@ public void read(DataInputView in) throws IOException {
}
}
 
+   /**
+* Broadcast channel selector that selects all the output channels.
+*/
+   private static class Broadcast implements ChannelSelector {
+
+   private int[] returnChannel;
+   boolean set;
 
 Review comment:
   Actually, this is a copy of 
`org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner` which is 
in the `flink-streaming-java` submodule, though.
   And in general it is good to cache this rather than building a new array for 
every record...
   
   Using `returnChannel.length == numberOfOutputChannels` makes sense though - 
@zhijiangW can you also create a hotfix commit changing this in 
`org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner`?
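
For reference, a cached variant could look roughly like this (illustrative only; it assumes the `ChannelSelector`/`IOReadableWritable` types already imported by the test and is not the suggested hotfix itself):

```java
private static class Broadcast<T extends IOReadableWritable> implements ChannelSelector<T> {

	// Cache the channel array instead of rebuilding it for every record.
	private int[] returnChannel = new int[0];

	@Override
	public int[] selectChannels(T record, int numberOfOutputChannels) {
		// Rebuild only when the number of channels changed (cheap length check).
		if (returnChannel.length != numberOfOutputChannels) {
			returnChannel = new int[numberOfOutputChannels];
			for (int i = 0; i < numberOfOutputChannels; i++) {
				returnChannel[i] = i;
			}
		}
		return returnChannel;
	}
}
```

The same length check is what the comment proposes for `BroadcastPartitioner` in `flink-streaming-java`.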


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-08-27 Thread GitBox
NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r212969997
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
 ##
 @@ -40,25 +41,25 @@
 
@Test
public void testHasSerializedData() throws IOException {
-   final int segmentSize = 16;
-
final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
final SerializationTestType randomIntRecord = Util.randomRecord(SerializationTestTypeFactory.INT);
 
Assert.assertFalse(serializer.hasSerializedData());
 
-   serializer.addRecord(randomIntRecord);
+   serializer.serializeRecord(randomIntRecord);
Assert.assertTrue(serializer.hasSerializedData());
 
-   serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
+   final BufferBuilder bufferBuilder1 = createBufferBuilder(16);
+   serializer.copyToBufferBuilder(bufferBuilder1);
Assert.assertFalse(serializer.hasSerializedData());
 
-   serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(8));
-
-   serializer.addRecord(randomIntRecord);
+   final BufferBuilder bufferBuilder2 = createBufferBuilder(8);
+   serializer.serializeRecord(randomIntRecord);
+   serializer.copyToBufferBuilder(bufferBuilder2);
Assert.assertFalse(serializer.hasSerializedData());
 
-   serializer.addRecord(randomIntRecord);
+   serializer.serializeRecord(randomIntRecord);
 
 Review comment:
   why don't you use `serializer.reset()` here? (serialize only once as in 
production code)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-08-27 Thread GitBox
NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r212951771
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -87,62 +86,71 @@ public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSele
 
this.numChannels = writer.getNumberOfSubpartitions();
 
-   /*
-* The runtime exposes a channel abstraction for the produced results
-* (see {@link ChannelSelector}). Every channel has an independent
-* serializer.
-*/
-   this.serializers = new SpanningRecordSerializer[numChannels];
+   this.serializer = new SpanningRecordSerializer();
this.bufferBuilders = new Optional[numChannels];
for (int i = 0; i < numChannels; i++) {
-   serializers[i] = new SpanningRecordSerializer();
bufferBuilders[i] = Optional.empty();
}
}
 
public void emit(T record) throws IOException, InterruptedException {
+   serializer.serializeRecord(record);
+
for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
-   sendToTarget(record, targetChannel);
+   copyToTarget(targetChannel);
}
+
+   // Make sure we don't hold onto the large intermediate serialization buffer for too long
+   serializer.prune();
}
 
/**
 * This is used to broadcast Streaming Watermarks in-band with records. This ignores
 * the {@link ChannelSelector}.
 */
public void broadcastEmit(T record) throws IOException, InterruptedException {
+   serializer.serializeRecord(record);
+
for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
-   sendToTarget(record, targetChannel);
+   copyToTarget(targetChannel);
}
+
+   serializer.prune();
}
 
/**
 * This is used to send LatencyMarks to a random target channel.
 */
public void randomEmit(T record) throws IOException, InterruptedException {
-   sendToTarget(record, rng.nextInt(numChannels));
-   }
+   serializer.serializeRecord(record);
 
-   private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
-   RecordSerializer serializer = serializers[targetChannel];
+   copyToTarget(rng.nextInt(numChannels));
 
-   SerializationResult result = serializer.addRecord(record);
+   serializer.prune();
+   }
 
+   private void copyToTarget(int targetChannel) throws IOException, InterruptedException {
+   // We should reset the initial position of the intermediate serialization buffer before
+   // copying, so the serialization results can be copied to multiple target buffers.
+   serializer.reset();
+
+   BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
+   SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
while (result.isFullBuffer()) {
-   if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
-   // If this was a full record, we are done. Not breaking
-   // out of the loop at this point will lead to another
-   // buffer request before breaking out (that would not be
-   // a problem per se, but it can lead to stalls in the
-   // pipeline).
-   if (result.isFullRecord()) {
-   break;
-   }
+   tryFinishCurrentBufferBuilder(targetChannel);
 
 Review comment:
   actually, here, we not only know that the buffer builder is present, we also already have its reference (in contrast to `tryFinishCurrentBufferBuilder()`) and don't need to update the `bufferBuilders` field until after the `while` loop - I'm not sure whether this is worth optimising, though (@pnowojski?)
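
As a purely hypothetical illustration of that idea (the helper `requestNewBufferBuilder()` and the single write-back at the end are invented for the sketch, which also omits the byte accounting and flushing the real code does):

```java
private void copyToTarget(int targetChannel) throws IOException, InterruptedException {
	serializer.reset();

	BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
	SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);

	while (result.isFullBuffer()) {
		// We already hold the reference, so finish the builder directly instead of looking it up again.
		bufferBuilder.finish();
		// Hypothetical helper that requests a fresh builder without touching the per-channel field yet.
		bufferBuilder = requestNewBufferBuilder(targetChannel);
		result = serializer.copyToBufferBuilder(bufferBuilder);
	}

	// Update the per-channel slot only once, after the record has been fully copied.
	bufferBuilders[targetChannel] = Optional.of(bufferBuilder);
}
```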


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10163) Support CREATE VIEW in SQL Client

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593661#comment-16593661
 ] 

ASF GitHub Bot commented on FLINK-10163:


fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] 
Support views in SQL Client
URL: https://github.com/apache/flink/pull/6606#discussion_r212977695
 
 

 ##
 File path: docs/dev/table/sqlClient.md
 ##
 @@ -459,6 +466,50 @@ Web interface: http://localhost:8081
 
 {% top %}
 
+SQL Views
+-
+
+Views allow defining virtual tables from SQL queries. The view definition is parsed and validated immediately. However, the actual execution happens when the view is accessed during the submission of a general `INSERT INTO` or `SELECT` statement.
+
+Views can either be defined in [environment files](sqlClient.html#environment-files) or within the CLI session.
+
+The following example shows how to define multiple views in a file:
+
+{% highlight yaml %}
+views:
+  - name: MyRestrictedView
+    query: "SELECT MyField2 FROM MyTableSource"
+  - name: MyComplexView
+    query: >
+      SELECT MyField2 + 42, CAST(MyField1 AS VARCHAR)
+      FROM MyTableSource
+      WHERE MyField2 > 200
+{% endhighlight %}
+
+Similar to table sources and sinks, views defined in a session environment file have the highest precedence.
+
+Views can also be created within a CLI session using the `CREATE VIEW` statement:
+
+{% highlight text %}
+CREATE VIEW MyNewView AS SELECT MyField2 FROM MyTableSource
+{% endhighlight %}
+
+The `SHOW VIEW` statement allows for printing a previously created view again:
 
 Review comment:
   OK, I don't mind listing views as tables in `SHOW TABLES`.
   
   However, if there is no way to distinguish a view from a table, we can also remove the `SHOW VIEW xxx` command for now. Why should somebody try to look up the definition of a view if he's not aware that it is a view? He can only know that if he created the view himself, and in that case he probably knows the definition.
   
   I also think the syntax of `SHOW VIEW xxx` is not well chosen because it is inconsistent with `SHOW TABLES` and `SHOW FUNCTIONS`. It gives *details about a single view* whereas the other `SHOW` commands *list all elements of a type*. I think the semantics are more similar to the `DESCRIBE` command. We could rename `SHOW VIEW` to `DESCRIBE VIEW`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support CREATE VIEW in SQL Client
> -
>
> Key: FLINK-10163
> URL: https://issues.apache.org/jira/browse/FLINK-10163
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> The possibility to define a name for a subquery would improve the usability 
> of the SQL Client. The SQL standard defines {{CREATE VIEW}} for defining a 
> virtual table.
>  
> Example:
> {code}
>  CREATE VIEW viewName
>  [ '(' columnName [, columnName ]* ')' ]
>  AS Query
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10163) Support CREATE VIEW in SQL Client

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593658#comment-16593658
 ] 

ASF GitHub Bot commented on FLINK-10163:


fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] 
Support views in SQL Client
URL: https://github.com/apache/flink/pull/6606#discussion_r212977695
 
 

 ##
 File path: docs/dev/table/sqlClient.md
 ##
 @@ -459,6 +466,50 @@ Web interface: http://localhost:8081
 
 {% top %}
 
+SQL Views
+-
+
+Views allow defining virtual tables from SQL queries. The view definition is parsed and validated immediately. However, the actual execution happens when the view is accessed during the submission of a general `INSERT INTO` or `SELECT` statement.
+
+Views can either be defined in [environment files](sqlClient.html#environment-files) or within the CLI session.
+
+The following example shows how to define multiple views in a file:
+
+{% highlight yaml %}
+views:
+  - name: MyRestrictedView
+    query: "SELECT MyField2 FROM MyTableSource"
+  - name: MyComplexView
+    query: >
+      SELECT MyField2 + 42, CAST(MyField1 AS VARCHAR)
+      FROM MyTableSource
+      WHERE MyField2 > 200
+{% endhighlight %}
+
+Similar to table sources and sinks, views defined in a session environment file have the highest precedence.
+
+Views can also be created within a CLI session using the `CREATE VIEW` statement:
+
+{% highlight text %}
+CREATE VIEW MyNewView AS SELECT MyField2 FROM MyTableSource
+{% endhighlight %}
+
+The `SHOW VIEW` statement allows for printing a previously created view again:
 
 Review comment:
   OK, I don't mind listing views as tables in `SHOW TABLES`.
   
   However, if there is no way to distinguish a view from a table, we can also remove the `SHOW VIEW xxx` command. Why should somebody try to look up the definition of a view if he's not aware that it is a view? He can only know that if he created the view himself, and in that case he probably knows the definition.
   
   I also think the syntax of `SHOW VIEW xxx` is not well chosen because it is inconsistent with `SHOW TABLES` and `SHOW FUNCTIONS`. It gives *details about a single view* whereas the other `SHOW` commands *list all elements of a type*. I think the semantics are more similar to the `DESCRIBE` command. We could rename `SHOW VIEW` to `DESCRIBE VIEW`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support CREATE VIEW in SQL Client
> -
>
> Key: FLINK-10163
> URL: https://issues.apache.org/jira/browse/FLINK-10163
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> The possibility to define a name for a subquery would improve the usability 
> of the SQL Client. The SQL standard defines {{CREATE VIEW}} for defining a 
> virtual table.
>  
> Example:
> {code}
>  CREATE VIEW viewName
>  [ '(' columnName [, columnName ]* ')' ]
>  AS Query
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] Support views in SQL Client

2018-08-27 Thread GitBox
fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] 
Support views in SQL Client
URL: https://github.com/apache/flink/pull/6606#discussion_r212977695
 
 

 ##
 File path: docs/dev/table/sqlClient.md
 ##
 @@ -459,6 +466,50 @@ Web interface: http://localhost:8081
 
 {% top %}
 
+SQL Views
+-
+
+Views allow defining virtual tables from SQL queries. The view definition is parsed and validated immediately. However, the actual execution happens when the view is accessed during the submission of a general `INSERT INTO` or `SELECT` statement.
+
+Views can either be defined in [environment files](sqlClient.html#environment-files) or within the CLI session.
+
+The following example shows how to define multiple views in a file:
+
+{% highlight yaml %}
+views:
+  - name: MyRestrictedView
+    query: "SELECT MyField2 FROM MyTableSource"
+  - name: MyComplexView
+    query: >
+      SELECT MyField2 + 42, CAST(MyField1 AS VARCHAR)
+      FROM MyTableSource
+      WHERE MyField2 > 200
+{% endhighlight %}
+
+Similar to table sources and sinks, views defined in a session environment file have the highest precedence.
+
+Views can also be created within a CLI session using the `CREATE VIEW` statement:
+
+{% highlight text %}
+CREATE VIEW MyNewView AS SELECT MyField2 FROM MyTableSource
+{% endhighlight %}
+
+The `SHOW VIEW` statement allows for printing a previously created view again:
 
 Review comment:
   OK, I don't mind listing views as tables in `SHOW TABLES`.
   
   However, if there is no way to distinguish a view from a table, we can also remove the `SHOW VIEW xxx` command for now. Why should somebody try to look up the definition of a view if he's not aware that it is a view? He can only know that if he created the view himself, and in that case he probably knows the definition.
   
   I also think the syntax of `SHOW VIEW xxx` is not well chosen because it is inconsistent with `SHOW TABLES` and `SHOW FUNCTIONS`. It gives *details about a single view* whereas the other `SHOW` commands *list all elements of a type*. I think the semantics are more similar to the `DESCRIBE` command. We could rename `SHOW VIEW` to `DESCRIBE VIEW`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] Support views in SQL Client

2018-08-27 Thread GitBox
fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] 
Support views in SQL Client
URL: https://github.com/apache/flink/pull/6606#discussion_r212977695
 
 

 ##
 File path: docs/dev/table/sqlClient.md
 ##
 @@ -459,6 +466,50 @@ Web interface: http://localhost:8081
 
 {% top %}
 
+SQL Views
+-
+
+Views allow defining virtual tables from SQL queries. The view definition is parsed and validated immediately. However, the actual execution happens when the view is accessed during the submission of a general `INSERT INTO` or `SELECT` statement.
+
+Views can either be defined in [environment files](sqlClient.html#environment-files) or within the CLI session.
+
+The following example shows how to define multiple views in a file:
+
+{% highlight yaml %}
+views:
+  - name: MyRestrictedView
+    query: "SELECT MyField2 FROM MyTableSource"
+  - name: MyComplexView
+    query: >
+      SELECT MyField2 + 42, CAST(MyField1 AS VARCHAR)
+      FROM MyTableSource
+      WHERE MyField2 > 200
+{% endhighlight %}
+
+Similar to table sources and sinks, views defined in a session environment file have the highest precedence.
+
+Views can also be created within a CLI session using the `CREATE VIEW` statement:
+
+{% highlight text %}
+CREATE VIEW MyNewView AS SELECT MyField2 FROM MyTableSource
+{% endhighlight %}
+
+The `SHOW VIEW` statement allows for printing a previously created view again:
 
 Review comment:
   OK, I don't mind listing views as tables in `SHOW TABLES`.
   
   However, if there is no way to distinguish a view from a table, we can also remove the `SHOW VIEW xxx` command. Why should somebody try to look up the definition of a view if he's not aware that it is a view? He can only know that if he created the view himself, and in that case he probably knows the definition.
   
   I also think the syntax of `SHOW VIEW xxx` is not well chosen because it is inconsistent with `SHOW TABLES` and `SHOW FUNCTIONS`. It gives *details about a single view* whereas the other `SHOW` commands *list all elements of a type*. I think the semantics are more similar to the `DESCRIBE` command. We could rename `SHOW VIEW` to `DESCRIBE VIEW`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] maqingxiang opened a new pull request #6620: [hotfix][flink-streaming-java] modify CreateStreamOutput() in OperatorChain.java

2018-08-27 Thread GitBox
maqingxiang opened a new pull request #6620: [hotfix][flink-streaming-java] 
modify CreateStreamOutput() in OperatorChain.java
URL: https://github.com/apache/flink/pull/6620
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-10227) Remove usage of javax.xml.bind.DatatypeConverter

2018-08-27 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-10227:


 Summary: Remove usage of javax.xml.bind.DatatypeConverter
 Key: FLINK-10227
 URL: https://issues.apache.org/jira/browse/FLINK-10227
 Project: Flink
  Issue Type: Sub-task
  Components: Core
Affects Versions: 1.7.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.7.0


In Java 9, {{javax.xml.bind.DatatypeConverter}} is no longer accessible by default. Since this class is only used in 3 places (and only the single method {{parseHexBinary}}), we should replace it with another implementation.
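
As an illustration only, a minimal stand-in for the single {{parseHexBinary}} call could look like the following (class and method names here are hypothetical, not the actual change):

{code:java}
public final class HexUtils {

	private HexUtils() {
	}

	/** Minimal stand-in for javax.xml.bind.DatatypeConverter#parseHexBinary. */
	public static byte[] parseHexBinary(String hex) {
		if (hex.length() % 2 != 0) {
			throw new IllegalArgumentException("Hex string must have an even length: " + hex);
		}
		byte[] result = new byte[hex.length() / 2];
		for (int i = 0; i < result.length; i++) {
			int high = Character.digit(hex.charAt(2 * i), 16);
			int low = Character.digit(hex.charAt(2 * i + 1), 16);
			if (high < 0 || low < 0) {
				throw new IllegalArgumentException("Non-hex character in: " + hex);
			}
			result[i] = (byte) ((high << 4) | low);
		}
		return result;
	}
}
{code}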



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] maqingxiang opened a new pull request #6619: [hotfix][flink-streaming-java] modify typo in the OperatorChain.java

2018-08-27 Thread GitBox
maqingxiang opened a new pull request #6619: [hotfix][flink-streaming-java] 
modify typo in the OperatorChain.java
URL: https://github.com/apache/flink/pull/6619
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-5060) only serialise records once in RecordWriter#emit and RecordWriter#broadcastEmit

2018-08-27 Thread Nico Kruber (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-5060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber closed FLINK-5060.
--
Resolution: Duplicate
  Assignee: (was: Nico Kruber)

> only serialise records once in RecordWriter#emit and 
> RecordWriter#broadcastEmit
> ---
>
> Key: FLINK-5060
> URL: https://issues.apache.org/jira/browse/FLINK-5060
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Nico Kruber
>Priority: Major
>
> Currently, org.apache.flink.runtime.io.network.api.writer.RecordWriter#emit 
> and org.apache.flink.runtime.io.network.api.writer.RecordWriter#broadcastEmit 
> serialise a record once per target channel. Instead, they could serialise the 
> record only once and use the serialised form for every channel and thus save 
> resources.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10207) Bump checkstyle-plugin to 8.9

2018-08-27 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10207:
---
Labels: pull-request-available  (was: )

> Bump checkstyle-plugin to 8.9
> -
>
> Key: FLINK-10207
> URL: https://issues.apache.org/jira/browse/FLINK-10207
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Affects Versions: 1.7.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Our current checkstyle version (8.4) is incompatible with Java 9; the 
> earliest version that works properly is 8.9.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10207) Bump checkstyle-plugin to 8.9

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593604#comment-16593604
 ] 

ASF GitHub Bot commented on FLINK-10207:


zentol opened a new pull request #6618: [FLINK-10207][build] Bump checkstyle to 
8.9
URL: https://github.com/apache/flink/pull/6618
 
 
   ## What is the purpose of the change
   
   This PR bumps the checkstyle version to 8.9, which is a prerequisite for 
compiling with java 9.
   
   ## Brief change log
   
   * bump version to 8.9
   * updated IDE setup docs
   * fixed a number of checkstyle violations that weren't detected previously
   
   ## Verifying this change
   
   I had to set up a local VM with JDK 9 to encounter the issue and verify the fix. Travis is unfortunately unreliable for this, as some JDK 9 incompatibilities that I encountered locally didn't occur on Travis (which is actually **very** problematic).
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Bump checkstyle-plugin to 8.9
> -
>
> Key: FLINK-10207
> URL: https://issues.apache.org/jira/browse/FLINK-10207
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Affects Versions: 1.7.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Our current checkstyle version (8.4) is incompatible with Java 9; the 
> earliest version that works properly is 8.9.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol opened a new pull request #6618: [FLINK-10207][build] Bump checkstyle to 8.9

2018-08-27 Thread GitBox
zentol opened a new pull request #6618: [FLINK-10207][build] Bump checkstyle to 
8.9
URL: https://github.com/apache/flink/pull/6618
 
 
   ## What is the purpose of the change
   
   This PR bumps the checkstyle version to 8.9, which is a prerequisite for 
compiling with java 9.
   
   ## Brief change log
   
   * bump version to 8.9
   * updated IDE setup docs
   * fixed a number of checkstyle violations that weren't detected previously
   
   ## Verifying this change
   
   I had to set up a local VM with JDK 9 to encounter the issue and verify the fix. Travis is unfortunately unreliable for this, as some JDK 9 incompatibilities that I encountered locally didn't occur on Travis (which is actually **very** problematic).
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-4893) Use single RecordSerializer per RecordWriter

2018-08-27 Thread Nico Kruber (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-4893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber closed FLINK-4893.
--
Resolution: Duplicate

> Use single RecordSerializer per RecordWriter
> 
>
> Key: FLINK-4893
> URL: https://issues.apache.org/jira/browse/FLINK-4893
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Ufuk Celebi
>Priority: Major
>
> Instead of creating n instances of the serializer for each outgoing channel, 
> we can simply keep the references to the {{Buffer}} instances in the writer 
> and use a single record serializer instance.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

