[jira] [Assigned] (FLINK-6344) Migrate from Java serialization for BucketingSink's state

2018-08-24 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-6344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-6344:
---

Assignee: vinoyang  (was: Hai Zhou)

> Migrate from Java serialization for BucketingSink's state
> -
>
> Key: FLINK-6344
> URL: https://issues.apache.org/jira/browse/FLINK-6344
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: vinoyang
>Priority: Major
>
> See umbrella JIRA FLINK-6343 for details. This subtask tracks the migration 
> for `BucketingSink`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-6347) Migrate from Java serialization for MessageAcknowledgingSourceBase's state

2018-08-24 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-6347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-6347:
---

Assignee: vinoyang  (was: Hai Zhou)

> Migrate from Java serialization for MessageAcknowledgingSourceBase's state
> --
>
> Key: FLINK-6347
> URL: https://issues.apache.org/jira/browse/FLINK-6347
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: vinoyang
>Priority: Major
>
> See umbrella JIRA FLINK-6343 for details. This subtask tracks the migration 
> for {{MessageAcknowledgingSourceBase}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10214) why is job applying as many TMs as default parallelism when starting, each parallelism is 1

2018-08-24 Thread lzh9 (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16592461#comment-16592461
 ] 

lzh9 commented on FLINK-10214:
--

[~yanghua], the parallelism is 6*8=48. If it's set to 1, the job will apply for only 
one TM and use one slot. 

> why is job applying as many TMs as default parallelism when starting, each 
> parallelism is 1
> ---
>
> Key: FLINK-10214
> URL: https://issues.apache.org/jira/browse/FLINK-10214
> Project: Flink
>  Issue Type: Task
>  Components: Cluster Management, TaskManager, YARN
>Affects Versions: 1.5.0
>Reporter: lzh9
>Priority: Major
>
> If I set TM number=6, slot num per TM=8, memory per TM=4G, the job will 
> apply for 48 TMs and use only one slot each, so total memory usage is 192G. A few 
> minutes later, it will release 40 TMs and everything will be back to normal. 
> Is there any reason for this? 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-7624) Add kafka-topic for "KafkaProducer" metrics

2018-08-24 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-7624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-7624:
---

Assignee: vinoyang

> Add kafka-topic for "KafkaProducer" metrics
> ---
>
> Key: FLINK-7624
> URL: https://issues.apache.org/jira/browse/FLINK-7624
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Reporter: Hai Zhou
>Assignee: vinoyang
>Priority: Major
>
> Currently, metrics in the "KafkaProducer" MetricGroup look like this:
> {code:java}
> localhost.taskmanager.dc4092a96ea4e54ecdbd13b9a5c209b2.Flink Streaming 
> Job.Sink--MTKafkaProducer08.0.KafkaProducer.record-queue-time-avg
> {code}
> The metric names in the "KafkaProducer" group do not contain a kafka-topic name 
> part, so if the job writes data to two different Kafka sinks, these metrics 
> cannot be distinguished.
> I would like to modify the above metric name as follows:
> {code:java}
> localhost.taskmanager.dc4092a96ea4e54ecdbd13b9a5c209b2.Flink Streaming 
> Job.Sink--MTKafkaProducer08.0.KafkaProducer.<kafka-topic>.record-queue-time-avg
> {code}
> Best,
> Hai Zhou
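
As a rough illustration of the proposed naming, the sketch below registers a gauge 
under a per-topic sub-group with Flink's MetricGroup API, so the topic becomes part 
of the metric identifier. The group layout and metric name here are assumptions for 
the example, not the connector's actual implementation:

{code:java}
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

public class TopicScopedProducerMetrics {

    // Registers the gauge under KafkaProducer.<topic>, so the fully qualified name
    // ends in ...KafkaProducer.<topic>.record-queue-time-avg instead of
    // ...KafkaProducer.record-queue-time-avg.
    public static void register(MetricGroup operatorGroup, String topic, Gauge<Double> queueTimeAvg) {
        MetricGroup topicGroup = operatorGroup
            .addGroup("KafkaProducer")  // existing producer metric group
            .addGroup(topic);           // hypothetical per-topic sub-group
        topicGroup.gauge("record-queue-time-avg", queueTimeAvg);
    }
}
{code}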



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10214) why is job applying as many TMs as default parallelism when starting, each parallelism is 1

2018-08-24 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16592449#comment-16592449
 ] 

vinoyang commented on FLINK-10214:
--

[~lzh9] What is the parallelism of your job? Since FLIP-6, Flink has adopted an 
elastic resource allocation model.

> why is job applying as many TMs as default parallelism when starting, each 
> parallelism is 1
> ---
>
> Key: FLINK-10214
> URL: https://issues.apache.org/jira/browse/FLINK-10214
> Project: Flink
>  Issue Type: Task
>  Components: Cluster Management, TaskManager, YARN
>Affects Versions: 1.5.0
>Reporter: lzh9
>Priority: Major
>
> If I set TM number=6, slot num per TM=8, memory per TM=4G, the job will 
> apply for 48 TMs and use only one slot each, so total memory usage is 192G. A few 
> minutes later, it will release 40 TMs and everything will be back to normal. 
> Is there any reason for this? 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10212) REST API for listing all the available save points

2018-08-24 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16592436#comment-16592436
 ] 

vinoyang commented on FLINK-10212:
--

Hi [~mrooding], good thoughts. I think GET is better for RESTful semantics. 
Are you going to start on this issue? If not, I'd like to do it.

> REST API for listing all the available save points
> --
>
> Key: FLINK-10212
> URL: https://issues.apache.org/jira/browse/FLINK-10212
> Project: Flink
>  Issue Type: New Feature
>Reporter: Marc Rooding
>Priority: Major
>
> *Background*
> I'm one of the authors of the open-source Flink job deployer 
> ([https://github.com/ing-bank/flink-deployer]). Recently, I rewrote our 
> implementation to use the Flink REST API instead of the native CLI. 
> In our use case, we store the job savepoints in a Kubernetes persistent 
> volume. For our deployer, we mount the persistent volume to our deployer 
> container so that we can find and use the savepoints. 
> In the rewrite to the REST API, I saw that the API for monitoring savepoint 
> creation returns the complete path to the created savepoint, and we can use 
> this in the job deployer to start the new job from the latest savepoint.
> However, we also allow users to deploy a job with a recovered state by 
> specifying only the directory savepoints are stored in. In this scenario we 
> will look for the latest savepoint created for this job ourselves inside the 
> given directory. To find this path, we're still relying on the mounted volume 
> and listing directory content to discover savepoints.
> *Feature*
> I was thinking that it might be a good addition if the native Flink REST API 
> offers the ability to retrieve savepoints. Seeing that the API doesn't 
> inherently know where savepoints are stored, it could take a path as one of 
> the arguments. It could even allow the user to provide a job ID as an 
> argument so that the API would be able to search for savepoints for a 
> specific job ID in the specified directory. 
> As the API would require the path as an argument, and providing a path 
> containing forward slashes in the URL isn't ideal, I'm eager to discuss what 
> a proper solution would look like.
> A POST request to /jobs/:jobid/savepoints with the path as a body parameter 
> would make sense if the API were to offer listing all savepoints in a 
> specific path, but this request is already being used for creating new 
> savepoints.
> An alternative could be a POST to /savepoints with the path and job ID in the 
> request body.
> A POST request to retrieve data is obviously not the most straightforward 
> approach but in my opinion still preferable over a GET to, for example, 
> /jobs/:jobid/savepoints/:targetDirectory
> I'm willing to help out on this one by submitting a pull request.
> Looking forward to your thoughts! 
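
To make the POST-with-body variant concrete, here is a minimal client-side sketch. 
The /savepoints/list path and the JSON field names are hypothetical, chosen only to 
illustrate the shape of the request being discussed; they are not part of the 
current Flink REST API:

{code:java}
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class ListSavepointsSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical endpoint: POST the savepoint directory and job id in the body.
        URL url = new URL("http://jobmanager:8081/savepoints/list");
        String body = "{\"targetDirectory\": \"s3://bucket/savepoints\", \"jobId\": \"<job-id>\"}";

        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("HTTP " + conn.getResponseCode());
    }
}
{code}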



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10136) Add REPEAT supported in Table API and SQL

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16592433#comment-16592433
 ] 

ASF GitHub Bot commented on FLINK-10136:


yanghua commented on a change in pull request #6597: [FLINK-10136] [table] Add 
REPEAT supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6597#discussion_r212788014
 
 

 ##
 File path: docs/dev/table/functions.md
 ##
 @@ -2614,6 +2626,18 @@ STRING.rtrim()
 E.g., 'This is a test String. '.rtrim() returns "This 
is a test String.".
   
 
+
+
+  
+{% highlight java %}
+STRING.repeat(INT)
+{% endhighlight %}
+  
+  
+Returns a string that repeats the base STRING INT 
times. 
+E.g., "This is a test String. ".repeat(2) returns 
"This is a test String.This is a test String.".
 
 Review comment:
   Thank you very much. @xccui 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add REPEAT supported in Table API and SQL
> -
>
> Key: FLINK-10136
> URL: https://issues.apache.org/jira/browse/FLINK-10136
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Oracle : 
> [https://docs.oracle.com/cd/E17952_01/mysql-5.1-en/string-functions.html#function_repeat]
> MySql: 
> https://dev.mysql.com/doc/refman/5.5/en/string-functions.html#function_repeat



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on a change in pull request #6597: [FLINK-10136] [table] Add REPEAT supported in Table API and SQL

2018-08-24 Thread GitBox
yanghua commented on a change in pull request #6597: [FLINK-10136] [table] Add 
REPEAT supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6597#discussion_r212788014
 
 

 ##
 File path: docs/dev/table/functions.md
 ##
 @@ -2614,6 +2626,18 @@ STRING.rtrim()
 E.g., 'This is a test String. '.rtrim() returns "This 
is a test String.".
   
 
+
+
+  
+{% highlight java %}
+STRING.repeat(INT)
+{% endhighlight %}
+  
+  
+Returns a string that repeats the base STRING INT 
times. 
+E.g., "This is a test String. ".repeat(2) returns 
"This is a test String.This is a test String.".
 
 Review comment:
   Thank you very much. @xccui 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9610) Add Kafka partitioner that uses the key to partition by

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16592422#comment-16592422
 ] 

ASF GitHub Bot commented on FLINK-9610:
---

cricket007 commented on issue #6181: [FLINK-9610] [flink-connector-kafka-base] 
Add Kafka Partitioner that uses the hash of the provided key.
URL: https://github.com/apache/flink/pull/6181#issuecomment-415930190
 
 
   Kafka uses a Murmur2 Hash, not `Arrays.hashCode`
   
   
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L69


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Kafka partitioner that uses the key to partition by
> ---
>
> Key: FLINK-9610
> URL: https://issues.apache.org/jira/browse/FLINK-9610
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector
>Reporter: Niels Basjes
>Assignee: Niels Basjes
>Priority: Major
>  Labels: pull-request-available
>
> The kafka connector package only contains the FlinkFixedPartitioner 
> implementation of the FlinkKafkaPartitioner.
> The most common use case I have seen is the need to spread the records across 
> the Kafka partitions while keeping all messages with the same key together.
> I'll put up a pull request with a very simple implementation that should make 
> this a lot easier for others to use and extend.
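
For illustration only, a key-based partitioner along the lines discussed could look 
roughly like the sketch below, assuming the FlinkKafkaPartitioner base class from 
flink-connector-kafka-base. It hashes the serialized key with Arrays.hashCode purely 
for brevity; as noted in the comment above, Kafka's own DefaultPartitioner applies 
Murmur2 to the key bytes:

{code:java}
import java.util.Arrays;

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Sketch: route records with the same serialized key to the same Kafka partition.
public class KeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (key == null) {
            return partitions[0];
        }
        // Mask the sign bit so the index is non-negative (similar to Kafka's toPositive()).
        int hash = Arrays.hashCode(key) & 0x7fffffff;
        return partitions[hash % partitions.length];
    }
}
{code}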



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] cricket007 commented on issue #6181: [FLINK-9610] [flink-connector-kafka-base] Add Kafka Partitioner that uses the hash of the provided key.

2018-08-24 Thread GitBox
cricket007 commented on issue #6181: [FLINK-9610] [flink-connector-kafka-base] 
Add Kafka Partitioner that uses the hash of the provided key.
URL: https://github.com/apache/flink/pull/6181#issuecomment-415930190
 
 
   Kafka uses a Murmur2 Hash, not `Arrays.hashCode`
   
   
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L69


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-10214) why is job applying as many TMs as default parallelism when starting, each parallelism is 1

2018-08-24 Thread lzh9 (JIRA)
lzh9 created FLINK-10214:


 Summary: why is job applying as many TMs as default parallelism 
when starting, each parallelism is 1
 Key: FLINK-10214
 URL: https://issues.apache.org/jira/browse/FLINK-10214
 Project: Flink
  Issue Type: Task
  Components: Cluster Management, TaskManager, YARN
Affects Versions: 1.5.0
Reporter: lzh9


If I set TM number=6, slot num per TM=8, memory per TM=4G, the job will apply for 
48 TMs and use only one slot each, so total memory usage is 192G. A few minutes 
later, it will release 40 TMs and everything will be back to normal. 

Is there any reason for this? 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-10136) Add REPEAT supported in Table API and SQL

2018-08-24 Thread Xingcan Cui (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingcan Cui resolved FLINK-10136.
-
Resolution: Done

Implemented in 1.7.0 505dca174128ebb3bf765778ee36d58f680d6a1e

> Add REPEAT supported in Table API and SQL
> -
>
> Key: FLINK-10136
> URL: https://issues.apache.org/jira/browse/FLINK-10136
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Oracle : 
> [https://docs.oracle.com/cd/E17952_01/mysql-5.1-en/string-functions.html#function_repeat]
> MySql: 
> https://dev.mysql.com/doc/refman/5.5/en/string-functions.html#function_repeat



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10201) The batchTestUtil was mistakenly used in some stream sql tests

2018-08-24 Thread Xingcan Cui (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingcan Cui updated FLINK-10201:

Fix Version/s: 1.7.0

> The batchTestUtil was mistakenly used in some stream sql tests
> --
>
> Key: FLINK-10201
> URL: https://issues.apache.org/jira/browse/FLINK-10201
> Project: Flink
>  Issue Type: Test
>  Components: Table API  SQL
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> The {{batchTestUtil}} was mistakenly used in stream sql tests 
> {{SetOperatorsTest.testValuesWithCast()}} and 
> {{CorrelateTest.testLeftOuterJoinAsSubQuery()}}. That should be fixed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10136) Add REPEAT supported in Table API and SQL

2018-08-24 Thread Xingcan Cui (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingcan Cui updated FLINK-10136:

Fix Version/s: 1.7.0

> Add REPEAT supported in Table API and SQL
> -
>
> Key: FLINK-10136
> URL: https://issues.apache.org/jira/browse/FLINK-10136
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Oracle : 
> [https://docs.oracle.com/cd/E17952_01/mysql-5.1-en/string-functions.html#function_repeat]
> MySql: 
> https://dev.mysql.com/doc/refman/5.5/en/string-functions.html#function_repeat



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10136) Add REPEAT supported in Table API and SQL

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16592376#comment-16592376
 ] 

ASF GitHub Bot commented on FLINK-10136:


asfgit closed pull request #6597: [FLINK-10136] [table] Add REPEAT supported in 
Table API and SQL
URL: https://github.com/apache/flink/pull/6597
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md
index 24d8d70080b..759cf2f8db0 100644
--- a/docs/dev/table/functions.md
+++ b/docs/dev/table/functions.md
@@ -2401,6 +2401,18 @@ RTRIM(string)
   
 
 
+
+  
+{% highlight text %}
+REPEAT(string, integer)
+{% endhighlight %}
+  
+  
+Returns a string that repeats the base string integer 
times. 
+E.g., REPEAT('This is a test String.', 2) returns 
"This is a test String.This is a test String.".
+  
+
+
 
   
 {% highlight text %}
@@ -2614,6 +2626,18 @@ STRING.rtrim()
 E.g., 'This is a test String. '.rtrim() returns "This 
is a test String.".
   
 
+
+
+  
+{% highlight java %}
+STRING.repeat(INT)
+{% endhighlight %}
+  
+  
+Returns a string that repeats the base STRING INT 
times. 
+E.g., "This is a test String. ".repeat(2) returns 
"This is a test String.This is a test String.".
+  
+
 
 
   
@@ -2830,6 +2854,18 @@ STRING.rtrim()
   
 
 
+
+  
+{% highlight scala %}
+STRING.repeat(INT)
+{% endhighlight %}
+  
+  
+Returns a string that repeats the base STRING INT 
times. 
+E.g., "This is a test String. ".repeat(2) returns 
"This is a test String.This is a test String.".
+  
+ 
+
 
   
 {% highlight scala %}
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 8b08af68117..a0cfac65923 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -573,6 +573,11 @@ trait ImplicitExpressionOperations {
 */
   def rtrim() = RTrim(expr)
 
+  /**
+* Returns a string that repeats the base string n times.
+*/
+  def repeat(n: Expression) = Repeat(expr, n)
+
   // Temporal operations
 
   /**
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
index 7eb91d3806d..899cb0ff35a 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
@@ -141,4 +141,10 @@ object BuiltInMethods {
   val HEX_STRING: Method = Types.lookupMethod(classOf[ScalarFunctions], "hex", 
classOf[String])
 
   val UUID: Method = Types.lookupMethod(classOf[ScalarFunctions], "uuid")
+
+  val REPEAT: Method = Types.lookupMethod(
+classOf[ScalarFunctions],
+"repeat",
+classOf[String],
+classOf[Int])
 }
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index 7c328c98be1..c7eb869a5b5 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -176,6 +176,12 @@ object FunctionGenerator {
 STRING_TYPE_INFO,
 BuiltInMethod.RTRIM.method)
 
+  addSqlFunctionMethod(
+REPEAT,
+Seq(STRING_TYPE_INFO, INT_TYPE_INFO),
+STRING_TYPE_INFO,
+BuiltInMethods.REPEAT)
+
   // 
--
   // Arithmetic functions
   // 
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
index b2d7a3deb18..70794407cc0 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
+++ 

[GitHub] asfgit closed pull request #6597: [FLINK-10136] [table] Add REPEAT supported in Table API and SQL

2018-08-24 Thread GitBox
asfgit closed pull request #6597: [FLINK-10136] [table] Add REPEAT supported in 
Table API and SQL
URL: https://github.com/apache/flink/pull/6597
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md
index 24d8d70080b..759cf2f8db0 100644
--- a/docs/dev/table/functions.md
+++ b/docs/dev/table/functions.md
@@ -2401,6 +2401,18 @@ RTRIM(string)
   
 
 
+
+  
+{% highlight text %}
+REPEAT(string, integer)
+{% endhighlight %}
+  
+  
+Returns a string that repeats the base string integer 
times. 
+E.g., REPEAT('This is a test String.', 2) returns 
"This is a test String.This is a test String.".
+  
+
+
 
   
 {% highlight text %}
@@ -2614,6 +2626,18 @@ STRING.rtrim()
 E.g., 'This is a test String. '.rtrim() returns "This 
is a test String.".
   
 
+
+
+  
+{% highlight java %}
+STRING.repeat(INT)
+{% endhighlight %}
+  
+  
+Returns a string that repeats the base STRING INT 
times. 
+E.g., "This is a test String. ".repeat(2) returns 
"This is a test String.This is a test String.".
+  
+
 
 
   
@@ -2830,6 +2854,18 @@ STRING.rtrim()
   
 
 
+
+  
+{% highlight scala %}
+STRING.repeat(INT)
+{% endhighlight %}
+  
+  
+Returns a string that repeats the base STRING INT 
times. 
+E.g., "This is a test String. ".repeat(2) returns 
"This is a test String.This is a test String.".
+  
+ 
+
 
   
 {% highlight scala %}
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 8b08af68117..a0cfac65923 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -573,6 +573,11 @@ trait ImplicitExpressionOperations {
 */
   def rtrim() = RTrim(expr)
 
+  /**
+* Returns a string that repeats the base string n times.
+*/
+  def repeat(n: Expression) = Repeat(expr, n)
+
   // Temporal operations
 
   /**
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
index 7eb91d3806d..899cb0ff35a 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
@@ -141,4 +141,10 @@ object BuiltInMethods {
   val HEX_STRING: Method = Types.lookupMethod(classOf[ScalarFunctions], "hex", 
classOf[String])
 
   val UUID: Method = Types.lookupMethod(classOf[ScalarFunctions], "uuid")
+
+  val REPEAT: Method = Types.lookupMethod(
+classOf[ScalarFunctions],
+"repeat",
+classOf[String],
+classOf[Int])
 }
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index 7c328c98be1..c7eb869a5b5 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -176,6 +176,12 @@ object FunctionGenerator {
 STRING_TYPE_INFO,
 BuiltInMethod.RTRIM.method)
 
+  addSqlFunctionMethod(
+REPEAT,
+Seq(STRING_TYPE_INFO, INT_TYPE_INFO),
+STRING_TYPE_INFO,
+BuiltInMethods.REPEAT)
+
   // 
--
   // Arithmetic functions
   // 
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
index b2d7a3deb18..70794407cc0 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
@@ -459,3 +459,31 @@ case class RTrim(child: Expression) extends 
UnaryExpression with InputTypeSpec {
 
   override def toString = s"($child).rtrim"
 }
+
+/**
+  * Returns a 

[jira] [Commented] (FLINK-10201) The batchTestUtil was mistakenly used in some stream sql tests

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16592372#comment-16592372
 ] 

ASF GitHub Bot commented on FLINK-10201:


xccui closed pull request #6605: [FLINK-10201] [table] [test] The batchTestUtil 
was mistakenly used in some stream sql tests
URL: https://github.com/apache/flink/pull/6605
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
index efb83b456ed..f179ae6cfac 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
@@ -106,7 +106,7 @@ class CorrelateTest extends TableTestBase {
 
   @Test
   def testLeftOuterJoinAsSubQuery(): Unit = {
-val util = batchTestUtil()
+val util = streamTestUtil()
 val func1 = new TableFunc1
 util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
 util.addTable[(Int, Long, String)]("MyTable2", 'a2, 'b2, 'c2)
@@ -121,13 +121,13 @@ class CorrelateTest extends TableTestBase {
 | ON c2 = s """.stripMargin
 
 val expected = binaryNode(
-  "DataSetJoin",
-  batchTableNode(1),
+  "DataStreamJoin",
+  streamTableNode(1),
   unaryNode(
-"DataSetCalc",
+"DataStreamCalc",
 unaryNode(
-  "DataSetCorrelate",
-  batchTableNode(0),
+  "DataStreamCorrelate",
+  streamTableNode(0),
   term("invocation", "func1($cor0.c)"),
   term("correlate", "table(func1($cor0.c))"),
   term("select", "a", "b", "c", "f0"),
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala
index 97dbe0dad66..fb4b4b5f409 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala
@@ -173,29 +173,26 @@ class SetOperatorsTest extends TableTestBase {
 
   @Test
   def testValuesWithCast(): Unit = {
-val util = batchTestUtil()
+val util = streamTestUtil()
 
 val expected = naryNode(
-  "DataSetUnion",
+  "DataStreamUnion",
   List(
-unaryNode("DataSetCalc",
-  values("DataSetValues",
-tuples(List("0")),
-"values=[ZERO]"),
+unaryNode("DataStreamCalc",
+  values("DataStreamValues",
+tuples(List("0"))),
   term("select", "1 AS EXPR$0, 1 AS EXPR$1")),
-unaryNode("DataSetCalc",
-  values("DataSetValues",
-tuples(List("0")),
-"values=[ZERO]"),
+unaryNode("DataStreamCalc",
+  values("DataStreamValues",
+tuples(List("0"))),
   term("select", "2 AS EXPR$0, 2 AS EXPR$1")),
-unaryNode("DataSetCalc",
-  values("DataSetValues",
-tuples(List("0")),
-"values=[ZERO]"),
+unaryNode("DataStreamCalc",
+  values("DataStreamValues",
+tuples(List("0"))),
   term("select", "3 AS EXPR$0, 3 AS EXPR$1"))
   ),
   term("all", "true"),
-  term("union", "EXPR$0, EXPR$1")
+  term("union all", "EXPR$0, EXPR$1")
 )
 
 util.verifySql(


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> The batchTestUtil was mistakenly used in some stream sql tests
> --
>
> Key: FLINK-10201
> URL: https://issues.apache.org/jira/browse/FLINK-10201
> Project: Flink
>  Issue Type: Test
>  Components: Table API  SQL
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>Priority: Minor
>  Labels: pull-request-available
>
> The {{batchTestUtil}} was mistakenly used in stream sql tests 
> {{SetOperatorsTest.testValuesWithCast()}} and 
> {{CorrelateTest.testLeftOuterJoinAsSubQuery()}}. That should be fixed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-10201) The batchTestUtil was mistakenly used in some stream sql tests

2018-08-24 Thread Xingcan Cui (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingcan Cui resolved FLINK-10201.
-
Resolution: Fixed

Fixed in 1.7.0 6d28a65092ffe4a4390fccacb4deb7e403924f51

> The batchTestUtil was mistakenly used in some stream sql tests
> --
>
> Key: FLINK-10201
> URL: https://issues.apache.org/jira/browse/FLINK-10201
> Project: Flink
>  Issue Type: Test
>  Components: Table API  SQL
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>Priority: Minor
>  Labels: pull-request-available
>
> The {{batchTestUtil}} was mistakenly used in stream sql tests 
> {{SetOperatorsTest.testValuesWithCast()}} and 
> {{CorrelateTest.testLeftOuterJoinAsSubQuery()}}. That should be fixed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] xccui closed pull request #6605: [FLINK-10201] [table] [test] The batchTestUtil was mistakenly used in some stream sql tests

2018-08-24 Thread GitBox
xccui closed pull request #6605: [FLINK-10201] [table] [test] The batchTestUtil 
was mistakenly used in some stream sql tests
URL: https://github.com/apache/flink/pull/6605
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
index efb83b456ed..f179ae6cfac 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
@@ -106,7 +106,7 @@ class CorrelateTest extends TableTestBase {
 
   @Test
   def testLeftOuterJoinAsSubQuery(): Unit = {
-val util = batchTestUtil()
+val util = streamTestUtil()
 val func1 = new TableFunc1
 util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
 util.addTable[(Int, Long, String)]("MyTable2", 'a2, 'b2, 'c2)
@@ -121,13 +121,13 @@ class CorrelateTest extends TableTestBase {
 | ON c2 = s """.stripMargin
 
 val expected = binaryNode(
-  "DataSetJoin",
-  batchTableNode(1),
+  "DataStreamJoin",
+  streamTableNode(1),
   unaryNode(
-"DataSetCalc",
+"DataStreamCalc",
 unaryNode(
-  "DataSetCorrelate",
-  batchTableNode(0),
+  "DataStreamCorrelate",
+  streamTableNode(0),
   term("invocation", "func1($cor0.c)"),
   term("correlate", "table(func1($cor0.c))"),
   term("select", "a", "b", "c", "f0"),
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala
index 97dbe0dad66..fb4b4b5f409 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala
@@ -173,29 +173,26 @@ class SetOperatorsTest extends TableTestBase {
 
   @Test
   def testValuesWithCast(): Unit = {
-val util = batchTestUtil()
+val util = streamTestUtil()
 
 val expected = naryNode(
-  "DataSetUnion",
+  "DataStreamUnion",
   List(
-unaryNode("DataSetCalc",
-  values("DataSetValues",
-tuples(List("0")),
-"values=[ZERO]"),
+unaryNode("DataStreamCalc",
+  values("DataStreamValues",
+tuples(List("0"))),
   term("select", "1 AS EXPR$0, 1 AS EXPR$1")),
-unaryNode("DataSetCalc",
-  values("DataSetValues",
-tuples(List("0")),
-"values=[ZERO]"),
+unaryNode("DataStreamCalc",
+  values("DataStreamValues",
+tuples(List("0"))),
   term("select", "2 AS EXPR$0, 2 AS EXPR$1")),
-unaryNode("DataSetCalc",
-  values("DataSetValues",
-tuples(List("0")),
-"values=[ZERO]"),
+unaryNode("DataStreamCalc",
+  values("DataStreamValues",
+tuples(List("0"))),
   term("select", "3 AS EXPR$0, 3 AS EXPR$1"))
   ),
   term("all", "true"),
-  term("union", "EXPR$0, EXPR$1")
+  term("union all", "EXPR$0, EXPR$1")
 )
 
 util.verifySql(


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on issue #6508: [Flink-10079] [table] Automatically register sink table from external catalogs

2018-08-24 Thread GitBox
fhueske commented on issue #6508: [Flink-10079] [table] Automatically register 
sink table from external catalogs 
URL: https://github.com/apache/flink/pull/6508#issuecomment-415906486
 
 
   Hi @tragicjun, sorry for the late reply.
   I'll have a look at this PR next week.
   
   Best, Fabian


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9086) Use INTERNAL_HADOOP_CLASSPATHS as classpath for launching Yarn cluster for Scala Shell

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16592298#comment-16592298
 ] 

ASF GitHub Bot commented on FLINK-9086:
---

ruankd commented on issue #5768: [FLINK-9086] Use INTERNAL_HADOOP_CLASSPATHS as 
classpath for Yarn Session
URL: https://github.com/apache/flink/pull/5768#issuecomment-415904593
 
 
   Ping.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Use INTERNAL_HADOOP_CLASSPATHS as classpath for launching Yarn cluster for 
> Scala Shell
> --
>
> Key: FLINK-9086
> URL: https://issues.apache.org/jira/browse/FLINK-9086
> Project: Flink
>  Issue Type: Improvement
>  Components: Startup Shell Scripts
>Affects Versions: 1.4.0
>Reporter: Keda Ruan
>Priority: Major
>  Labels: pull-request-available
>
> Some environments don't have hadoop jars in {{HADOOP_CLASSPATH}}, which leaves the 
> Flink Yarn cluster unable to start due to missing jars. For example, 
> Flink 1.4.0 drops the jersey dependency in its shaded hadoop jar, leaving the Yarn 
> cluster unable to start:
> {quote}Exception in thread "main" java.lang.NoClassDefFoundError: 
> com/sun/jersey/core/util/FeaturesAndProperties
>  at java.lang.ClassLoader.defineClass1(Native Method)
>  at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
> {quote}
> Since {{config.sh}} defines {{INTERNAL_HADOOP_CLASSPATHS}}, which holds the 
> {{hadoop classpath}} result, we can utilize this to improve the user 
> experience of starting a Flink Yarn cluster for the Scala Shell.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9086) Use INTERNAL_HADOOP_CLASSPATHS as classpath for launching Yarn cluster for Scala Shell

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9086:
--
Labels: pull-request-available  (was: )

> Use INTERNAL_HADOOP_CLASSPATHS as classpath for launching Yarn cluster for 
> Scala Shell
> --
>
> Key: FLINK-9086
> URL: https://issues.apache.org/jira/browse/FLINK-9086
> Project: Flink
>  Issue Type: Improvement
>  Components: Startup Shell Scripts
>Affects Versions: 1.4.0
>Reporter: Keda Ruan
>Priority: Major
>  Labels: pull-request-available
>
> Some environments don't have hadoop jars in {{HADOOP_CLASSPATH}}, which leaves the 
> Flink Yarn cluster unable to start due to missing jars. For example, 
> Flink 1.4.0 drops the jersey dependency in its shaded hadoop jar, leaving the Yarn 
> cluster unable to start:
> {quote}Exception in thread "main" java.lang.NoClassDefFoundError: 
> com/sun/jersey/core/util/FeaturesAndProperties
>  at java.lang.ClassLoader.defineClass1(Native Method)
>  at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
> {quote}
> Since {{config.sh}} defines {{INTERNAL_HADOOP_CLASSPATHS}}, which holds the 
> {{hadoop classpath}} result, we can utilize this to improve the user 
> experience of starting a Flink Yarn cluster for the Scala Shell.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] ruankd commented on issue #5768: [FLINK-9086] Use INTERNAL_HADOOP_CLASSPATHS as classpath for Yarn Session

2018-08-24 Thread GitBox
ruankd commented on issue #5768: [FLINK-9086] Use INTERNAL_HADOOP_CLASSPATHS as 
classpath for Yarn Session
URL: https://github.com/apache/flink/pull/5768#issuecomment-415904593
 
 
   Ping.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-7477) Use "hadoop classpath" to augment classpath when available

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16592297#comment-16592297
 ] 

ASF GitHub Bot commented on FLINK-7477:
---

ruankd commented on issue #4566: [FLINK-7477] [FLINK-7480] Various improvements 
to Flink scripts
URL: https://github.com/apache/flink/pull/4566#issuecomment-415904348
 
 
   Hey, I noticed that this 
[commit](https://github.com/apache/flink/commit/0a0f6ed6c3d6cff702e4322293340274bea5e7d9)
 is part of this PR but was not merged into branches 1.5 and 1.6, nor into 
master. I wonder whether it will be merged?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Use "hadoop classpath" to augment classpath when available
> --
>
> Key: FLINK-7477
> URL: https://issues.apache.org/jira/browse/FLINK-7477
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: 1.4.0
>
>
> Currently, some cloud environments don't properly put the Hadoop jars into 
> {{HADOOP_CLASSPATH}} (or don't set {{HADOOP_CLASSPATH}} at all). We should 
> check in {{config.sh}} whether the {{hadoop}} binary is on the path and augment 
> our {{INTERNAL_HADOOP_CLASSPATHS}} with the result of {{hadoop classpath}} in 
> our scripts.
> This will improve the out-of-box experience for users who otherwise have to 
> manually set {{HADOOP_CLASSPATH}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] ruankd commented on issue #4566: [FLINK-7477] [FLINK-7480] Various improvements to Flink scripts

2018-08-24 Thread GitBox
ruankd commented on issue #4566: [FLINK-7477] [FLINK-7480] Various improvements 
to Flink scripts
URL: https://github.com/apache/flink/pull/4566#issuecomment-415904348
 
 
   Hey, I noticed that this 
[commit](https://github.com/apache/flink/commit/0a0f6ed6c3d6cff702e4322293340274bea5e7d9)
 is part of this PR but was not merged into branches 1.5 and 1.6, nor into 
master. I wonder whether it will be merged?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-10213) Task managers cache a negative DNS lookup of the blob server indefinitely

2018-08-24 Thread Joey Echeverria (JIRA)
Joey Echeverria created FLINK-10213:
---

 Summary: Task managers cache a negative DNS lookup of the blob 
server indefinitely
 Key: FLINK-10213
 URL: https://issues.apache.org/jira/browse/FLINK-10213
 Project: Flink
  Issue Type: Bug
  Components: TaskManager
Affects Versions: 1.5.0
Reporter: Joey Echeverria


When the task manager establishes a connection with the resource manager, it 
gets the hostname and port of the blob server and uses that to create an 
instance of an {{InetSocketAddress}}. Per the documentation of the constructor:
{quote}An attempt will be made to resolve the hostname into an InetAddress. If 
that attempt fails, the address will be flagged as _unresolved_{quote}
Flink never checks to see if the address was unresolved. Later when executing a 
task that needs to download from the blob server, it will use that same 
{{InetSocketAddress}} instance to attempt to connect a {{Socket}}. This will 
result in an exception similar to:
{noformat}
java.io.IOException: Failed to fetch BLOB 
97799b827ef073e04178a99f0f40b00e/p-6d8ec2ad31337110819c7c3641fdb18d3793a7fb-72bf00066308f4b4d2a9c5aea593b41f
 from jobmanager:6124 and store it under 
/tmp/blobStore-d135961a-03cb-4542-af6d-cf378ff83c12/incoming/temp-00018669
at 
org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:191)
 ~[flink-dist_2.11-1.5.0.jar:1.5.0]
at 
org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181)
 ~[flink-dist_2.11-1.5.0.jar:1.5.0]
at 
org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:206)
 ~[flink-dist_2.11-1.5.0.jar:1.5.0]
at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
 ~[flink-dist_2.11-1.5.0.jar:1.5.0]
at 
org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:863)
 [flink-dist_2.11-1.5.0.jar:1.5.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579) 
[flink-dist_2.11-1.5.0.jar:1.5.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
Caused by: java.io.IOException: Could not connect to BlobServer at address 
flink-jobmanager:6124
at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:124) 
~[flink-dist_2.11-1.5.0.jar:1.5.0]
at 
org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:165)
 ~[flink-dist_2.11-1.5.0.jar:1.5.0]
... 6 more
Caused by: java.net.UnknownHostException: jobmanager
at 
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184) 
~[?:1.8.0_171]
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) 
~[?:1.8.0_171]
at java.net.Socket.connect(Socket.java:589) ~[?:1.8.0_171]
at java.net.Socket.connect(Socket.java:538) ~[?:1.8.0_171]
at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:118) 
~[flink-dist_2.11-1.5.0.jar:1.5.0]
at 
org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:165)
 ~[flink-dist_2.11-1.5.0.jar:1.5.0]
... 6 more
{noformat}

Since the {{InetSocketAddress}} is re-used, you'll have repeated failures of 
any tasks that are executed on that task manager and the only current 
workaround is to manually restart the task manager.
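
One possible mitigation, sketched below purely for illustration and not as the task 
manager's actual code, is to check InetSocketAddress#isUnresolved() and trigger a 
fresh DNS lookup before connecting, instead of reusing the negatively cached address:

{code:java}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BlobServerAddressSketch {

    // Re-resolve the blob server address if the cached instance is unresolved.
    static Socket connect(InetSocketAddress cached, int timeoutMillis) throws IOException {
        InetSocketAddress address = cached;
        if (address.isUnresolved()) {
            // Constructing a new InetSocketAddress performs a fresh DNS lookup.
            address = new InetSocketAddress(cached.getHostString(), cached.getPort());
        }
        if (address.isUnresolved()) {
            throw new IOException("Cannot resolve blob server host " + cached.getHostString());
        }
        Socket socket = new Socket();
        socket.connect(address, timeoutMillis);
        return socket;
    }
}
{code}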



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10195) RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly

2018-08-24 Thread Luka Jurukovski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16592182#comment-16592182
 ] 

Luka Jurukovski commented on FLINK-10195:
-

[~StephanEwen]
From what I can tell, no. Although this has been very much a crash course in 
RabbitMQ for me. Looking at forums, it looks like prefetch.count is the way 
this is normally handled. Basically, the consumer can tell RabbitMQ how 
many unacked messages to allow before it stops sending. For example, if prefetch.count 
is set to 10,000, that is how many messages RabbitMQ will allow before it needs 
acknowledgement; once messages are acked, it will send data again until it hits this max.

I would imagine that Flink would not want to use this mechanism, because it doesn't 
actually "backpressure" given how checkpointing is tied to acking. One would have to do 
a throughput calculation and hope that there isn't any variance in that number that 
results in Flink waiting on the next checkpoint. Additionally, since sync checkpointing 
is a feature, there are no guarantees that checkpointing will happen at a regular 
interval.

Under the covers, the QueueingConsumer uses a LinkedBlockingQueue and appends to 
the queue with the "add" method.

I tried changing it to use an ArrayBlockingQueue with a set capacity and the 
blocking "put" method, however this results in another problem with RabbitMQ: 
RabbitMQ sometimes terminates the connection to Flink when Flink doesn't 
dequeue from the queue fast enough (I notice this usually happens only when sync 
checkpointing is on and there are long-running checkpoints). According to some of 
the forums, this is due to Rabbit having some sort of timeout on how long it is 
willing to wait when writing to a client's buffer.

I have some ugly code that I am testing where I turn off the consumer when the 
buffer is full, with a monitoring thread that turns it back on when it is below 
a certain capacity. I don't know if this methodology will cause any other issues, 
and am testing more. I might be able to get rid of the monitoring thread, but 
I'll look into that once I've proved out this way of doing things.

Welcoming any additional thoughts or comments here. Sorry for the wall of text.
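
For reference, the prefetch mechanism mentioned above is set via basic.qos on the 
channel. The snippet below is a minimal sketch with the plain RabbitMQ Java client 
(host and prefetch value are placeholders), not a proposed change to Flink's RMQSource:

{code:java}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class PrefetchSketch {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Allow at most 10,000 unacknowledged deliveries on this channel; the broker
        // stops sending more until some of them are acknowledged.
        channel.basicQos(10000);

        // ... register a consumer here and ack messages as they are processed ...

        channel.close();
        connection.close();
    }
}
{code}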

> RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly
> -
>
> Key: FLINK-10195
> URL: https://issues.apache.org/jira/browse/FLINK-10195
> Project: Flink
>  Issue Type: Bug
>  Components: RabbitMQ Connector
>Affects Versions: 1.4.0, 1.5.0, 1.5.1, 1.6.0
>Reporter: Luka Jurukovski
>Priority: Major
>
> The connection between the RabbitMQ server and the client does not 
> appropriately back pressure when auto acking is disabled. This becomes very 
> problematic when a downstream process throttles the data processing to slower 
> than RabbitMQ sends the data to the client.
> The difference in records ends up being stored in Flink's heap space, 
> which grows indefinitely (or technically to "Integer Max" Deliveries). 
> Looking at RabbitMQ's metrics, the number of unacked messages looks like a 
> steadily rising saw-tooth shape.
> Upon further investigation, it looks like this is due to how the 
> QueueingConsumer works: messages are added to the BlockingQueue faster than 
> they are being removed and processed, resulting in the previously described 
> behavior.
> This may be intended behavior, however it isn't explicitly obvious in the 
> documentation or any of the examples I have seen.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16592021#comment-16592021
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r212714129
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to Read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param  The type of record to read.
+ */
+public abstract class ParquetInputFormat extends FileInputFormat 
implements
 
 Review comment:
   Yes, it is considered. I will add a new constructor with a Filter for 
ParquetInputFormat, but I will leave the conversion logic from the Flink expressions 
provided by FilterableTableSource to a Parquet FilterPredicate within the Parquet 
table source. Thanks for the input. I will update the PR within the weekend.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ParquetInputFormat
> --
>
> Key: FLINK-7243
> URL: https://issues.apache.org/jira/browse/FLINK-7243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: godfrey he
>Assignee: Zhenqiu Huang
>Priority: Major
>  Labels: pull-request-available
>
> Add a {{ParquetInputFormat}} to read data from a Apache Parquet file. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-08-24 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r212714129
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to Read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param  The type of record to read.
+ */
+public abstract class ParquetInputFormat extends FileInputFormat 
implements
 
 Review comment:
   Yes, it is considered. I will add a new constructor with a Filter for 
ParquetInputFormat, but I will leave the conversion logic from the Flink expressions 
provided by FilterableTableSource to Parquet FilterPredicate within the Parquet 
table source. Thanks for the input. I will update the PR over the weekend.
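
   For illustration, a minimal sketch of the kind of Parquet-side filter such a constructor could accept. The column names and constants below are made up, and the actual conversion from Flink expressions to a FilterPredicate would live in the Parquet table source as described above:

{code:java}
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;

public class ParquetFilterSketch {
	public static void main(String[] args) {
		// Hypothetical predicate, e.g. the Parquet-side equivalent of
		// "myField2 > 200 AND myField1 = 42" pushed down from a table source.
		FilterPredicate predicate = FilterApi.and(
			FilterApi.gt(FilterApi.longColumn("myField2"), 200L),
			FilterApi.eq(FilterApi.intColumn("myField1"), 42));

		// The input format would receive the predicate (e.g. via a new constructor)
		// and hand it to the low-level Parquet reader wrapped in a FilterCompat.Filter.
		FilterCompat.Filter filter = FilterCompat.get(predicate);
		System.out.println(filter);
	}
}
{code}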


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591984#comment-16591984
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r212706499
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
 ##
 @@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Schema converter converts Parquet schema to and from Flink internal types.
+ */
+public class ParquetSchemaConverter {
+   public static final String MAP_KEY = "key";
+   public static final String MAP_VALUE = "value";
+   public static final String LIST_ELEMENT = "array";
+   public static final String MESSAGE_ROOT = "root";
+   private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();
+
+   public static TypeInformation<?> fromParquetType(MessageType type) {
+      return convertFields(type.getFields());
+   }
+
+   public static MessageType toParquetType(TypeInformation<?> typeInformation) {
+      return (MessageType) convertField(null, typeInformation, Type.Repetition.OPTIONAL);
+   }
+
+   private static TypeInformation<?> convertFields(List<Type> parquetFields) {
+      List<TypeInformation<?>> types = new ArrayList<>();
+      List<String> names = new ArrayList<>();
+      for (Type field : parquetFields) {
+         TypeInformation<?> subType = convertField(field);
+         if (subType != null) {
+            types.add(subType);
+            names.add(field.getName());
+         }
+      }
+
+      return new RowTypeInfo(types.toArray(new TypeInformation<?>[types.size()]),
+         names.toArray(new String[names.size()]));
+   }
+
+   private static TypeInformation<?> convertField(final Type fieldType) {
+      TypeInformation<?> typeInfo = null;
+      if (fieldType.isPrimitive()) {
+         PrimitiveType primitiveType = fieldType.asPrimitiveType();
 
 Review comment:
   This function converts a Parquet primitive type to a corresponding default 
Flink type. The explicit type conversion to SqlTimeTypeInfo or other types 
can probably be handled by users when there is a need. Otherwise, we would need 
to bring the user's conversion preference into the interface. 
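
   As a rough illustration of that default mapping (a sketch only, not the exact switch in this PR):

{code:java}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import org.apache.parquet.schema.PrimitiveType;

class PrimitiveMappingSketch {
	// Maps a Parquet primitive type to a default Flink TypeInformation.
	static TypeInformation<?> defaultFlinkType(PrimitiveType primitiveType) {
		switch (primitiveType.getPrimitiveTypeName()) {
			case BOOLEAN: return BasicTypeInfo.BOOLEAN_TYPE_INFO;
			case INT32:   return BasicTypeInfo.INT_TYPE_INFO;
			case INT64:   return BasicTypeInfo.LONG_TYPE_INFO;   // a user wanting SqlTimeTypeInfo converts afterwards
			case FLOAT:   return BasicTypeInfo.FLOAT_TYPE_INFO;
			case DOUBLE:  return BasicTypeInfo.DOUBLE_TYPE_INFO;
			case BINARY:  return BasicTypeInfo.STRING_TYPE_INFO;
			default:      return BasicTypeInfo.BYTE_TYPE_INFO;   // placeholder for the remaining cases
		}
	}
}
{code}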


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ParquetInputFormat
> --
>
> Key: FLINK-7243
> URL: https://issues.apache.org/jira/browse/FLINK-7243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: godfrey he
>Assignee: Zhenqiu Huang
>Priority: Major
>  Labels: pull-request-available
>
> Add a {{ParquetInputFormat}} to read data from an Apache Parquet file. 

[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-08-24 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r212706499
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
 ##
 @@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Schema converter converts Parquet schema to and from Flink internal types.
+ */
+public class ParquetSchemaConverter {
+   public static final String MAP_KEY = "key";
+   public static final String MAP_VALUE = "value";
+   public static final String LIST_ELEMENT = "array";
+   public static final String MESSAGE_ROOT = "root";
+   private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();
+
+   public static TypeInformation<?> fromParquetType(MessageType type) {
+      return convertFields(type.getFields());
+   }
+
+   public static MessageType toParquetType(TypeInformation<?> typeInformation) {
+      return (MessageType) convertField(null, typeInformation, Type.Repetition.OPTIONAL);
+   }
+
+   private static TypeInformation<?> convertFields(List<Type> parquetFields) {
+      List<TypeInformation<?>> types = new ArrayList<>();
+      List<String> names = new ArrayList<>();
+      for (Type field : parquetFields) {
+         TypeInformation<?> subType = convertField(field);
+         if (subType != null) {
+            types.add(subType);
+            names.add(field.getName());
+         }
+      }
+
+      return new RowTypeInfo(types.toArray(new TypeInformation<?>[types.size()]),
+         names.toArray(new String[names.size()]));
+   }
+
+   private static TypeInformation<?> convertField(final Type fieldType) {
+      TypeInformation<?> typeInfo = null;
+      if (fieldType.isPrimitive()) {
+         PrimitiveType primitiveType = fieldType.asPrimitiveType();
 
 Review comment:
   This function converts a Parquet primitive type to a corresponding default 
Flink type. The explicit type conversion to SqlTimeTypeInfo or other types 
can probably be handled by users when there is a need. Otherwise, we would need 
to bring the user's conversion preference into the interface. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591977#comment-16591977
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r212704322
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A subclass of {@link ParquetInputFormat} to read from Parquet files and convert to {@link Row}.
+ * It is mainly used to integrate with the Table API and batch SQL.
+ */
+public class ParquetRowInputFormat extends ParquetInputFormat<Row> implements ResultTypeQueryable<Row> {
 
 Review comment:
   Agreed. Once this PR is merged, I will create another PR for 
https://issues.apache.org/jira/browse/FLINK-7244.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ParquetInputFormat
> --
>
> Key: FLINK-7243
> URL: https://issues.apache.org/jira/browse/FLINK-7243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: godfrey he
>Assignee: Zhenqiu Huang
>Priority: Major
>  Labels: pull-request-available
>
> Add a {{ParquetInputFormat}} to read data from an Apache Parquet file. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-08-24 Thread GitBox
HuangZhenQiu commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r212704322
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A subclass of {@link ParquetInputFormat} to read from Parquet files and convert to {@link Row}.
+ * It is mainly used to integrate with the Table API and batch SQL.
+ */
+public class ParquetRowInputFormat extends ParquetInputFormat<Row> implements ResultTypeQueryable<Row> {
 
 Review comment:
   Agreed. Once this PR is merged, I will create another PR for 
https://issues.apache.org/jira/browse/FLINK-7244.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10212) REST API for listing all the available save points

2018-08-24 Thread Marc Rooding (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marc Rooding updated FLINK-10212:
-
Description: 
*Background*

I'm one of the authors of the open-source Flink job deployer 
([https://github.com/ing-bank/flink-deployer)]. Recently, I rewrote our 
implementation to use the Flink REST API instead of the native CLI. 

In our use case, we store the job savepoints in a Kubernetes persistent volume. 
For our deployer, we mount the persistent volume to our deployer container so 
that we can find and use the savepoints. 

In the rewrite to the REST API, I saw that the API to monitor savepoint 
creation returns the complete path to the created savepoint, and we can use 
this one in the job deployer to start the new job with the latest save point.

However, we also allow users to deploy a job with a recovered state by 
specifying only the directory savepoints are stored in. In this scenario we 
will look for the latest savepoint created for this job ourselves inside the 
given directory. To find this path, we're still relying on the mounted volume 
and listing directory content to discover savepoints.

*Feature*

I was thinking that it might be a good addition if the native Flink REST API 
offers the ability to retrieve savepoints. Seeing that the API doesn't 
inherently know where savepoints are stored, it could take a path as one of the 
arguments. It could even allow the user to provide a job ID as an argument so 
that the API would be able to search for savepoints for a specific job ID in 
the specified directory. 

As the API would require the path as an argument, and providing a path 
containing forward slashes in the URL isn't ideal, I'm eager to discuss what a 
proper solution would look like.

A POST request to /jobs/:jobid/savepoints with the path as a body parameter 
would make sense if the API were to offer to list all savepoints in a specific 
path, but this request is already being used for creating new savepoints.

An alternative could be a POST to /savepoints with the path and job ID in the 
request body.
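
For illustration only, such a request could look like the following sketch (the endpoint, body fields, and job ID are hypothetical; no such listing API exists in Flink today):

{code:java}
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class ListSavepointsSketch {
	public static void main(String[] args) throws Exception {
		// Hypothetical body: the directory to scan and the job to filter on.
		String body = "{\"target-directory\": \"s3://bucket/savepoints\", \"job-id\": \"a1b2c3d4e5f6\"}";

		// Hypothetical endpoint for listing savepoints.
		URL url = new URL("http://localhost:8081/savepoints");
		HttpURLConnection connection = (HttpURLConnection) url.openConnection();
		connection.setRequestMethod("POST");
		connection.setRequestProperty("Content-Type", "application/json");
		connection.setDoOutput(true);
		try (OutputStream out = connection.getOutputStream()) {
			out.write(body.getBytes(StandardCharsets.UTF_8));
		}

		// Expected (hypothetical) response: a JSON list of discovered savepoint paths.
		try (Scanner scanner = new Scanner(connection.getInputStream(), StandardCharsets.UTF_8.name())) {
			System.out.println(scanner.useDelimiter("\\A").next());
		}
	}
}
{code}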

A POST request to retrieve data is obviously not the most straightforward 
approach but in my opinion still preferable over a GET to, for example, 
/jobs/:jobid/savepoints/:targetDirectory

I'm willing to help out on this one by submitting a pull request.

Looking forward to your thoughts! 

  was:
*Background*

I'm one of the authors of the open-source Flink job deployer 
([https://github.com/ing-bank/flink-deployer)]. Recently, I rewrote our 
implementation to use the Flink REST API instead of the native CLI. 

In our use case, we store the job savepoints in a Kubernetes persistent volume. 
For our deployer, we mount the persistent volume to our deployer container so 
that we can find and use the savepoints. 

In the rewrite to the REST API, I saw that the API to monitor savepoint 
creation returns the complete path to the created savepoint, and we can use 
this one in the job deployer to start the new job with the latest save point.

However, we also allow users to deploy a job with a recovered state by 
specifying only the directory savepoints are stored in. In this scenario we 
will look for the latest savepoint created for this job ourselves inside the 
given directory. To find this path, we're still relying on the mounted volume 
and listing directory content to discover savepoints.

*Feature*

I was thinking that it might be a good addition if the native Flink REST API 
offers the ability to retrieve savepoints. Seeing that the API doesn't 
inherently know where savepoints are stored, it could take a path as one of the 
arguments. It could even allow the user to provide a job ID as an argument so 
that the API would be able to search for savepoints for a specific job ID in 
the specified directory.

 

As the API would require the path as an argument, and providing a path 
containing forward slashes in the URL isn't ideal, I'm eager to discuss what a 
proper solution would look like.

A POST request to /jobs/:jobid/savepoints with the path as a body parameter 
would make sense if the API were to offer to list all save points in a specific 
path but this request is already being used for creating new savepoints.

An alternative could be a POST to /savepoints with the path and job ID in the 
request body.

A POST request to retrieve data is obviously not the most straightforward 
approach but in my opinion still preferable over a GET to, for example, 
/jobs/:jobid/savepoints/:targetDirectory

I'm willing to help out on this one by submitting a pull request.

Looking forward to your thoughts! 


> REST API for listing all the available save points
> --
>
> Key: FLINK-10212
> URL: https://issues.apache.org/jira/browse/FLINK-10212
> Project: Flink
>  Issue Type: New Feature
>  

[jira] [Created] (FLINK-10212) REST API for listing all the available save points

2018-08-24 Thread Marc Rooding (JIRA)
Marc Rooding created FLINK-10212:


 Summary: REST API for listing all the available save points
 Key: FLINK-10212
 URL: https://issues.apache.org/jira/browse/FLINK-10212
 Project: Flink
  Issue Type: New Feature
Reporter: Marc Rooding


*Background*

I'm one of the authors of the open-source Flink job deployer 
([https://github.com/ing-bank/flink-deployer)]. Recently, I rewrote our 
implementation to use the Flink REST API instead of the native CLI. 

In our use case, we store the job savepoints in a Kubernetes persistent volume. 
For our deployer, we mount the persistent volume to our deployer container so 
that we can find and use the savepoints. 

In the rewrite to the REST API, I saw that the API to monitor savepoint 
creation returns the complete path to the created savepoint, and we can use 
this one in the job deployer to start the new job with the latest save point.

However, we also allow users to deploy a job with a recovered state by 
specifying only the directory savepoints are stored in. In this scenario we 
will look for the latest savepoint created for this job ourselves inside the 
given directory. To find this path, we're still relying on the mounted volume 
and listing directory content to discover savepoints.

*Feature*

I was thinking that it might be a good addition if the native Flink REST API 
offers the ability to retrieve savepoints. Seeing that the API doesn't 
inherently know where savepoints are stored, it could take a path as one of the 
arguments. It could even allow the user to provide a job ID as an argument so 
that the API would be able to search for savepoints for a specific job ID in 
the specified directory.

 

As the API would require the path as an argument, and providing a path 
containing forward slashes in the URL isn't ideal, I'm eager to discuss what a 
proper solution would look like.

A POST request to /jobs/:jobid/savepoints with the path as a body parameter 
would make sense if the API were to offer to list all save points in a specific 
path but this request is already being used for creating new savepoints.

An alternative could be a POST to /savepoints with the path and job ID in the 
request body.

A POST request to retrieve data is obviously not the most straightforward 
approach but in my opinion still preferable over a GET to, for example, 
/jobs/:jobid/savepoints/:targetDirectory

I'm willing to help out on this one by submitting a pull request.

Looking forward to your thoughts! 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10153) Add tutorial section to documentation

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591943#comment-16591943
 ] 

ASF GitHub Bot commented on FLINK-10153:


asfgit closed pull request #6565: [FLINK-10153] [docs] Add Tutorials section 
and rework structure.
URL: https://github.com/apache/flink/pull/6565
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/api_concepts.md b/docs/dev/api_concepts.md
index c4215074683..bd7ca5aff90 100644
--- a/docs/dev/api_concepts.md
+++ b/docs/dev/api_concepts.md
@@ -510,7 +510,7 @@ data.map(new MapFunction () {
 
  Java 8 Lambdas
 
-Flink also supports Java 8 Lambdas in the Java API. Please see the full [Java 
8 Guide]({{ site.baseurl }}/dev/java8.html).
+Flink also supports Java 8 Lambdas in the Java API.
 
 {% highlight java %}
 data.filter(s -> s.startsWith("http://"));
diff --git a/docs/dev/batch/examples.md b/docs/dev/batch/examples.md
index 90e372dfd60..fe2bd8d3bdc 100644
--- a/docs/dev/batch/examples.md
+++ b/docs/dev/batch/examples.md
@@ -27,8 +27,7 @@ The following example programs showcase different 
applications of Flink
 from simple word counting to graph algorithms. The code samples illustrate the
 use of [Flink's DataSet API]({{ site.baseurl }}/dev/batch/index.html).
 
-The full source code of the following and more examples can be found in the 
__flink-examples-batch__
-or __flink-examples-streaming__ module of the Flink source repository.
+The full source code of the following and more examples can be found in the {% 
gh_link flink-examples/flink-examples-batch "flink-examples-batch" %} module of 
the Flink source repository.
 
 * This will be replaced by the TOC
 {:toc}
@@ -420,102 +419,4 @@ Input files are plain text files and must be formatted as 
follows:
 - Edges are represented as pairs for vertex IDs which are separated by space 
characters. Edges are separated by new-line characters:
 * For example `"1 2\n2 12\n1 12\n42 63\n"` gives four (undirected) links 
(1)-(2), (2)-(12), (1)-(12), and (42)-(63).
 
-## Relational Query
-
-The Relational Query example assumes two tables, one with `orders` and the 
other with `lineitems` as specified by the [TPC-H decision support 
benchmark](http://www.tpc.org/tpch/). TPC-H is a standard benchmark in the 
database industry. See below for instructions how to generate the input data.
-
-The example implements the following SQL query.
-
-{% highlight sql %}
-SELECT l_orderkey, o_shippriority, sum(l_extendedprice) as revenue
-FROM orders, lineitem
-WHERE l_orderkey = o_orderkey
-AND o_orderstatus = "F"
-AND YEAR(o_orderdate) > 1993
-AND o_orderpriority LIKE "5%"
-GROUP BY l_orderkey, o_shippriority;
-{% endhighlight %}
-
-The Flink program, which implements the above query looks as follows.
-
-
-
-
-{% highlight java %}
-// get orders data set: (orderkey, orderstatus, orderdate, orderpriority, 
shippriority)
-DataSet> orders = 
getOrdersDataSet(env);
-// get lineitem data set: (orderkey, extendedprice)
-DataSet> lineitems = getLineitemDataSet(env);
-
-// orders filtered by year: (orderkey, custkey)
-DataSet> ordersFilteredByYear =
-// filter orders
-orders.filter(
-new FilterFunction>() {
-@Override
-public boolean filter(Tuple5 t) {
-// status filter
-if(!t.f1.equals(STATUS_FILTER)) {
-return false;
-// year filter
-} else if(Integer.parseInt(t.f2.substring(0, 4)) <= 
YEAR_FILTER) {
-return false;
-// order priority filter
-} else if(!t.f3.startsWith(OPRIO_FILTER)) {
-return false;
-}
-return true;
-}
-})
-// project fields out that are no longer required
-.project(0,4).types(Integer.class, Integer.class);
-
-// join orders with lineitems: (orderkey, shippriority, extendedprice)
-DataSet> lineitemsOfOrders =
-ordersFilteredByYear.joinWithHuge(lineitems)
-.where(0).equalTo(0)
-.projectFirst(0,1).projectSecond(1)
-.types(Integer.class, Integer.class, Double.class);
-
-// extendedprice sums: (orderkey, shippriority, sum(extendedprice))
-DataSet> priceSums =
-// group by order and sum extendedprice
-lineitemsOfOrders.groupBy(0,1).aggregate(Aggregations.SUM, 2);
-
-// emit result
-priceSums.writeAsCsv(outputPath);
-{% endhighlight %}
-
-The {% gh_link 

[jira] [Closed] (FLINK-10153) Add tutorial section to documentation

2018-08-24 Thread Fabian Hueske (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-10153.
-
   Resolution: Done
Fix Version/s: 1.7.0

Done for 1.7.0 with 52cbe07ba7a367880475af59596adc2365bd8a21

> Add tutorial section to documentation
> -
>
> Key: FLINK-10153
> URL: https://issues.apache.org/jira/browse/FLINK-10153
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> The current documentation does not feature a dedicated tutorials section and 
> has a few issues that should be fixed in order to help our (future) users 
> get started with Flink.
> I propose to add a single "Tutorials" section to the documentation where 
> users find step-by-step guides. The tutorials section helps users with 
> different goals:
>   * Get a quick idea of the overall system
>   * Implement a DataStream/DataSet/Table API/SQL job
>   * Set up Flink on a local machine (or run a Docker container)
> There are already a few guides to get started but they are located at 
> different places and should be moved into the Tutorials section. Moreover, 
> some sections such as "Project Setup" contain content that addresses users 
> with very different intentions.
> I propose to
> * add a new Tutorials section and move all existing tutorials there (and 
> later add new ones).
> * move the "Quickstart" section to "Tutorials".
> * remove the "Project Setup" section and move the pages to other sections 
> (some pages will be split up or adjusted).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] asfgit closed pull request #6565: [FLINK-10153] [docs] Add Tutorials section and rework structure.

2018-08-24 Thread GitBox
asfgit closed pull request #6565: [FLINK-10153] [docs] Add Tutorials section 
and rework structure.
URL: https://github.com/apache/flink/pull/6565
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/api_concepts.md b/docs/dev/api_concepts.md
index c4215074683..bd7ca5aff90 100644
--- a/docs/dev/api_concepts.md
+++ b/docs/dev/api_concepts.md
@@ -510,7 +510,7 @@ data.map(new MapFunction () {
 
  Java 8 Lambdas
 
-Flink also supports Java 8 Lambdas in the Java API. Please see the full [Java 
8 Guide]({{ site.baseurl }}/dev/java8.html).
+Flink also supports Java 8 Lambdas in the Java API.
 
 {% highlight java %}
 data.filter(s -> s.startsWith("http://"));
diff --git a/docs/dev/batch/examples.md b/docs/dev/batch/examples.md
index 90e372dfd60..fe2bd8d3bdc 100644
--- a/docs/dev/batch/examples.md
+++ b/docs/dev/batch/examples.md
@@ -27,8 +27,7 @@ The following example programs showcase different 
applications of Flink
 from simple word counting to graph algorithms. The code samples illustrate the
 use of [Flink's DataSet API]({{ site.baseurl }}/dev/batch/index.html).
 
-The full source code of the following and more examples can be found in the 
__flink-examples-batch__
-or __flink-examples-streaming__ module of the Flink source repository.
+The full source code of the following and more examples can be found in the {% 
gh_link flink-examples/flink-examples-batch "flink-examples-batch" %} module of 
the Flink source repository.
 
 * This will be replaced by the TOC
 {:toc}
@@ -420,102 +419,4 @@ Input files are plain text files and must be formatted as 
follows:
 - Edges are represented as pairs for vertex IDs which are separated by space 
characters. Edges are separated by new-line characters:
 * For example `"1 2\n2 12\n1 12\n42 63\n"` gives four (undirected) links 
(1)-(2), (2)-(12), (1)-(12), and (42)-(63).
 
-## Relational Query
-
-The Relational Query example assumes two tables, one with `orders` and the 
other with `lineitems` as specified by the [TPC-H decision support 
benchmark](http://www.tpc.org/tpch/). TPC-H is a standard benchmark in the 
database industry. See below for instructions how to generate the input data.
-
-The example implements the following SQL query.
-
-{% highlight sql %}
-SELECT l_orderkey, o_shippriority, sum(l_extendedprice) as revenue
-FROM orders, lineitem
-WHERE l_orderkey = o_orderkey
-AND o_orderstatus = "F"
-AND YEAR(o_orderdate) > 1993
-AND o_orderpriority LIKE "5%"
-GROUP BY l_orderkey, o_shippriority;
-{% endhighlight %}
-
-The Flink program, which implements the above query looks as follows.
-
-
-
-
-{% highlight java %}
-// get orders data set: (orderkey, orderstatus, orderdate, orderpriority, 
shippriority)
-DataSet> orders = 
getOrdersDataSet(env);
-// get lineitem data set: (orderkey, extendedprice)
-DataSet> lineitems = getLineitemDataSet(env);
-
-// orders filtered by year: (orderkey, custkey)
-DataSet> ordersFilteredByYear =
-// filter orders
-orders.filter(
-new FilterFunction>() {
-@Override
-public boolean filter(Tuple5 t) {
-// status filter
-if(!t.f1.equals(STATUS_FILTER)) {
-return false;
-// year filter
-} else if(Integer.parseInt(t.f2.substring(0, 4)) <= 
YEAR_FILTER) {
-return false;
-// order priority filter
-} else if(!t.f3.startsWith(OPRIO_FILTER)) {
-return false;
-}
-return true;
-}
-})
-// project fields out that are no longer required
-.project(0,4).types(Integer.class, Integer.class);
-
-// join orders with lineitems: (orderkey, shippriority, extendedprice)
-DataSet> lineitemsOfOrders =
-ordersFilteredByYear.joinWithHuge(lineitems)
-.where(0).equalTo(0)
-.projectFirst(0,1).projectSecond(1)
-.types(Integer.class, Integer.class, Double.class);
-
-// extendedprice sums: (orderkey, shippriority, sum(extendedprice))
-DataSet> priceSums =
-// group by order and sum extendedprice
-lineitemsOfOrders.groupBy(0,1).aggregate(Aggregations.SUM, 2);
-
-// emit result
-priceSums.writeAsCsv(outputPath);
-{% endhighlight %}
-
-The {% gh_link 
/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
 "Relational Query program" %} implements the above query. It requires the 
following parameters to run: `--orders  --lineitem  --output 
`.
-
-
-
-Coming soon...
-
-The {% gh_link 

[GitHub] fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] Support views in SQL Client

2018-08-24 Thread GitBox
fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] 
Support views in SQL Client
URL: https://github.com/apache/flink/pull/6606#discussion_r212621362
 
 

 ##
 File path: docs/dev/table/sqlClient.md
 ##
 @@ -459,6 +466,50 @@ Web interface: http://localhost:8081
 
 {% top %}
 
+SQL Views
+-
+
+Views allow defining virtual tables from SQL queries. The view definition is 
parsed and validated immediately. However, the actual execution happens when 
the view is accessed during the submission of a general `INSERT INTO` or 
`SELECT` statement.
+
+Views can either be defined in [environment 
files](sqlClient.html#environment-files) or within the CLI session.
+
+The following example shows how to define multiple views in a file:
+
+{% highlight yaml %}
+views:
+  - name: MyRestrictedView
+    query: "SELECT MyField2 FROM MyTableSource"
+  - name: MyComplexView
+    query: >
+      SELECT MyField2 + 42, CAST(MyField1 AS VARCHAR)
+      FROM MyTableSource
+      WHERE MyField2 > 200
+{% endhighlight %}
+
+Similar to table sources and sinks, views defined in a session environment 
file have the highest precedence.
+
+Views can also be created within a CLI session using the `CREATE VIEW` 
statement:
+
+{% highlight text %}
+CREATE VIEW MyNewView AS SELECT MyField2 FROM MyTableSource
+{% endhighlight %}
+
+The `SHOW VIEW` statement allows for printing a previously created view again:
 
 Review comment:
   I think the way that these commands work is a bit inconsistent with the 
`SHOW TABLES`, `SHOW FUNCTIONS`, and `DESCRIBE xxx` commands.
   
   1. Views are listed as regular tables when calling `SHOW TABLES`. 
 * List tables and views together, but mark views as views
 * Or add a `SHOW VIEWS` to only list views (and only list tables with 
`LIST TABLES`)
   
   2. The schema of a view is returned with `DESCRIBE xxx`. This is fine, but 
IMO, we should also return the query. Showing schema and query with different 
commands is not intuitive to me.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10163) Support CREATE VIEW in SQL Client

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591932#comment-16591932
 ] 

ASF GitHub Bot commented on FLINK-10163:


fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] 
Support views in SQL Client
URL: https://github.com/apache/flink/pull/6606#discussion_r212609696
 
 

 ##
 File path: docs/dev/table/sqlClient.md
 ##
 @@ -459,6 +466,50 @@ Web interface: http://localhost:8081
 
 {% top %}
 
+SQL Views
+-
+
+Views allow defining virtual tables from SQL queries. The view definition is 
parsed and validated immediately. However, the actual execution happens when 
the view is accessed during the submission of a general `INSERT INTO` or 
`SELECT` statement.
+
+Views can either be defined in [environment 
files](sqlClient.html#environment-files) or within the CLI session.
+
+The following example shows how to define multiple views in a file:
+
+{% highlight yaml %}
+views:
+  - name: MyRestrictedView
+    query: "SELECT MyField2 FROM MyTableSource"
+  - name: MyComplexView
+    query: >
+      SELECT MyField2 + 42, CAST(MyField1 AS VARCHAR)
+      FROM MyTableSource
+      WHERE MyField2 > 200
+{% endhighlight %}
+
+Similar to table sources and sinks, views defined in a session environment 
file have the highest precedence.
+
+Views can also be created within a CLI session using the `CREATE VIEW` 
statement:
+
+{% highlight text %}
+CREATE VIEW MyNewView AS SELECT MyField2 FROM MyTableSource
+{% endhighlight %}
+
+The `SHOW VIEW` statement allows for printing a previously created view again:
+
+{% highlight text %}
+SHOW VIEW MyNewView
+{% endhighlight %}
+
+Views created within a CLI session can also be removed again using the `DROP 
VIEW` statement:
+
+{% highlight text %}
+DROP VIEW MyNewView
+{% endhighlight %}
+
+Attention The definition of views is 
limited to the semantics mentioned above. Defining a schema for views or escaping 
whitespace in table names will be supported in future versions.
 
 Review comment:
   semantics -> syntax


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support CREATE VIEW in SQL Client
> -
>
> Key: FLINK-10163
> URL: https://issues.apache.org/jira/browse/FLINK-10163
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> The possibility to define a name for a subquery would improve the usability 
> of the SQL Client. The SQL standard defines \{{CREATE VIEW}} for defining a 
> virtual table.
>  
> Example:
> {code}
>  CREATE VIEW viewName
>  [ '(' columnName [, columnName ]* ')' ]
>  AS Query
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10163) Support CREATE VIEW in SQL Client

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591933#comment-16591933
 ] 

ASF GitHub Bot commented on FLINK-10163:


fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] 
Support views in SQL Client
URL: https://github.com/apache/flink/pull/6606#discussion_r212693978
 
 

 ##
 File path: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
 ##
 @@ -390,51 +395,101 @@ private void callSelect(SqlCommandCall cmdCall) {
view.open();
 
// view left
-   
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_RESULT_QUIT).toAnsi());
-   terminal.flush();
+   printInfo(CliStrings.MESSAGE_RESULT_QUIT);
} catch (SqlExecutionException e) {
-   printException(e);
+   printExecutionException(e);
}
}
 
private boolean callInsertInto(SqlCommandCall cmdCall) {
-   
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT).toAnsi());
-   terminal.flush();
+   printInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT);
 
try {
final ProgramTargetDescriptor programTarget = 
executor.executeUpdate(context, cmdCall.operands[0]);

terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_STATEMENT_SUBMITTED).toAnsi());
terminal.writer().println(programTarget.toString());
terminal.flush();
} catch (SqlExecutionException e) {
-   printException(e);
+   printExecutionException(e);
return false;
}
return true;
}
 
-   private void callSource(SqlCommandCall cmdCall) {
-   final String pathString = cmdCall.operands[0];
+   private void callCreateView(SqlCommandCall cmdCall) {
+   final String name = cmdCall.operands[0];
+   final String query = cmdCall.operands[1];
+
+   try {
+   // validate with a copy
+   final SessionContext contextCopy = context.copy();
 
 Review comment:
   This check fails if the VIEW query contains UDFs. 
   It seems that these are not registered yet.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support CREATE VIEW in SQL Client
> -
>
> Key: FLINK-10163
> URL: https://issues.apache.org/jira/browse/FLINK-10163
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> The possibility to define a name for a subquery would improve the usability 
> of the SQL Client. The SQL standard defines \{{CREATE VIEW}} for defining a 
> virtual table.
>  
> Example:
> {code}
>  CREATE VIEW viewName
>  [ '(' columnName [, columnName ]* ')' ]
>  AS Query
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] Support views in SQL Client

2018-08-24 Thread GitBox
fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] 
Support views in SQL Client
URL: https://github.com/apache/flink/pull/6606#discussion_r212693978
 
 

 ##
 File path: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
 ##
 @@ -390,51 +395,101 @@ private void callSelect(SqlCommandCall cmdCall) {
view.open();
 
// view left
-   
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_RESULT_QUIT).toAnsi());
-   terminal.flush();
+   printInfo(CliStrings.MESSAGE_RESULT_QUIT);
} catch (SqlExecutionException e) {
-   printException(e);
+   printExecutionException(e);
}
}
 
private boolean callInsertInto(SqlCommandCall cmdCall) {
-   
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT).toAnsi());
-   terminal.flush();
+   printInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT);
 
try {
final ProgramTargetDescriptor programTarget = 
executor.executeUpdate(context, cmdCall.operands[0]);

terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_STATEMENT_SUBMITTED).toAnsi());
terminal.writer().println(programTarget.toString());
terminal.flush();
} catch (SqlExecutionException e) {
-   printException(e);
+   printExecutionException(e);
return false;
}
return true;
}
 
-   private void callSource(SqlCommandCall cmdCall) {
-   final String pathString = cmdCall.operands[0];
+   private void callCreateView(SqlCommandCall cmdCall) {
+   final String name = cmdCall.operands[0];
+   final String query = cmdCall.operands[1];
+
+   try {
+   // validate with a copy
+   final SessionContext contextCopy = context.copy();
 
 Review comment:
   This check fails if the VIEW query contains UDFs. 
   It seems that these are not registered yet.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10163) Support CREATE VIEW in SQL Client

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591931#comment-16591931
 ] 

ASF GitHub Bot commented on FLINK-10163:


fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] 
Support views in SQL Client
URL: https://github.com/apache/flink/pull/6606#discussion_r212622912
 
 

 ##
 File path: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
 ##
 @@ -30,86 +35,124 @@ private SqlCommandParser() {
}
 
public static Optional parse(String stmt) {
-   String trimmed = stmt.trim();
+   // normalize
+   stmt = stmt.trim();
// remove ';' at the end because many people type it intuitively
-   if (trimmed.endsWith(";")) {
-   trimmed = trimmed.substring(0, trimmed.length() - 1);
+   if (stmt.endsWith(";")) {
+   stmt = stmt.substring(0, stmt.length() - 1).trim();
}
+
+   // parse
for (SqlCommand cmd : SqlCommand.values()) {
-   int pos = 0;
-   int tokenCount = 0;
-   for (String token : trimmed.split("\\s")) {
-   pos += token.length() + 1; // include space 
character
-   // check for content
-   if (token.length() > 0) {
-   // match
-   if (tokenCount < cmd.tokens.length && 
token.equalsIgnoreCase(cmd.tokens[tokenCount])) {
-   if (tokenCount == 
cmd.tokens.length - 1) {
-   final SqlCommandCall 
call = new SqlCommandCall(
-   cmd,
-   
splitOperands(cmd, trimmed, trimmed.substring(Math.min(pos, trimmed.length(
-   );
-   return 
Optional.of(call);
-   }
-   } else {
-   // next sql command
-   break;
-   }
-   tokenCount++; // check next token
+   final Pattern pattern = 
Pattern.compile(cmd.matchingRegex, Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
 
 Review comment:
   Pattern compilation is quite heavy. Can we do this just once and not for 
every entered command?
   
   Maybe store the compiled pattern in the enum?
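
   A minimal sketch of that suggestion (the enum and regexes below are illustrative, not the actual SqlCommand definition):

{code:java}
import java.util.regex.Matcher;
import java.util.regex.Pattern;

enum CommandPattern {
	// Each regex is compiled exactly once, when the enum constant is created,
	// instead of on every call to parse().
	CREATE_VIEW("CREATE\\s+VIEW\\s+(\\S+)\\s+AS\\s+(.*)"),
	DROP_VIEW("DROP\\s+VIEW\\s+(\\S+)");

	private final Pattern pattern;

	CommandPattern(String regex) {
		this.pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
	}

	Matcher matcher(String statement) {
		return pattern.matcher(statement);
	}
}
{code}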


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support CREATE VIEW in SQL Client
> -
>
> Key: FLINK-10163
> URL: https://issues.apache.org/jira/browse/FLINK-10163
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> The possibility to define a name for a subquery would improve the usability 
> of the SQL Client. The SQL standard defines \{{CREATE VIEW}} for defining a 
> virtual table.
>  
> Example:
> {code}
>  CREATE VIEW viewName
>  [ '(' columnName [, columnName ]* ')' ]
>  AS Query
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10163) Support CREATE VIEW in SQL Client

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591934#comment-16591934
 ] 

ASF GitHub Bot commented on FLINK-10163:


fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] 
Support views in SQL Client
URL: https://github.com/apache/flink/pull/6606#discussion_r212621362
 
 

 ##
 File path: docs/dev/table/sqlClient.md
 ##
 @@ -459,6 +466,50 @@ Web interface: http://localhost:8081
 
 {% top %}
 
+SQL Views
+-
+
+Views allow defining virtual tables from SQL queries. The view definition is 
parsed and validated immediately. However, the actual execution happens when 
the view is accessed during the submission of a general `INSERT INTO` or 
`SELECT` statement.
+
+Views can either be defined in [environment 
files](sqlClient.html#environment-files) or within the CLI session.
+
+The following example shows how to define multiple views in a file:
+
+{% highlight yaml %}
+views:
+  - name: MyRestrictedView
+    query: "SELECT MyField2 FROM MyTableSource"
+  - name: MyComplexView
+    query: >
+      SELECT MyField2 + 42, CAST(MyField1 AS VARCHAR)
+      FROM MyTableSource
+      WHERE MyField2 > 200
+{% endhighlight %}
+
+Similar to table sources and sinks, views defined in a session environment 
file have the highest precedence.
+
+Views can also be created within a CLI session using the `CREATE VIEW` 
statement:
+
+{% highlight text %}
+CREATE VIEW MyNewView AS SELECT MyField2 FROM MyTableSource
+{% endhighlight %}
+
+The `SHOW VIEW` statement allows for printing a previously created view again:
 
 Review comment:
   I think the way that these commands work is a bit inconsistent with the 
`SHOW TABLES`, `SHOW FUNCTIONS`, and `DESCRIBE xxx` commands.
   
   1. Views are listed as regular tables when calling `SHOW TABLES`. 
 * List tables and views together, but mark views as views
 * Or add a `SHOW VIEWS` to only list views (and only list tables with 
`LIST TABLES`)
   
   2. The schema of a view is returned with `DESCRIBE xxx`. This is fine, but 
IMO, we should also return the query. Showing schema and query with different 
commands is not intuitive to me.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support CREATE VIEW in SQL Client
> -
>
> Key: FLINK-10163
> URL: https://issues.apache.org/jira/browse/FLINK-10163
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> The possibility to define a name for a subquery would improve the usability 
> of the SQL Client. The SQL standard defines \{{CREATE VIEW}} for defining a 
> virtual table.
>  
> Example:
> {code}
>  CREATE VIEW viewName
>  [ '(' columnName [, columnName ]* ')' ]
>  AS Query
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10163) Support CREATE VIEW in SQL Client

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591930#comment-16591930
 ] 

ASF GitHub Bot commented on FLINK-10163:


fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] 
Support views in SQL Client
URL: https://github.com/apache/flink/pull/6606#discussion_r212654542
 
 

 ##
 File path: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
 ##
 @@ -67,24 +75,36 @@ public Environment() {
public void setTables(List> tables) {
this.tables = new HashMap<>(tables.size());
tables.forEach(config -> {
-   if (!config.containsKey(TABLE_NAME)) {
-   throw new SqlClientException("The 'name' 
attribute of a table is missing.");
-   }
-   final Object nameObject = config.get(TABLE_NAME);
-   if (nameObject == null || !(nameObject instanceof 
String) || ((String) nameObject).length() <= 0) {
-   throw new SqlClientException("Invalid table 
name '" + nameObject + "'.");
-   }
-   final String name = (String) nameObject;
+   final String name = extractEarlyStringProperty(config, 
TABLE_NAME, "table");
final Map properties = new 
HashMap<>(config);
properties.remove(TABLE_NAME);
 
-   if (this.tables.containsKey(name)) {
+   if (this.tables.containsKey(name) || 
this.views.containsKey(name)) {
throw new SqlClientException("Duplicate table 
name '" + name + "'.");
}
this.tables.put(name, createTableDescriptor(name, 
properties));
});
}
 
+   public Map getViews() {
+   return views;
+   }
+
+   public void setViews(List> views) {
+   // the order of how views are registered matters because
+   // they might reference each other
+   this.views = new LinkedHashMap<>(views.size());
+   views.forEach(config -> {
+   final String name = extractEarlyStringProperty(config, 
VIEW_NAME, "view");
+   final String query = extractEarlyStringProperty(config, 
VIEW_QUERY, "view");
+
+   if (this.tables.containsKey(name) || 
this.views.containsKey(name)) {
+   throw new SqlClientException("Duplicate table 
name '" + name + "'.");
 
 Review comment:
   `table name` -> `view name` (or rewrite the error message to something like "Cannot 
create view XXX because another table or view with that name is already 
registered.")


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support CREATE VIEW in SQL Client
> -
>
> Key: FLINK-10163
> URL: https://issues.apache.org/jira/browse/FLINK-10163
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> The possibility to define a name for a subquery would improve the usability 
> of the SQL Client. The SQL standard defines \{{CREATE VIEW}} for defining a 
> virtual table.
>  
> Example:
> {code}
>  CREATE VIEW viewName
>  [ '(' columnName [, columnName ]* ')' ]
>  AS Query
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] Support views in SQL Client

2018-08-24 Thread GitBox
fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] 
Support views in SQL Client
URL: https://github.com/apache/flink/pull/6606#discussion_r212609696
 
 

 ##
 File path: docs/dev/table/sqlClient.md
 ##
 @@ -459,6 +466,50 @@ Web interface: http://localhost:8081
 
 {% top %}
 
+SQL Views
+-
+
+Views allow defining virtual tables from SQL queries. The view definition is 
parsed and validated immediately. However, the actual execution happens when 
the view is accessed during the submission of a general `INSERT INTO` or 
`SELECT` statement.
+
+Views can either be defined in [environment 
files](sqlClient.html#environment-files) or within the CLI session.
+
+The following example shows how to define multiple views in a file:
+
+{% highlight yaml %}
+views:
+  - name: MyRestrictedView
+    query: "SELECT MyField2 FROM MyTableSource"
+  - name: MyComplexView
+    query: >
+      SELECT MyField2 + 42, CAST(MyField1 AS VARCHAR)
+      FROM MyTableSource
+      WHERE MyField2 > 200
+{% endhighlight %}
+
+Similar to table sources and sinks, views defined in a session environment 
file have the highest precedence.
+
+Views can also be created within a CLI session using the `CREATE VIEW` 
statement:
+
+{% highlight text %}
+CREATE VIEW MyNewView AS SELECT MyField2 FROM MyTableSource
+{% endhighlight %}
+
+The `SHOW VIEW` statement allows for printing a previously created view again:
+
+{% highlight text %}
+SHOW VIEW MyNewView
+{% endhighlight %}
+
+Views created within a CLI session can also be removed again using the `DROP 
VIEW` statement:
+
+{% highlight text %}
+DROP VIEW MyNewView
+{% endhighlight %}
+
+Attention The definition of views is 
limited to the semantics mentioned above. Defining a schema for views or escaping 
whitespace in table names will be supported in future versions.
 
 Review comment:
   semantics -> syntax


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] Support views in SQL Client

2018-08-24 Thread GitBox
fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] 
Support views in SQL Client
URL: https://github.com/apache/flink/pull/6606#discussion_r212622912
 
 

 ##
 File path: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
 ##
 @@ -30,86 +35,124 @@ private SqlCommandParser() {
}
 
public static Optional parse(String stmt) {
-   String trimmed = stmt.trim();
+   // normalize
+   stmt = stmt.trim();
// remove ';' at the end because many people type it intuitively
-   if (trimmed.endsWith(";")) {
-   trimmed = trimmed.substring(0, trimmed.length() - 1);
+   if (stmt.endsWith(";")) {
+   stmt = stmt.substring(0, stmt.length() - 1).trim();
}
+
+   // parse
for (SqlCommand cmd : SqlCommand.values()) {
-   int pos = 0;
-   int tokenCount = 0;
-   for (String token : trimmed.split("\\s")) {
-   pos += token.length() + 1; // include space 
character
-   // check for content
-   if (token.length() > 0) {
-   // match
-   if (tokenCount < cmd.tokens.length && 
token.equalsIgnoreCase(cmd.tokens[tokenCount])) {
-   if (tokenCount == 
cmd.tokens.length - 1) {
-   final SqlCommandCall 
call = new SqlCommandCall(
-   cmd,
-   
splitOperands(cmd, trimmed, trimmed.substring(Math.min(pos, trimmed.length(
-   );
-   return 
Optional.of(call);
-   }
-   } else {
-   // next sql command
-   break;
-   }
-   tokenCount++; // check next token
+   final Pattern pattern = 
Pattern.compile(cmd.matchingRegex, Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
 
 Review comment:
   Pattern compilation is quite heavy. Can we do this just once and not for 
every entered command?
   
   Maybe store the compiled pattern in the enum?
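   A minimal sketch of that idea, assuming the enum keeps a `matchingRegex`-style constructor argument (the constants below are placeholders, not the real command list):
   
   ```java
   import java.util.regex.Pattern;
   
   // Sketch only: compile the regex once per enum constant instead of on every parse() call.
   enum SqlCommandSketch {
   	// placeholder entries for illustration
   	CREATE_VIEW("CREATE\\s+VIEW\\s+(.+)\\s+AS\\s+(.+)"),
   	DROP_VIEW("DROP\\s+VIEW\\s+(.+)");
   
   	final Pattern pattern;
   
   	SqlCommandSketch(String matchingRegex) {
   		// same flags as in the diff above, but evaluated only once at class-load time
   		this.pattern = Pattern.compile(matchingRegex, Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
   	}
   }
   ```
   
   parse() could then just call `cmd.pattern.matcher(stmt)` without recompiling anything.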


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] Support views in SQL Client

2018-08-24 Thread GitBox
fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] 
Support views in SQL Client
URL: https://github.com/apache/flink/pull/6606#discussion_r212654542
 
 

 ##
 File path: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
 ##
 @@ -67,24 +75,36 @@ public Environment() {
	public void setTables(List<Map<String, Object>> tables) {
this.tables = new HashMap<>(tables.size());
tables.forEach(config -> {
-   if (!config.containsKey(TABLE_NAME)) {
-   throw new SqlClientException("The 'name' 
attribute of a table is missing.");
-   }
-   final Object nameObject = config.get(TABLE_NAME);
-   if (nameObject == null || !(nameObject instanceof 
String) || ((String) nameObject).length() <= 0) {
-   throw new SqlClientException("Invalid table 
name '" + nameObject + "'.");
-   }
-   final String name = (String) nameObject;
+   final String name = extractEarlyStringProperty(config, 
TABLE_NAME, "table");
final Map properties = new 
HashMap<>(config);
properties.remove(TABLE_NAME);
 
-   if (this.tables.containsKey(name)) {
+   if (this.tables.containsKey(name) || 
this.views.containsKey(name)) {
throw new SqlClientException("Duplicate table 
name '" + name + "'.");
}
this.tables.put(name, createTableDescriptor(name, 
properties));
});
}
 
+   public Map getViews() {
+   return views;
+   }
+
+   public void setViews(List<Map<String, Object>> views) {
+   // the order of how views are registered matters because
+   // they might reference each other
+   this.views = new LinkedHashMap<>(views.size());
+   views.forEach(config -> {
+   final String name = extractEarlyStringProperty(config, 
VIEW_NAME, "view");
+   final String query = extractEarlyStringProperty(config, 
VIEW_QUERY, "view");
+
+   if (this.tables.containsKey(name) || 
this.views.containsKey(name)) {
+   throw new SqlClientException("Duplicate table 
name '" + name + "'.");
 
 Review comment:
   `table name` -> `view name` (or rewrite the error message to something like "Cannot 
create view XXX because another table or view with that name is already 
registered.")


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10204) Job is marked as FAILED after serialization exception

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591852#comment-16591852
 ] 

ASF GitHub Bot commented on FLINK-10204:


benlamonica commented on issue #6610: [FLINK-10204] - fix serialization/copy 
error for LatencyMarker records.
URL: https://github.com/apache/flink/pull/6610#issuecomment-415805492
 
 
   @zentol do you want me to squash the commits and force push to this branch 
so that the history remains clean?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Job is marked as FAILED after serialization exception
> -
>
> Key: FLINK-10204
> URL: https://issues.apache.org/jira/browse/FLINK-10204
> Project: Flink
>  Issue Type: Bug
>Reporter: Ben La Monica
>Priority: Major
>  Labels: pull-request-available
>
> We have a long running flink job that eventually fails and is shut down due 
> to an internal serialization exception that we keep on getting. Here is the 
> stack trace:
> {code:java}
> 2018-08-23 18:39:48,199 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NAV Estimation 
> (4b5463d76167f9f5aac83a890e8a867d) switched from state FAILING to FAILED.
> java.io.IOException: Corrupt stream, found tag: 127
> at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:219)
> at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
> at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140)
> at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:206)
> at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748){code}
>  
> I think I have tracked down the issue to a mismatch in the 
> serialization/deserialization/copy code in the StreamElementSerializer with 
> regards to the LATENCY_MARKER.
> The Serialization logic writes 3 LONGs and an INT. The copy logic only writes 
> (and reads) a LONG and 2 INTs. Adding a test for the LatencyMarker throws an 
> EOFException, and fixing the copy code causes the test to pass again.
> I've written a unit test that highlights the problem, and have written the 
> code to correct it.
> I'll submit a PR that goes along with it.
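
A rough sketch of the layout mismatch described above (field names and types are simplified for illustration; this is not the actual StreamElementSerializer code):

{code:java}
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Illustrative only: serialize() and copy() must agree on the exact field layout
// (here: three longs and one int). If copy() moves fewer bytes than serialize()
// wrote, the next read sees a corrupt stream.
class LatencyMarkerLayoutSketch {

	static void serialize(DataOutputStream out, long markedTime,
			long operatorIdUpper, long operatorIdLower, int subtaskIndex) throws IOException {
		out.writeLong(markedTime);
		out.writeLong(operatorIdUpper);
		out.writeLong(operatorIdLower);
		out.writeInt(subtaskIndex);
	}

	// copy() forwards exactly the same bytes: 3 longs + 1 int
	static void copy(DataInputStream in, DataOutputStream out) throws IOException {
		out.writeLong(in.readLong());
		out.writeLong(in.readLong());
		out.writeLong(in.readLong());
		out.writeInt(in.readInt());
	}
}
{code}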



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] benlamonica commented on issue #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records.

2018-08-24 Thread GitBox
benlamonica commented on issue #6610: [FLINK-10204] - fix serialization/copy 
error for LatencyMarker records.
URL: https://github.com/apache/flink/pull/6610#issuecomment-415805492
 
 
   @zentol do you want me to squash the commits and force push to this branch 
so that the history remains clean?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10204) Job is marked as FAILED after serialization exception

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591804#comment-16591804
 ] 

ASF GitHub Bot commented on FLINK-10204:


benlamonica commented on a change in pull request #6610: [FLINK-10204] - fix 
serialization/copy error for LatencyMarker records.
URL: https://github.com/apache/flink/pull/6610#discussion_r212666184
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
 ##
 @@ -67,31 +67,40 @@ public int getSubtaskIndex() {
// 

 
@Override
-   public boolean equals(Object o) {
-   if (this == o) {
+   public boolean equals(Object obj) {
+   if (this == obj) {
return true;
}
-   if (o == null || getClass() != o.getClass()){
+   if (obj == null) {
return false;
}
-
-   LatencyMarker that = (LatencyMarker) o;
-
-   if (markedTime != that.markedTime) {
+   if (getClass() != obj.getClass()) {
return false;
}
-   if (operatorId != that.operatorId) {
 
 Review comment:
   Sure, I will do that.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Job is marked as FAILED after serialization exception
> -
>
> Key: FLINK-10204
> URL: https://issues.apache.org/jira/browse/FLINK-10204
> Project: Flink
>  Issue Type: Bug
>Reporter: Ben La Monica
>Priority: Major
>  Labels: pull-request-available
>
> We have a long running flink job that eventually fails and is shut down due 
> to an internal serialization exception that we keep on getting. Here is the 
> stack trace:
> {code:java}
> 2018-08-23 18:39:48,199 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NAV Estimation 
> (4b5463d76167f9f5aac83a890e8a867d) switched from state FAILING to FAILED.
> java.io.IOException: Corrupt stream, found tag: 127
> at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:219)
> at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
> at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140)
> at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:206)
> at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748){code}
>  
> I think I have tracked down the issue to a mismatch in the 
> serialization/deserialization/copy code in the StreamElementSerializer with 
> regards to the LATENCY_MARKER.
> The Serialization logic writes 3 LONGs and an INT. The copy logic only writes 
> (and reads) a LONG and 2 INTs. Adding a test for the LatencyMarker throws an 
> EOFException, and fixing the copy code causes the test to pass again.
> I've written a unit test that highlights the problem, and have written the 
> code to correct it.
> I'll submit a PR that goes along with it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] benlamonica commented on a change in pull request #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records.

2018-08-24 Thread GitBox
benlamonica commented on a change in pull request #6610: [FLINK-10204] - fix 
serialization/copy error for LatencyMarker records.
URL: https://github.com/apache/flink/pull/6610#discussion_r212666184
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
 ##
 @@ -67,31 +67,40 @@ public int getSubtaskIndex() {
// 

 
@Override
-   public boolean equals(Object o) {
-   if (this == o) {
+   public boolean equals(Object obj) {
+   if (this == obj) {
return true;
}
-   if (o == null || getClass() != o.getClass()){
+   if (obj == null) {
return false;
}
-
-   LatencyMarker that = (LatencyMarker) o;
-
-   if (markedTime != that.markedTime) {
+   if (getClass() != obj.getClass()) {
return false;
}
-   if (operatorId != that.operatorId) {
 
 Review comment:
   Sure, I will do that.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9559) The type of a union of CHAR columns of different lengths should be VARCHAR

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591783#comment-16591783
 ] 

ASF GitHub Bot commented on FLINK-9559:
---

pnowojski commented on issue #6519: [FLINK-9559] [table] The type of a union of 
CHAR columns of different lengths should be VARCHAR
URL: https://github.com/apache/flink/pull/6519#issuecomment-415789873
 
 
   @hequn8128 I'm pretty sure that what you have just pointed out is a bug in 
Flink SQL. I'm not 100% sure (maybe 95%), but when comparing different types 
like `CHAR(3)` with `CHAR(5)` or `VARCHAR(2)`, all of the operands should 
first be converted to the same type (adding/ignoring padding when necessary) and 
only then compared.
   
   In other words `'A  ' == 'A'` should return true, and this PR is not a 
proper fix for this problem. It doesn't fix, for example:
   
   ```
   SELECT * FROM country WHERE country_name = 'GERMANY  '
   ```
   
   This further convinces me that:
   > and our CHAR(x) support seems to be broken (for example 'A ' || 'B ' 
should return 'A B ')
   
   Probably we should either convert string literals to `VARCHAR` or properly 
implement support for `CHAR`.
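   
   For illustration, a small self-contained Java sketch of the pad-then-compare semantics described above (class and method names are made up for this example, not Flink or Calcite APIs):
   
   ```java
   // Illustrative only: SQL-standard CHAR comparison pads the shorter operand with
   // trailing spaces before comparing, so 'GERMANY  ' and 'GERMANY' are considered equal.
   public class CharComparisonSketch {
   
   	static boolean charEquals(String left, String right) {
   		int len = Math.max(left.length(), right.length());
   		return pad(left, len).equals(pad(right, len));
   	}
   
   	private static String pad(String s, int len) {
   		StringBuilder sb = new StringBuilder(s);
   		while (sb.length() < len) {
   			sb.append(' ');
   		}
   		return sb.toString();
   	}
   
   	public static void main(String[] args) {
   		System.out.println(charEquals("GERMANY  ", "GERMANY")); // true under CHAR semantics
   		System.out.println("GERMANY  ".equals("GERMANY"));      // false with plain String equality
   	}
   }
   ```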


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> The type of a union of CHAR columns of different lengths should be VARCHAR
> --
>
> Key: FLINK-9559
> URL: https://issues.apache.org/jira/browse/FLINK-9559
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, If the case-when expression has two branches which return string 
> literal, redundant white spaces will be appended to the short string literal. 
> For example, for the sql: case 1 when 1 then 'a' when 2 then 'bcd' end, the 
> return value will be 'a ' of CHAR(3) instead of 'a'.
> Although, this follows the behavior in strict SQL standard mode(SQL:2003). We 
> should get the pragmatic return type in a real scenario without blank-padded. 
> Happily, this problem has been fixed by 
> [CALCITE-2321|https://issues.apache.org/jira/browse/CALCITE-2321], we can 
> upgrade calcite to the next release(1.17.0) and override 
> {{RelDataTypeSystem}} in flink to configure the return type, i.e., making 
> {{shouldConvertRaggedUnionTypesToVarying()}} return true.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski commented on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR

2018-08-24 Thread GitBox
pnowojski commented on issue #6519: [FLINK-9559] [table] The type of a union of 
CHAR columns of different lengths should be VARCHAR
URL: https://github.com/apache/flink/pull/6519#issuecomment-415789873
 
 
   @hequn8128 I'm pretty sure that what you have just pointed out is a bug in 
Flink SQL. I'm not 100% sure (maybe 95%), but when comparing different types 
like `CHAR(3)` with `CHAR(5)` or `VARCHAR(2)`, all of the operands should 
first be converted to the same type (adding/ignoring padding when necessary) and 
only then compared.
   
   In other words `'A  ' == 'A'` should return true, and this PR is not a 
proper fix for this problem. It doesn't fix, for example:
   
   ```
   SELECT * FROM country WHERE country_name = 'GERMANY  '
   ```
   
   This further convinces me that:
   > and our CHAR(x) support seems to be broken (for example 'A ' || 'B ' 
should return 'A B ')
   
   Probably we should either convert string literals to `VARCHAR` or properly 
implement support for `CHAR`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9391) Support UNNEST in Table API

2018-08-24 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591754#comment-16591754
 ] 

Hequn Cheng commented on FLINK-9391:


Hi [~SokolovMS], thanks for contributing to Flink. You are right, UNNEST is 
built on top of the user-defined table function, which is different from 
scalar functions. Tests about table functions can be found in 
{{CorrelateITCase}}; you can study the implementation there. If you 
have any questions, feel free to ask.

> Support UNNEST in Table API
> ---
>
> Key: FLINK-9391
> URL: https://issues.apache.org/jira/browse/FLINK-9391
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Alina Ipatina
>Priority: Major
>
> FLINK-6033 introduced the UNNEST function for SQL. We should also add this 
> function to the Table API to keep the APIs in sync. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10204) Job is marked as FAILED after serialization exception

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591737#comment-16591737
 ] 

ASF GitHub Bot commented on FLINK-10204:


zentol commented on a change in pull request #6610: [FLINK-10204] - fix 
serialization/copy error for LatencyMarker records.
URL: https://github.com/apache/flink/pull/6610#discussion_r212644961
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
 ##
 @@ -67,31 +67,40 @@ public int getSubtaskIndex() {
// 

 
@Override
-   public boolean equals(Object o) {
-   if (this == o) {
+   public boolean equals(Object obj) {
+   if (this == obj) {
return true;
}
-   if (o == null || getClass() != o.getClass()){
+   if (obj == null) {
return false;
}
-
-   LatencyMarker that = (LatencyMarker) o;
-
-   if (markedTime != that.markedTime) {
+   if (getClass() != obj.getClass()) {
return false;
}
-   if (operatorId != that.operatorId) {
 
 Review comment:
   let's reduce this change to only replacing `operatorId != that.operatorId`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Job is marked as FAILED after serialization exception
> -
>
> Key: FLINK-10204
> URL: https://issues.apache.org/jira/browse/FLINK-10204
> Project: Flink
>  Issue Type: Bug
>Reporter: Ben La Monica
>Priority: Major
>  Labels: pull-request-available
>
> We have a long running flink job that eventually fails and is shut down due 
> to an internal serialization exception that we keep on getting. Here is the 
> stack trace:
> {code:java}
> 2018-08-23 18:39:48,199 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NAV Estimation 
> (4b5463d76167f9f5aac83a890e8a867d) switched from state FAILING to FAILED.
> java.io.IOException: Corrupt stream, found tag: 127
> at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:219)
> at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
> at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140)
> at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:206)
> at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748){code}
>  
> I think I have tracked down the issue to a mismatch in the 
> serialization/deserialization/copy code in the StreamElementSerializer with 
> regards to the LATENCY_MARKER.
> The Serialization logic writes 3 LONGs and an INT. The copy logic only writes 
> (and reads) a LONG and 2 INTs. Adding a test for the LatencyMarker throws an 
> EOFException, and fixing the copy code causes the test to pass again.
> I've written a unit test that highlights the problem, and have written the 
> code to correct it.
> I'll submit a PR that goes along with it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10204) Job is marked as FAILED after serialization exception

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591736#comment-16591736
 ] 

ASF GitHub Bot commented on FLINK-10204:


zentol commented on a change in pull request #6610: [FLINK-10204] - fix 
serialization/copy error for LatencyMarker records.
URL: https://github.com/apache/flink/pull/6610#discussion_r212645193
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
 ##
 @@ -67,31 +67,40 @@ public int getSubtaskIndex() {
// 

 
@Override
-   public boolean equals(Object o) {
-   if (this == o) {
+   public boolean equals(Object obj) {
+   if (this == obj) {
return true;
}
-   if (o == null || getClass() != o.getClass()){
+   if (obj == null) {
return false;
}
-
-   LatencyMarker that = (LatencyMarker) o;
-
-   if (markedTime != that.markedTime) {
+   if (getClass() != obj.getClass()) {
return false;
}
-   if (operatorId != that.operatorId) {
+   LatencyMarker other = (LatencyMarker) obj;
+   if (markedTime != other.markedTime) {
return false;
}
-   return subtaskIndex == that.subtaskIndex;
-
+   if (operatorId == null) {
+   if (other.operatorId != null) {
+   return false;
+   }
+   } else if (!operatorId.equals(other.operatorId)) {
+   return false;
+   }
+   if (subtaskIndex != other.subtaskIndex) {
+   return false;
+   }
+   return true;
}
 
@Override
public int hashCode() {
-   int result = (int) (markedTime ^ (markedTime >>> 32));
 
 Review comment:
   This change isn't necessary, please revert.
   
   While it is theoretically possible for operatorId to be null, in practice 
this won't happen.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Job is marked as FAILED after serialization exception
> -
>
> Key: FLINK-10204
> URL: https://issues.apache.org/jira/browse/FLINK-10204
> Project: Flink
>  Issue Type: Bug
>Reporter: Ben La Monica
>Priority: Major
>  Labels: pull-request-available
>
> We have a long running flink job that eventually fails and is shut down due 
> to an internal serialization exception that we keep on getting. Here is the 
> stack trace:
> {code:java}
> 2018-08-23 18:39:48,199 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NAV Estimation 
> (4b5463d76167f9f5aac83a890e8a867d) switched from state FAILING to FAILED.
> java.io.IOException: Corrupt stream, found tag: 127
> at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:219)
> at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
> at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140)
> at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:206)
> at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748){code}
>  
> I think I have tracked down the issue to a mismatch in the 
> serialization/deserialization/copy code in the StreamElementSerializer with 
> regards to the LATENCY_MARKER.
> The Serialization logic writes 3 LONGs and an INT. The copy logic only writes 
> (and reads) a LONG and 2 INTs. Adding a test for the LatencyMarker throws an 
> EOFException, and fixing the copy code causes the test to pass again.
> I've written a unit test that highlights the problem, and have written the 
> code to correct it.
> I'll submit a PR that goes along with it.

[GitHub] zentol commented on a change in pull request #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records.

2018-08-24 Thread GitBox
zentol commented on a change in pull request #6610: [FLINK-10204] - fix 
serialization/copy error for LatencyMarker records.
URL: https://github.com/apache/flink/pull/6610#discussion_r212644961
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
 ##
 @@ -67,31 +67,40 @@ public int getSubtaskIndex() {
// 

 
@Override
-   public boolean equals(Object o) {
-   if (this == o) {
+   public boolean equals(Object obj) {
+   if (this == obj) {
return true;
}
-   if (o == null || getClass() != o.getClass()){
+   if (obj == null) {
return false;
}
-
-   LatencyMarker that = (LatencyMarker) o;
-
-   if (markedTime != that.markedTime) {
+   if (getClass() != obj.getClass()) {
return false;
}
-   if (operatorId != that.operatorId) {
 
 Review comment:
   let's reduce this change to only replacing `operatorId != that.operatorId`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records.

2018-08-24 Thread GitBox
zentol commented on a change in pull request #6610: [FLINK-10204] - fix 
serialization/copy error for LatencyMarker records.
URL: https://github.com/apache/flink/pull/6610#discussion_r212645193
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
 ##
 @@ -67,31 +67,40 @@ public int getSubtaskIndex() {
// 

 
@Override
-   public boolean equals(Object o) {
-   if (this == o) {
+   public boolean equals(Object obj) {
+   if (this == obj) {
return true;
}
-   if (o == null || getClass() != o.getClass()){
+   if (obj == null) {
return false;
}
-
-   LatencyMarker that = (LatencyMarker) o;
-
-   if (markedTime != that.markedTime) {
+   if (getClass() != obj.getClass()) {
return false;
}
-   if (operatorId != that.operatorId) {
+   LatencyMarker other = (LatencyMarker) obj;
+   if (markedTime != other.markedTime) {
return false;
}
-   return subtaskIndex == that.subtaskIndex;
-
+   if (operatorId == null) {
+   if (other.operatorId != null) {
+   return false;
+   }
+   } else if (!operatorId.equals(other.operatorId)) {
+   return false;
+   }
+   if (subtaskIndex != other.subtaskIndex) {
+   return false;
+   }
+   return true;
}
 
@Override
public int hashCode() {
-   int result = (int) (markedTime ^ (markedTime >>> 32));
 
 Review comment:
   This change isn't necessary, please revert.
   
   While it is theoretically possible for operatorId to be null, in practice 
this won't happen.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] benlamonica commented on issue #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records.

2018-08-24 Thread GitBox
benlamonica commented on issue #6610: [FLINK-10204] - fix serialization/copy 
error for LatencyMarker records.
URL: https://github.com/apache/flink/pull/6610#issuecomment-415769746
 
 
   @StephanEwen and @zentol, I've replied to the PR comments. The reason 
LatencyMarker has changes is that the equals implementation was incorrect: 
it did an `operatorId != that.operatorId` comparison instead of 
`!operatorId.equals(that.operatorId)`.
   
   If you want, I can revert that change and just write a more complicated 
assertEquals statement in the test, but I feel that it is the correct change. 
The other stream records have equals implemented correctly so that they can 
be compared after a copy.
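   
   For context, a tiny standalone sketch of the difference between the two checks (plain Strings stand in for the operatorId field here; this is not the actual Flink class):
   
   ```java
   // Illustrative only: '!=' compares object references, equals() compares values.
   public class ReferenceVsValueEquality {
   	public static void main(String[] args) {
   		// two distinct instances with identical content, e.g. after a serialize/deserialize round trip
   		String idA = new String("operator-1");
   		String idB = new String("operator-1");
   
   		System.out.println(idA != idB);        // true  -> the old reference check reports "not equal"
   		System.out.println(!idA.equals(idB));  // false -> the value check reports "equal"
   	}
   }
   ```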


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10204) Job is marked as FAILED after serialization exception

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591707#comment-16591707
 ] 

ASF GitHub Bot commented on FLINK-10204:


benlamonica commented on issue #6610: [FLINK-10204] - fix serialization/copy 
error for LatencyMarker records.
URL: https://github.com/apache/flink/pull/6610#issuecomment-415769746
 
 
   @StephanEwen and @zentol, I've replied to the PR comments. The reason 
LatencyMarker has changes is that the equals implementation was incorrect: 
it did an `operatorId != that.operatorId` comparison instead of 
`!operatorId.equals(that.operatorId)`.
   
   If you want, I can revert that change and just write a more complicated 
assertEquals statement in the test, but I feel that it is the correct change. 
The other stream records have equals implemented correctly so that they can 
be compared after a copy.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Job is marked as FAILED after serialization exception
> -
>
> Key: FLINK-10204
> URL: https://issues.apache.org/jira/browse/FLINK-10204
> Project: Flink
>  Issue Type: Bug
>Reporter: Ben La Monica
>Priority: Major
>  Labels: pull-request-available
>
> We have a long running flink job that eventually fails and is shut down due 
> to an internal serialization exception that we keep on getting. Here is the 
> stack trace:
> {code:java}
> 2018-08-23 18:39:48,199 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NAV Estimation 
> (4b5463d76167f9f5aac83a890e8a867d) switched from state FAILING to FAILED.
> java.io.IOException: Corrupt stream, found tag: 127
> at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:219)
> at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
> at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140)
> at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:206)
> at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748){code}
>  
> I think I have tracked down the issue to a mismatch in the 
> serialization/deserialization/copy code in the StreamElementSerializer with 
> regards to the LATENCY_MARKER.
> The Serialization logic writes 3 LONGs and an INT. The copy logic only writes 
> (and reads) a LONG and 2 INTs. Adding a test for the LatencyMarker throws an 
> EOFException, and fixing the copy code causes the test to pass again.
> I've written a unit test that highlights the problem, and have written the 
> code to correct it.
> I'll submit a PR that goes along with it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10204) Job is marked as FAILED after serialization exception

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591702#comment-16591702
 ] 

ASF GitHub Bot commented on FLINK-10204:


benlamonica commented on a change in pull request #6610: [FLINK-10204] - fix 
serialization/copy error for LatencyMarker records.
URL: https://github.com/apache/flink/pull/6610#discussion_r212639070
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java
 ##
 @@ -89,6 +90,9 @@ public void testSerialization() throws Exception {
 
Watermark negativeWatermark = new 
Watermark(-4647654567676555876L);
assertEquals(negativeWatermark, 
serializeAndDeserialize(negativeWatermark, serializer));
+
+   LatencyMarker latencyMarker = new 
LatencyMarker(System.currentTimeMillis(), new OperatorID(-1, -1), 1);
+   assertEquals(latencyMarker, 
serializeAndDeserialize(latencyMarker, serializer));
 
 Review comment:
   The hashCode and equals changes are there to support this line. Without them, even 
though the objects have the same values, they still evaluate as not equal.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Job is marked as FAILED after serialization exception
> -
>
> Key: FLINK-10204
> URL: https://issues.apache.org/jira/browse/FLINK-10204
> Project: Flink
>  Issue Type: Bug
>Reporter: Ben La Monica
>Priority: Major
>  Labels: pull-request-available
>
> We have a long running flink job that eventually fails and is shut down due 
> to an internal serialization exception that we keep on getting. Here is the 
> stack trace:
> {code:java}
> 2018-08-23 18:39:48,199 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NAV Estimation 
> (4b5463d76167f9f5aac83a890e8a867d) switched from state FAILING to FAILED.
> java.io.IOException: Corrupt stream, found tag: 127
> at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:219)
> at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
> at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140)
> at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:206)
> at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748){code}
>  
> I think I have tracked down the issue to a mismatch in the 
> serialization/deserialization/copy code in the StreamElementSerializer with 
> regards to the LATENCY_MARKER.
> The Serialization logic writes 3 LONGs and an INT. The copy logic only writes 
> (and reads) a LONG and 2 INTs. Adding a test for the LatencyMarker throws an 
> EOFException, and fixing the copy code causes the test to pass again.
> I've written a unit test that highlights the problem, and have written the 
> code to correct it.
> I'll submit a PR that goes along with it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] benlamonica commented on a change in pull request #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records.

2018-08-24 Thread GitBox
benlamonica commented on a change in pull request #6610: [FLINK-10204] - fix 
serialization/copy error for LatencyMarker records.
URL: https://github.com/apache/flink/pull/6610#discussion_r212639070
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java
 ##
 @@ -89,6 +90,9 @@ public void testSerialization() throws Exception {
 
Watermark negativeWatermark = new 
Watermark(-4647654567676555876L);
assertEquals(negativeWatermark, 
serializeAndDeserialize(negativeWatermark, serializer));
+
+   LatencyMarker latencyMarker = new 
LatencyMarker(System.currentTimeMillis(), new OperatorID(-1, -1), 1);
+   assertEquals(latencyMarker, 
serializeAndDeserialize(latencyMarker, serializer));
 
 Review comment:
   The hashCode and equals changes are there to support this line. Without them, even 
though the objects have the same values, they still evaluate as not equal.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10204) Job is marked as FAILED after serialization exception

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591641#comment-16591641
 ] 

ASF GitHub Bot commented on FLINK-10204:


benlamonica commented on a change in pull request #6610: [FLINK-10204] - fix 
serialization/copy error for LatencyMarker records.
URL: https://github.com/apache/flink/pull/6610#discussion_r212626021
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
 ##
 @@ -67,31 +67,40 @@ public int getSubtaskIndex() {
// 

 
@Override
-   public boolean equals(Object o) {
-   if (this == o) {
+   public boolean equals(Object obj) {
+   if (this == obj) {
return true;
}
-   if (o == null || getClass() != o.getClass()){
+   if (obj == null) {
return false;
}
-
-   LatencyMarker that = (LatencyMarker) o;
-
-   if (markedTime != that.markedTime) {
+   if (getClass() != obj.getClass()) {
return false;
}
-   if (operatorId != that.operatorId) {
+   LatencyMarker other = (LatencyMarker) obj;
+   if (markedTime != other.markedTime) {
return false;
}
-   return subtaskIndex == that.subtaskIndex;
-
+   if (operatorId == null) {
+   if (other.operatorId != null) {
+   return false;
+   }
+   } else if (!operatorId.equals(other.operatorId)) {
+   return false;
+   }
+   if (subtaskIndex != other.subtaskIndex) {
+   return false;
+   }
+   return true;
}
 
@Override
public int hashCode() {
-   int result = (int) (markedTime ^ (markedTime >>> 32));
 
 Review comment:
   This is just the way Eclipse generates hashCode. Since operatorId can be 
null, it does a null check before calling hashCode on it.
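   
   For reference, a minimal sketch of the two equivalent null-safe styles (the field names follow the discussion above, the class itself is made up):
   
   ```java
   import java.util.Objects;
   
   // Illustrative only: both variants tolerate a null operatorId.
   class HashCodeSketch {
   	private final Object operatorId = null;   // placeholder field for the example
   	private final long markedTime = 42L;
   	private final int subtaskIndex = 1;
   
   	// Eclipse-style generated hashCode with an explicit null check
   	int generatedStyle() {
   		final int prime = 31;
   		int result = 1;
   		result = prime * result + (int) (markedTime ^ (markedTime >>> 32));
   		result = prime * result + ((operatorId == null) ? 0 : operatorId.hashCode());
   		result = prime * result + subtaskIndex;
   		return result;
   	}
   
   	// More compact null-safe form using java.util.Objects
   	int compactStyle() {
   		return Objects.hash(markedTime, operatorId, subtaskIndex);
   	}
   }
   ```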


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Job is marked as FAILED after serialization exception
> -
>
> Key: FLINK-10204
> URL: https://issues.apache.org/jira/browse/FLINK-10204
> Project: Flink
>  Issue Type: Bug
>Reporter: Ben La Monica
>Priority: Major
>  Labels: pull-request-available
>
> We have a long running flink job that eventually fails and is shut down due 
> to an internal serialization exception that we keep on getting. Here is the 
> stack trace:
> {code:java}
> 2018-08-23 18:39:48,199 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NAV Estimation 
> (4b5463d76167f9f5aac83a890e8a867d) switched from state FAILING to FAILED.
> java.io.IOException: Corrupt stream, found tag: 127
> at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:219)
> at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
> at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140)
> at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:206)
> at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748){code}
>  
> I think I have tracked down the issue to a mismatch in the 
> serialization/deserialization/copy code in the StreamElementSerializer with 
> regards to the LATENCY_MARKER.
> The Serialization logic writes 3 LONGs and an INT. The copy logic only writes 
> (and reads) a LONG and 2 INTs. Adding a test for the LatencyMarker throws an 
> EOFException, and fixing the copy code causes the test to pass again.
> I've written a unit test that highlights the problem, and have written the 
> code to correct it.
> I'll submit a PR that goes along with it.




[GitHub] benlamonica commented on a change in pull request #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records.

2018-08-24 Thread GitBox
benlamonica commented on a change in pull request #6610: [FLINK-10204] - fix 
serialization/copy error for LatencyMarker records.
URL: https://github.com/apache/flink/pull/6610#discussion_r212626021
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
 ##
 @@ -67,31 +67,40 @@ public int getSubtaskIndex() {
// 

 
@Override
-   public boolean equals(Object o) {
-   if (this == o) {
+   public boolean equals(Object obj) {
+   if (this == obj) {
return true;
}
-   if (o == null || getClass() != o.getClass()){
+   if (obj == null) {
return false;
}
-
-   LatencyMarker that = (LatencyMarker) o;
-
-   if (markedTime != that.markedTime) {
+   if (getClass() != obj.getClass()) {
return false;
}
-   if (operatorId != that.operatorId) {
+   LatencyMarker other = (LatencyMarker) obj;
+   if (markedTime != other.markedTime) {
return false;
}
-   return subtaskIndex == that.subtaskIndex;
-
+   if (operatorId == null) {
+   if (other.operatorId != null) {
+   return false;
+   }
+   } else if (!operatorId.equals(other.operatorId)) {
+   return false;
+   }
+   if (subtaskIndex != other.subtaskIndex) {
+   return false;
+   }
+   return true;
}
 
@Override
public int hashCode() {
-   int result = (int) (markedTime ^ (markedTime >>> 32));
 
 Review comment:
   This is just the way Eclipse generates hashCode. Since operatorId can be 
null, it does a null check before calling hashCode on it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9713) Support versioned joins in planning phase

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591638#comment-16591638
 ] 

ASF GitHub Bot commented on FLINK-9713:
---

pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r208833159
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToVersionedJoinRule.scala
 ##
 @@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import java.util
+import java.util.Collections
+
+import org.apache.calcite.plan.RelOptRule.{any, none, operand, some}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableFunctionScan
+import org.apache.calcite.rel.logical.LogicalCorrelate
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlCastFunction
+import org.apache.flink.table.api.{Table, ValidationException}
+import org.apache.flink.table.functions.TableVersionFunction
+import org.apache.flink.table.functions.sql.ProctimeSqlFunction
+import org.apache.flink.table.functions.utils.TableSqlFunction
+import org.apache.flink.table.plan.logical.rel.LogicalVersionedJoin
+import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType
+import org.apache.flink.table.plan.util.RexDefaultVisitor
+import org.apache.flink.table.typeutils.TypeStringUtils
+import org.apache.flink.util.Preconditions.checkState
+
+class LogicalCorrelateToVersionedJoinRule
+  extends RelOptRule(
+operand(classOf[LogicalCorrelate],
+  some(
+operand(classOf[RelNode], any()),
+operand(classOf[TableFunctionScan], none(,
+"LogicalCorrelateToVersionedJoinRule") {
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val logicalCorrelate: LogicalCorrelate = call.rel(0)
+val leftNode: RelNode = call.rel(1)
+val rightTableFunctionScan: TableFunctionScan = call.rel(2)
+
+val cluster = logicalCorrelate.getCluster
+
+new GetTableVersionFunctionCall(cluster.getRexBuilder, leftNode)
+  .visit(rightTableFunctionScan.getCall) match {
+  case Some(rightVersionedTableCall) =>
+val underlyingTableHistory: Table = 
rightVersionedTableCall.tableVersionFunction.table
+val relBuilder = this.relBuilderFactory.create(
+  cluster,
+  underlyingTableHistory.relBuilder.getRelOptSchema)
+val rexBuilder = cluster.getRexBuilder
+
+val rightNode: RelNode = 
underlyingTableHistory.logicalPlan.toRelNode(relBuilder)
+
+val rightVersionExpression = createRightExpression(
+  rexBuilder,
+  leftNode,
+  rightNode,
+  rightVersionedTableCall.tableVersionFunction.versionField)
+
+val rightPrimaryKeyExpression = createRightExpression(
+  rexBuilder,
+  leftNode,
+  rightNode,
+  rightVersionedTableCall.tableVersionFunction.primaryKey)
+
+relBuilder.push(
+  if (rightVersionedTableCall.tableVersionFunction.isProctime) {
+LogicalVersionedJoin.create(
+  rexBuilder,
+  cluster,
+  logicalCorrelate.getTraitSet,
+  leftNode,
+  rightNode,
+  rightVersionedTableCall.versionExpression,
+  rightPrimaryKeyExpression)
+  }
+  else {
+LogicalVersionedJoin.create(
+  rexBuilder,
+  cluster,
+  logicalCorrelate.getTraitSet,
+  leftNode,
+  rightNode,
+  rightVersionedTableCall.versionExpression,
+  rightVersionExpression,
+  rightPrimaryKeyExpression)
+  })
+call.transformTo(relBuilder.build())
+  case None =>
+}
+  }
+
+  private def createRightExpression(
+  rexBuilder: RexBuilder,
+  leftNode: RelNode,
+  rightNode: RelNode,
+  field: String): RexNode = {
+val rightReferencesOffset = 

[jira] [Commented] (FLINK-9713) Support versioned joins in planning phase

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591639#comment-16591639
 ] 

ASF GitHub Bot commented on FLINK-9713:
---

pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r212625803
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/VersionedJoin.scala
 ##
 @@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+class VersionedJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig)
+  extends CoProcessFunction[CRow, CRow, CRow]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  protected var rightState: ValueState[Row] = _
+  protected var cRowWrapper: CRowWrappingCollector = _
+
+  protected var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  override def open(parameters: Configuration): Unit = {
+val clazz = compile(
+  getRuntimeContext.getUserCodeClassLoader,
+  genJoinFuncName,
+  genJoinFuncCode)
+
+joinFunction = clazz.newInstance()
+
+val rightStateDescriptor = new ValueStateDescriptor[Row]("right", 
rightType)
 
 Review comment:
   Could we do it as a follow-up task to minimize the PR size?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support versioned joins in planning phase
> -
>
> Key: FLINK-9713
> URL: https://issues.apache.org/jira/browse/FLINK-9713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> Queries like:
> {code:java}
> SELECT 
>   o.amount * r.rate 
> FROM 
>   Orders AS o, 
>   LATERAL TABLE (Rates(o.rowtime)) AS r 
> WHERE o.currency = r.currency{code}
> should evaluate to valid plan with versioned joins plan node.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-08-24 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r212625803
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/VersionedJoin.scala
 ##
 @@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+class VersionedJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig)
+  extends CoProcessFunction[CRow, CRow, CRow]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  protected var rightState: ValueState[Row] = _
+  protected var cRowWrapper: CRowWrappingCollector = _
+
+  protected var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  override def open(parameters: Configuration): Unit = {
+val clazz = compile(
+  getRuntimeContext.getUserCodeClassLoader,
+  genJoinFuncName,
+  genJoinFuncCode)
+
+joinFunction = clazz.newInstance()
+
+val rightStateDescriptor = new ValueStateDescriptor[Row]("right", 
rightType)
 
 Review comment:
   Could we do it as a follow-up task to minimize the PR size?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-08-24 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r208833159
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToVersionedJoinRule.scala
 ##
 @@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import java.util
+import java.util.Collections
+
+import org.apache.calcite.plan.RelOptRule.{any, none, operand, some}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableFunctionScan
+import org.apache.calcite.rel.logical.LogicalCorrelate
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlCastFunction
+import org.apache.flink.table.api.{Table, ValidationException}
+import org.apache.flink.table.functions.TableVersionFunction
+import org.apache.flink.table.functions.sql.ProctimeSqlFunction
+import org.apache.flink.table.functions.utils.TableSqlFunction
+import org.apache.flink.table.plan.logical.rel.LogicalVersionedJoin
+import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType
+import org.apache.flink.table.plan.util.RexDefaultVisitor
+import org.apache.flink.table.typeutils.TypeStringUtils
+import org.apache.flink.util.Preconditions.checkState
+
+class LogicalCorrelateToVersionedJoinRule
+  extends RelOptRule(
+operand(classOf[LogicalCorrelate],
+  some(
+operand(classOf[RelNode], any()),
+operand(classOf[TableFunctionScan], none()))),
+"LogicalCorrelateToVersionedJoinRule") {
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val logicalCorrelate: LogicalCorrelate = call.rel(0)
+val leftNode: RelNode = call.rel(1)
+val rightTableFunctionScan: TableFunctionScan = call.rel(2)
+
+val cluster = logicalCorrelate.getCluster
+
+new GetTableVersionFunctionCall(cluster.getRexBuilder, leftNode)
+  .visit(rightTableFunctionScan.getCall) match {
+  case Some(rightVersionedTableCall) =>
+val underlyingTableHistory: Table = 
rightVersionedTableCall.tableVersionFunction.table
+val relBuilder = this.relBuilderFactory.create(
+  cluster,
+  underlyingTableHistory.relBuilder.getRelOptSchema)
+val rexBuilder = cluster.getRexBuilder
+
+val rightNode: RelNode = 
underlyingTableHistory.logicalPlan.toRelNode(relBuilder)
+
+val rightVersionExpression = createRightExpression(
+  rexBuilder,
+  leftNode,
+  rightNode,
+  rightVersionedTableCall.tableVersionFunction.versionField)
+
+val rightPrimaryKeyExpression = createRightExpression(
+  rexBuilder,
+  leftNode,
+  rightNode,
+  rightVersionedTableCall.tableVersionFunction.primaryKey)
+
+relBuilder.push(
+  if (rightVersionedTableCall.tableVersionFunction.isProctime) {
+LogicalVersionedJoin.create(
+  rexBuilder,
+  cluster,
+  logicalCorrelate.getTraitSet,
+  leftNode,
+  rightNode,
+  rightVersionedTableCall.versionExpression,
+  rightPrimaryKeyExpression)
+  }
+  else {
+LogicalVersionedJoin.create(
+  rexBuilder,
+  cluster,
+  logicalCorrelate.getTraitSet,
+  leftNode,
+  rightNode,
+  rightVersionedTableCall.versionExpression,
+  rightVersionExpression,
+  rightPrimaryKeyExpression)
+  })
+call.transformTo(relBuilder.build())
+  case None =>
+}
+  }
+
+  private def createRightExpression(
+  rexBuilder: RexBuilder,
+  leftNode: RelNode,
+  rightNode: RelNode,
+  field: String): RexNode = {
+val rightReferencesOffset = leftNode.getRowType.getFieldCount
+val rightDataTypeField = rightNode.getRowType.getField(field, false, false)
+rexBuilder.makeInputRef(
+  rightDataTypeField.getType, rightReferencesOffset + 
rightDataTypeField.getIndex)
+  }
+}
+

[jira] [Commented] (FLINK-10204) Job is marked as FAILED after serialization exception

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591630#comment-16591630
 ] 

ASF GitHub Bot commented on FLINK-10204:


benlamonica commented on a change in pull request #6610: [FLINK-10204] - fix 
serialization/copy error for LatencyMarker records.
URL: https://github.com/apache/flink/pull/6610#discussion_r212623923
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
 ##
 @@ -67,31 +67,40 @@ public int getSubtaskIndex() {
// 

 
@Override
-   public boolean equals(Object o) {
-   if (this == o) {
+   public boolean equals(Object obj) {
 
 Review comment:
   Yes, the prior implementation did this:
   `if (operatorId != that.operatorId) {`
   instead of calling equals(), which returned false when the objects were 
equivalent but not the exact same instance (for example, when the byte stream is 
copied).
   
   These changes are just from Eclipse generating the equals and hashCode 
methods. I needed to do this so that the unit test would actually pass, since it 
uses the equals method in its assertion.
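   
   For illustration, a minimal, hedged sketch of what a field-based equals()/hashCode() pair looks like for a marker-style class (class and field names here are placeholders for the example, not the actual Flink source):
   
   ```java
   import java.util.Objects;

   public final class MarkerExample {

       private final long markedTime;
       private final String operatorId; // stands in for any object-typed id field
       private final int subtaskIndex;

       public MarkerExample(long markedTime, String operatorId, int subtaskIndex) {
           this.markedTime = markedTime;
           this.operatorId = operatorId;
           this.subtaskIndex = subtaskIndex;
       }

       @Override
       public boolean equals(Object obj) {
           if (this == obj) {
               return true;
           }
           if (obj == null || getClass() != obj.getClass()) {
               return false;
           }
           MarkerExample other = (MarkerExample) obj;
           // value equality for the object field; '!=' would only compare references
           return markedTime == other.markedTime
               && subtaskIndex == other.subtaskIndex
               && Objects.equals(operatorId, other.operatorId);
       }

       @Override
       public int hashCode() {
           return Objects.hash(markedTime, operatorId, subtaskIndex);
       }
   }
   ```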


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Job is marked as FAILED after serialization exception
> -
>
> Key: FLINK-10204
> URL: https://issues.apache.org/jira/browse/FLINK-10204
> Project: Flink
>  Issue Type: Bug
>Reporter: Ben La Monica
>Priority: Major
>  Labels: pull-request-available
>
> We have a long running flink job that eventually fails and is shut down due 
> to an internal serialization exception that we keep on getting. Here is the 
> stack trace:
> {code:java}
> 2018-08-23 18:39:48,199 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NAV Estimation 
> (4b5463d76167f9f5aac83a890e8a867d) switched from state FAILING to FAILED.
> java.io.IOException: Corrupt stream, found tag: 127
> at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:219)
> at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
> at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140)
> at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:206)
> at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748){code}
>  
> I think I have tracked down the issue to a mismatch in the 
> serialization/deserialization/copy code in the StreamElementSerializer with 
> regard to the LATENCY_MARKER.
> The Serialization logic writes 3 LONGs and an INT. The copy logic only writes 
> (and reads) a LONG and 2 INTs. Adding a test for the LatencyMarker throws an 
> EOFException, and fixing the copy code causes the test to pass again.
> I've written a unit test that highlights the problem, and have written the 
> code to correct it.
> I'll submit a PR that goes along with it.
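
To make the mismatch concrete, here is a minimal sketch of a copy routine that mirrors a serialization layout of three longs and an int. This is illustrative only and assumes that layout; it is not the actual StreamElementSerializer code:

{code:java}
import java.io.IOException;

import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

final class LatencyMarkerCopyExample {

    // Assumed layout for the example: markedTime (long), an id encoded as two
    // longs, and subtaskIndex (int). copy() must transfer exactly the same
    // fields as serialize()/deserialize(); if it moves fewer bytes, trailing
    // bytes stay in the stream and the next record's tag is read from the
    // wrong offset, producing errors like "Corrupt stream, found tag: 127".
    static void copy(DataInputView source, DataOutputView target) throws IOException {
        target.writeLong(source.readLong()); // markedTime
        target.writeLong(source.readLong()); // id, upper part
        target.writeLong(source.readLong()); // id, lower part
        target.writeInt(source.readInt());   // subtaskIndex
    }
}
{code}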



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] benlamonica commented on a change in pull request #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records.

2018-08-24 Thread GitBox
benlamonica commented on a change in pull request #6610: [FLINK-10204] - fix 
serialization/copy error for LatencyMarker records.
URL: https://github.com/apache/flink/pull/6610#discussion_r212623923
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
 ##
 @@ -67,31 +67,40 @@ public int getSubtaskIndex() {
// 

 
@Override
-   public boolean equals(Object o) {
-   if (this == o) {
+   public boolean equals(Object obj) {
 
 Review comment:
   Yes, the prior implementation did this:
   `if (operatorId != that.operatorId) {`
   instead of calling equals(), which returned false when the objects were 
equivalent but not the exact same instance (for example, when the byte stream is 
copied).
   
   These changes are just from Eclipse generating the equals and hashCode 
methods. I needed to do this so that the unit test would actually pass, since it 
uses the equals method in its assertion.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10136) Add REPEAT supported in Table API and SQL

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591597#comment-16591597
 ] 

ASF GitHub Bot commented on FLINK-10136:


xccui commented on a change in pull request #6597: [FLINK-10136] [table] Add 
REPEAT supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6597#discussion_r212618421
 
 

 ##
 File path: docs/dev/table/functions.md
 ##
 @@ -2614,6 +2626,18 @@ STRING.rtrim()
 E.g., 'This is a test String. '.rtrim() returns "This 
is a test String.".
   
 
+
+
+  
+{% highlight java %}
+STRING.repeat(INT)
+{% endhighlight %}
+  
+  
+Returns a string that repeats the base STRING INT 
times. 
+E.g., "This is a test String. ".repeat(2) returns 
"This is a test String.This is a test String.".
 
 Review comment:
   There's an additional space after the full stop. (I've fixed it in my 
branch, so just let it be there.)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add REPEAT supported in Table API and SQL
> -
>
> Key: FLINK-10136
> URL: https://issues.apache.org/jira/browse/FLINK-10136
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> Oracle : 
> [https://docs.oracle.com/cd/E17952_01/mysql-5.1-en/string-functions.html#function_repeat]
> MySql: 
> https://dev.mysql.com/doc/refman/5.5/en/string-functions.html#function_repeat



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] xccui commented on a change in pull request #6597: [FLINK-10136] [table] Add REPEAT supported in Table API and SQL

2018-08-24 Thread GitBox
xccui commented on a change in pull request #6597: [FLINK-10136] [table] Add 
REPEAT supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6597#discussion_r212618421
 
 

 ##
 File path: docs/dev/table/functions.md
 ##
 @@ -2614,6 +2626,18 @@ STRING.rtrim()
 E.g., 'This is a test String. '.rtrim() returns "This 
is a test String.".
   
 
+
+
+  
+{% highlight java %}
+STRING.repeat(INT)
+{% endhighlight %}
+  
+  
+Returns a string that repeats the base STRING INT 
times. 
+E.g., "This is a test String. ".repeat(2) returns 
"This is a test String.This is a test String.".
 
 Review comment:
   There's an additional space after the full stop. (I've fixed it in my 
branch, so just let it be there.)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10211) Time indicators are not always materialised for LogicalJoin

2018-08-24 Thread Xingcan Cui (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591587#comment-16591587
 ] 

Xingcan Cui commented on FLINK-10211:
-

Hi [~pnowojski], I guess the two tickets you just filed are related to 
FLINK-8897, right?

> Time indicators are not always materialised for LogicalJoin
> ---
>
> Key: FLINK-10211
> URL: https://issues.apache.org/jira/browse/FLINK-10211
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.6.0
>Reporter: Piotr Nowojski
>Priority: Major
>
> Currently 
> {{org.apache.flink.table.calcite.RelTimeIndicatorConverter#visit(LogicalJoin)}}
>  correctly handles only windowed joins. Output of non windowed joins 
> shouldn't contain any time indicators.
> CC [~twalthr]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9970) Add ASCII/CHR function for table/sql API

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591576#comment-16591576
 ] 

ASF GitHub Bot commented on FLINK-9970:
---

yanghua commented on issue #6432: [FLINK-9970] [table] Add ASCII/CHR function 
for table/sql API
URL: https://github.com/apache/flink/pull/6432#issuecomment-415745275
 
 
   cc @xccui I have rebased this PR, please review it again.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ASCII/CHR function for table/sql API
> 
>
> Key: FLINK-9970
> URL: https://issues.apache.org/jira/browse/FLINK-9970
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> for ASCII function : 
> refer to : 
> [https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ascii]
> for CHR function : 
> This function converts an ASCII code to a character,
> refer to : [https://doc.ispirer.com/sqlways/Output/SQLWays-1-071.html]
> Considering that "CHAR" is a keyword in many databases, we use the "CHR" 
> keyword.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on issue #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API

2018-08-24 Thread GitBox
yanghua commented on issue #6432: [FLINK-9970] [table] Add ASCII/CHR function 
for table/sql API
URL: https://github.com/apache/flink/pull/6432#issuecomment-415745275
 
 
   cc @xccui I have rebased this PR, please review it again.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-10211) Time indicators are not always materialised for LogicalJoin

2018-08-24 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-10211:
--

 Summary: Time indicators are not always materialised for 
LogicalJoin
 Key: FLINK-10211
 URL: https://issues.apache.org/jira/browse/FLINK-10211
 Project: Flink
  Issue Type: Bug
  Components: Table API  SQL
Affects Versions: 1.6.0
Reporter: Piotr Nowojski


Currently 
{{org.apache.flink.table.calcite.RelTimeIndicatorConverter#visit(LogicalJoin)}} 
correctly handles only windowed joins. Output of non windowed joins shouldn't 
contain any time indicators.

CC [~twalthr]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10210) Time indicators are not always materialised for LogicalCorrelate

2018-08-24 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-10210:
--

 Summary: Time indicators are not always materialised for 
LogicalCorrelate
 Key: FLINK-10210
 URL: https://issues.apache.org/jira/browse/FLINK-10210
 Project: Flink
  Issue Type: Bug
  Components: Table API  SQL
Affects Versions: 1.6.0
Reporter: Piotr Nowojski


Currently 
{{org.apache.flink.table.calcite.RelTimeIndicatorConverter#visit(LogicalCorrelate)}}
 supports only the case where the right side is a {{LogicalTableFunctionScan}}. That is 
not always the case. For example, in 
{{org.apache.flink.table.api.stream.table.CorrelateTest#testFilter}} the 
LogicalFilter node is pushed down to the right side of 
{{LogicalCorrelate}}.

 

CC [~twalthr]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-08-24 Thread Fabian Hueske (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske reassigned FLINK-10205:
-

Assignee: JIN SUN

> Batch Job: InputSplit Fault tolerant for DataSourceTask
> ---
>
> Key: FLINK-10205
> URL: https://issues.apache.org/jira/browse/FLINK-10205
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Major
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Today the DataSourceTask pulls InputSplits from the JobManager to achieve better 
> performance. However, when a DataSourceTask fails and is rerun, it will not get 
> the same splits as its previous attempt. This will introduce inconsistent 
> results or even data corruption.
> Furthermore, if two executions run at the same time (in a batch 
> scenario), these two executions should process the same splits.
> We need to fix this issue to make the inputs of a DataSourceTask 
> deterministic. The proposal is to save all splits in the ExecutionVertex so that the 
> DataSourceTask will pull splits from there.
>  
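
A hedged sketch of the proposed idea (the class below is illustrative and not an existing Flink API; only InputSplit and InputSplitProvider are existing types):

{code:java}
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;

// Remembers the splits handed out for one execution vertex so that a restarted
// (or concurrently running) attempt replays exactly the same splits instead of
// pulling fresh, possibly different ones from the JobManager.
class ReplayableSplitAssignment {

    private final List<InputSplit> assignedSplits = new ArrayList<>();
    private int replayPosition = 0;

    synchronized InputSplit nextSplit(InputSplitProvider provider, ClassLoader userCodeClassLoader)
            throws Exception {
        if (replayPosition < assignedSplits.size()) {
            // re-run: deterministic replay of the previously assigned splits
            return assignedSplits.get(replayPosition++);
        }
        InputSplit next = provider.getNextInputSplit(userCodeClassLoader);
        if (next != null) {
            assignedSplits.add(next);
            replayPosition++;
        }
        return next;
    }
}
{code}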



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-3875) Add a TableSink for Elasticsearch

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-3875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591538#comment-16591538
 ] 

ASF GitHub Bot commented on FLINK-3875:
---

twalthr commented on issue #6611: [FLINK-3875] [connectors] Add an upsert table 
sink factory for Elasticsearch
URL: https://github.com/apache/flink/pull/6611#issuecomment-415737635
 
 
   CC @tzulitai @dawidwys 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a TableSink for Elasticsearch
> -
>
> Key: FLINK-3875
> URL: https://issues.apache.org/jira/browse/FLINK-3875
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors, Table API  SQL
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> Add a TableSink that writes data to Elasticsearch



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] twalthr commented on issue #6611: [FLINK-3875] [connectors] Add an upsert table sink factory for Elasticsearch

2018-08-24 Thread GitBox
twalthr commented on issue #6611: [FLINK-3875] [connectors] Add an upsert table 
sink factory for Elasticsearch
URL: https://github.com/apache/flink/pull/6611#issuecomment-415737635
 
 
   CC @tzulitai @dawidwys 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-3875) Add a TableSink for Elasticsearch

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-3875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591537#comment-16591537
 ] 

ASF GitHub Bot commented on FLINK-3875:
---

twalthr opened a new pull request #6611: [FLINK-3875] [connectors] Add an 
upsert table sink factory for Elasticsearch
URL: https://github.com/apache/flink/pull/6611
 
 
   ## What is the purpose of the change
   
   This PR adds full support for Elasticsearch to be used with Table & SQL API 
as well as SQL Client.
   
   ## Brief change log
   
   This PR includes:
 - Elasticsearch 6 upsert table sink
 - Elasticsearch 6 table factory
 - Elasticsearch table descriptors & validators
 - Unit tests
 - SQL Client end-to-end test
 - Website documentation
   
   ## Verifying this change
   
   - Unit tests for descriptors, factory, and sink
   - SQL Client end-to-end test extended with Elasticsearch sink
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): yes
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): yes
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature?  yes
 - If yes, how is the feature documented? docs
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a TableSink for Elasticsearch
> -
>
> Key: FLINK-3875
> URL: https://issues.apache.org/jira/browse/FLINK-3875
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors, Table API  SQL
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> Add a TableSink that writes data to Elasticsearch



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-3875) Add a TableSink for Elasticsearch

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-3875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-3875:
--
Labels: pull-request-available  (was: )

> Add a TableSink for Elasticsearch
> -
>
> Key: FLINK-3875
> URL: https://issues.apache.org/jira/browse/FLINK-3875
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors, Table API  SQL
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> Add a TableSink that writes data to Elasticsearch



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] twalthr opened a new pull request #6611: [FLINK-3875] [connectors] Add an upsert table sink factory for Elasticsearch

2018-08-24 Thread GitBox
twalthr opened a new pull request #6611: [FLINK-3875] [connectors] Add an 
upsert table sink factory for Elasticsearch
URL: https://github.com/apache/flink/pull/6611
 
 
   ## What is the purpose of the change
   
   This PR adds full support for Elasticsearch to be used with Table & SQL API 
as well as SQL Client.
   
   ## Brief change log
   
   This PR includes:
 - Elasticsearch 6 upsert table sink
 - Elasticsearch 6 table factory
 - Elasticsearch table descriptors & validators
 - Unit tests
 - SQL Client end-to-end test
 - Website documentation
   
   ## Verifying this change
   
   - Unit tests for descriptors, factory, and sink
   - SQL Client end-to-end test extended with Elasticsearch sink
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): yes
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): yes
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature?  yes
 - If yes, how is the feature documented? docs
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9559) The type of a union of CHAR columns of different lengths should be VARCHAR

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591535#comment-16591535
 ] 

ASF GitHub Bot commented on FLINK-9559:
---

hequn8128 edited a comment on issue #6519: [FLINK-9559] [table] The type of a 
union of CHAR columns of different lengths should be VARCHAR
URL: https://github.com/apache/flink/pull/6519#issuecomment-415735866
 
 
   Hi @pnowojski , thanks for your reply. There are many cases that need this 
feature: not only `case when`, but also `nvl`, `greatest` and `least`. Most 
users encounter the blank problem with `case when`. Examples have been added to 
the test cases. Below I will add some more examples:
   1. 
   ```
   SELECT country_name
   FROM (
SELECT CASE id
WHEN 1 THEN 'GERMANY'
WHEN 2 THEN 'CANADA'
ELSE 'INVALID COUNTRY ID'
END AS country_name
FROM country_id
   )
   WHERE country_name = 'GERMANY'
   ```
   This SQL will output nothing because of the blank problem, which is very confusing.
   
   2. 
   ```
   SELECT country_name, country_info 
   FROM (
SELECT CASE id
WHEN 1 THEN 'GERMANY'
WHEN 2 THEN 'CANADA'
ELSE 'INVALID COUNTRY ID'
END AS country_name
FROM country_id
   ) nameTable join infoTable on nameTable.country_name = 
infoTable.country_name;
   ```
   This SQL cannot join correctly because of the blank problem: 'GERMANY' in 
nameTable becomes 'GERMANY---'. BTW, '-' stands for a blank.
   
   It is true that the SQL standard returns the CHAR type, but nearly all major DBMSs 
return VARCHAR without blank padding. A tool for easy online testing: 
http://sqlfiddle.com/
   
   Thanks, Hequn
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> The type of a union of CHAR columns of different lengths should be VARCHAR
> --
>
> Key: FLINK-9559
> URL: https://issues.apache.org/jira/browse/FLINK-9559
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, if a case-when expression has two branches that return string 
> literals, redundant white spaces will be appended to the shorter string literal. 
> For example, for the sql: case 1 when 1 then 'a' when 2 then 'bcd' end, the 
> return value will be 'a ' of CHAR(3) instead of 'a'.
> Although this follows the behavior of the strict SQL standard mode (SQL:2003), we 
> should get the pragmatic return type in a real scenario, without blank padding. 
> Happily, this problem has been fixed by 
> [CALCITE-2321|https://issues.apache.org/jira/browse/CALCITE-2321], so we can 
> upgrade Calcite to the next release (1.17.0) and override 
> {{RelDataTypeSystem}} in Flink to configure the return type, i.e., make 
> {{shouldConvertRaggedUnionTypesToVarying()}} return true.
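
For reference, a minimal sketch of the override described above, assuming Calcite 1.17+ where this hook exists (the class name is illustrative):

{code:java}
import org.apache.calcite.rel.type.RelDataTypeSystemImpl;

// With this hook returning true, a union of CHAR columns of different lengths
// is typed as VARCHAR instead of the shorter literals being blank-padded.
public class RaggedUnionToVarcharTypeSystem extends RelDataTypeSystemImpl {

    @Override
    public boolean shouldConvertRaggedUnionTypesToVarying() {
        return true;
    }
}
{code}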



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] hequn8128 edited a comment on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR

2018-08-24 Thread GitBox
hequn8128 edited a comment on issue #6519: [FLINK-9559] [table] The type of a 
union of CHAR columns of different lengths should be VARCHAR
URL: https://github.com/apache/flink/pull/6519#issuecomment-415735866
 
 
   Hi @pnowojski , thanks for your reply. There are many cases that need this 
feature: not only `case when`, but also `nvl`, `greatest` and `least`. Most 
users encounter the blank problem with `case when`. Examples have been added to 
the test cases. Below I will add some more examples:
   1. 
   ```
   SELECT country_name
   FROM (
SELECT CASE id
WHEN 1 THEN 'GERMANY'
WHEN 2 THEN 'CANADA'
ELSE 'INVALID COUNTRY ID'
END AS country_name
FROM country_id
   )
   WHERE country_name = 'GERMANY'
   ```
   This SQL will output nothing because of the blank problem, which is very confusing.
   
   2. 
   ```
   SELECT country_name, country_info 
   FROM (
SELECT CASE id
WHEN 1 THEN 'GERMANY'
WHEN 2 THEN 'CANADA'
ELSE 'INVALID COUNTRY ID'
END AS country_name
FROM country_id
   ) nameTable join infoTable on nameTable.country_name = 
infoTable.country_name;
   ```
   This SQL cannot join correctly because of the blank problem: 'GERMANY' in 
nameTable becomes 'GERMANY---'. BTW, '-' stands for a blank.
   
   It is true that the SQL standard returns the CHAR type, but nearly all major DBMSs 
return VARCHAR without blank padding. A tool for easy online testing: 
http://sqlfiddle.com/
   
   Thanks, Hequn
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9559) The type of a union of CHAR columns of different lengths should be VARCHAR

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591533#comment-16591533
 ] 

ASF GitHub Bot commented on FLINK-9559:
---

hequn8128 edited a comment on issue #6519: [FLINK-9559] [table] The type of a 
union of CHAR columns of different lengths should be VARCHAR
URL: https://github.com/apache/flink/pull/6519#issuecomment-415735866
 
 
   Hi @pnowojski , thanks for your reply. There are many cases that need this 
feature: not only `case when`, but also `nvl`, `greatest` and `least`. Most 
users encounter the blank problem with `case when`. Examples have been added to 
the test cases. Below I will add some more examples:
   1. 
   ```
   SELECT country_name
   FROM (
SELECT CASE id
WHEN 1 THEN 'GERMANY'
WHEN 2 THEN 'CANADA'
ELSE 'INVALID COUNTRY ID'
END AS country_name
FROM country_id
   )
   WHERE country_name = 'GERMANY'
   ```
   This SQL will output nothing because of the blank problem, which is very confusing.
   
   2. 
   ```
   SELECT country_name, country_info 
   FROM (
SELECT CASE id
WHEN 1 THEN 'GERMANY'
WHEN 2 THEN 'CANADA'
ELSE 'INVALID COUNTRY ID'
END AS country_name
FROM country_id
   ) nameTable join infoTable on nameTable.country_name = 
infoTable.country_name;
   ```
   This SQL cannot join correctly because of the blank problem: 'GERMANY' in 
nameTable becomes 'GERMANY---'. BTW, '-' stands for a blank.
   
   It is true that the SQL standard returns the CHAR type, but nearly all major DBMSs 
return VARCHAR without blank padding.
   
   Thanks, Hequn
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> The type of a union of CHAR columns of different lengths should be VARCHAR
> --
>
> Key: FLINK-9559
> URL: https://issues.apache.org/jira/browse/FLINK-9559
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, If the case-when expression has two branches which return string 
> literal, redundant white spaces will be appended to the short string literal. 
> For example, for the sql: case 1 when 1 then 'a' when 2 then 'bcd' end, the 
> return value will be 'a ' of CHAR(3) instead of 'a'.
> Although, this follows the behavior in strict SQL standard mode(SQL:2003). We 
> should get the pragmatic return type in a real scenario without blank-padded. 
> Happily, this problem has been fixed by 
> [CALCITE-2321|https://issues.apache.org/jira/browse/CALCITE-2321], we can 
> upgrade calcite to the next release(1.17.0) and override 
> {{RelDataTypeSystem}} in flink to configure the return type, i.e., making 
> {{shouldConvertRaggedUnionTypesToVarying()}} return true.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] hequn8128 edited a comment on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR

2018-08-24 Thread GitBox
hequn8128 edited a comment on issue #6519: [FLINK-9559] [table] The type of a 
union of CHAR columns of different lengths should be VARCHAR
URL: https://github.com/apache/flink/pull/6519#issuecomment-415735866
 
 
   Hi @pnowojski , thanks for your reply. There are many cases that need this 
feature: not only `case when`, but also `nvl`, `greatest` and `least`. Most 
users encounter the blank problem with `case when`. Examples have been added to 
the test cases. Below I will add some more examples:
   1. 
   ```
   SELECT country_name
   FROM (
SELECT CASE id
WHEN 1 THEN 'GERMANY'
WHEN 2 THEN 'CANADA'
ELSE 'INVALID COUNTRY ID'
END AS country_name
FROM country_id
   )
   WHERE country_name = 'GERMANY'
   ```
   This SQL will output nothing because of the blank problem, which is very confusing.
   
   2. 
   ```
   SELECT country_name, country_info 
   FROM (
SELECT CASE id
WHEN 1 THEN 'GERMANY'
WHEN 2 THEN 'CANADA'
ELSE 'INVALID COUNTRY ID'
END AS country_name
FROM country_id
   ) nameTable join infoTable on nameTable.country_name = 
infoTable.country_name;
   ```
   This SQL cannot join correctly because of the blank problem: 'GERMANY' in 
nameTable becomes 'GERMANY---'. BTW, '-' stands for a blank.
   
   It is true that the SQL standard returns the CHAR type, but nearly all major DBMSs 
return VARCHAR without blank padding.
   
   Thanks, Hequn
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 commented on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR

2018-08-24 Thread GitBox
hequn8128 commented on issue #6519: [FLINK-9559] [table] The type of a union of 
CHAR columns of different lengths should be VARCHAR
URL: https://github.com/apache/flink/pull/6519#issuecomment-415735866
 
 
   Hi @pnowojski , thanks for your reply. There are many cases that need this 
feature: not only `case when`, but also `nvl`, `greatest` and `least`. Most 
users encounter the blank problem with `case when`. Examples have been added to 
the test cases. Below I will add some more examples:
   1. 
   ```
   SELECT country_name
   FROM (
SELECT CASE id
WHEN 1 THEN 'GERMANY'
WHEN 2 THEN 'CANADA'
ELSE 'INVALID COUNTRY ID'
END AS country_name
FROM country_id
   )
   WHERE country_name = 'GERMANY'
   ```
   This SQL will output nothing because of the blank problem, which is very confusing.
   
   2. 
   ```
   SELECT country_name, country_info 
   FROM (
SELECT CASE id
WHEN 1 THEN 'GERMANY'
WHEN 2 THEN 'CANADA'
ELSE 'INVALID COUNTRY ID'
END AS country_name
FROM country_id
   ) nameTable join infoTable on nameTable.country_name = 
infoTable.country_name;
   ```
   This SQL cannot join correctly because of the blank problem: 'GERMANY' in 
nameTable becomes 'GERMANY   '.
   
   It is true that the SQL standard returns the CHAR type, but nearly all major DBMSs 
return VARCHAR without blank padding.
   
   Thanks, Hequn
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9559) The type of a union of CHAR columns of different lengths should be VARCHAR

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591530#comment-16591530
 ] 

ASF GitHub Bot commented on FLINK-9559:
---

hequn8128 commented on issue #6519: [FLINK-9559] [table] The type of a union of 
CHAR columns of different lengths should be VARCHAR
URL: https://github.com/apache/flink/pull/6519#issuecomment-415735866
 
 
   Hi @pnowojski , thanks for your reply. There are many cases that need this 
feature: not only `case when`, but also `nvl`, `greatest` and `least`. Most 
users encounter the blank problem with `case when`. Examples have been added to 
the test cases. Below I will add some more examples:
   1. 
   ```
   SELECT country_name
   FROM (
SELECT CASE id
WHEN 1 THEN 'GERMANY'
WHEN 2 THEN 'CANADA'
ELSE 'INVALID COUNTRY ID'
END AS country_name
FROM country_id
   )
   WHERE country_name = 'GERMANY'
   ```
   This SQL will output nothing because of the blank problem, which is very confusing.
   
   2. 
   ```
   SELECT country_name, country_info 
   FROM (
SELECT CASE id
WHEN 1 THEN 'GERMANY'
WHEN 2 THEN 'CANADA'
ELSE 'INVALID COUNTRY ID'
END AS country_name
FROM country_id
   ) nameTable join infoTable on nameTable.country_name = 
infoTable.country_name;
   ```
   This SQL cannot join correctly because of the blank problem: 'GERMANY' in 
nameTable becomes 'GERMANY   '.
   
   It is true that the SQL standard returns the CHAR type, but nearly all major DBMSs 
return VARCHAR without blank padding.
   
   Thanks, Hequn
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> The type of a union of CHAR columns of different lengths should be VARCHAR
> --
>
> Key: FLINK-9559
> URL: https://issues.apache.org/jira/browse/FLINK-9559
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, If the case-when expression has two branches which return string 
> literal, redundant white spaces will be appended to the short string literal. 
> For example, for the sql: case 1 when 1 then 'a' when 2 then 'bcd' end, the 
> return value will be 'a ' of CHAR(3) instead of 'a'.
> Although, this follows the behavior in strict SQL standard mode(SQL:2003). We 
> should get the pragmatic return type in a real scenario without blank-padded. 
> Happily, this problem has been fixed by 
> [CALCITE-2321|https://issues.apache.org/jira/browse/CALCITE-2321], we can 
> upgrade calcite to the next release(1.17.0) and override 
> {{RelDataTypeSystem}} in flink to configure the return type, i.e., making 
> {{shouldConvertRaggedUnionTypesToVarying()}} return true.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams

2018-08-24 Thread eugen yushin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591511#comment-16591511
 ] 

eugen yushin edited comment on FLINK-10050 at 8/24/18 11:31 AM:


[~aljoscha] No information about windows is retained in Flink for any consecutive 
operator. Docs:
 
[https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/windows.html#working-with-window-results]
{code:java}
 The result of a windowed operation is again a {{DataStream}}, no information 
about the windowed operations is retained in the result elements
{code}
At the same time, coGroup/join keeps element's timestamps and consecutive 
operators can assign elements to respective windows. Docs:
 
[https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/joining.html#window-join]
{code:java}
 Those elements that do get joined will have as their timestamp the largest 
timestamp that still lies in the respective window. For example a window with 
{{[5, 10)}} as its boundaries would result in the joined elements having 9 as 
their timestamp.
{code}
Business case: 2 streams, one with different business metrics and another with 
similar metrics taken from microservice logs; the result is a reconciliation of these 2 
streams. No operators other than a sink are needed for this particular business 
case.


was (Author: eyushin):
[~aljoscha] There's no info about windows for any consecutive of operator in 
Flink. Docs:
 
[https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/windows.html#working-with-window-results]
{code:java}
 The result of a windowed operation is again a {{DataStream}}, no information 
about the windowed operations is retained in the result elements
{code}
At the same time, coGroup/join keeps element's timestamps and consecutive 
operators can assign elements to respective windows. Docs:
 
[https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/joining.html#window-join]
{code:java}
 Those elements that do get joined will have as their timestamp the largest 
timestamp that still lies in the respective window. For example a window with 
{{[5, 10)}} as its boundaries would result in the joined elements having 9 as 
their timestamp.
{code}
Business case: 2 streams, 1 for different business metrics, another one - 
similar metrics but from microservices logs, result - reconciliation of these 2 
streams. No other operators except sink are need for this particular business 
case.

> Support 'allowedLateness' in CoGroupedStreams
> -
>
> Key: FLINK-10050
> URL: https://issues.apache.org/jira/browse/FLINK-10050
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.1, 1.6.0
>Reporter: eugen yushin
>Priority: Major
>  Labels: ready-to-commit, windows
>
> WindowedStream supports the 'allowedLateness' feature, while 
> CoGroupedStreams does not. At the same time, WindowedStream is an inner part 
> of CoGroupedStreams, and all main functionality (like evictor/trigger/...) is 
> simply delegated to WindowedStream.
> There's no chance to operate with late arriving data from previous steps in 
> cogroups (and joins). Consider the following flow:
> a. read data from source1 -> aggregate data with allowed lateness
> b. read data from source2 -> aggregate data with allowed lateness
> c. cogroup/join streams a and b, and compare aggregated values
> Step c doesn't accept any late data from steps a/b due to lack of 
> `allowedLateness` API call in CoGroupedStreams.java.
> Scope: add method `WithWindow.allowedLateness` to Java API 
> (flink-streaming-java) and extend scala API (flink-streaming-scala).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams

2018-08-24 Thread eugen yushin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591511#comment-16591511
 ] 

eugen yushin edited comment on FLINK-10050 at 8/24/18 11:30 AM:


[~aljoscha] There's no info about windows for any consecutive of operator in 
Flink. Docs:
 
[https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/windows.html#working-with-window-results]
{code:java}
 The result of a windowed operation is again a {{DataStream}}, no information 
about the windowed operations is retained in the result elements
{code}
At the same time, coGroup/join keeps element's timestamps and consecutive 
operators can assign elements to respective windows. Docs:
 
[https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/joining.html#window-join]
{code:java}
 Those elements that do get joined will have as their timestamp the largest 
timestamp that still lies in the respective window. For example a window with 
{{[5, 10)}} as its boundaries would result in the joined elements having 9 as 
their timestamp.
{code}
Business case: 2 streams, 1 for different business metrics, another one - 
similar metrics but from microservices logs, result - reconciliation of these 2 
streams. No other operators except sink are need for this particular business 
case.


was (Author: eyushin):
[~aljoscha] There's no info about windows for any of operator in Flink. Docs:
 
[https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/windows.html#working-with-window-results]
{code}
 The result of a windowed operation is again a {{DataStream}}, no information 
about the windowed operations is retained in the result elements
{code}

At the same time, coGroup/join keeps element's timestamps and consecutive 
operators can assign elements to respective windows. Docs:
 
[https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/joining.html#window-join]
{code}
 Those elements that do get joined will have as their timestamp the largest 
timestamp that still lies in the respective window. For example a window with 
{{[5, 10)}} as its boundaries would result in the joined elements having 9 as 
their timestamp.
{code}

Business case: 2 streams, 1 for different business metrics, another one - 
similar metrics but from microservices logs, result - reconciliation of these 2 
streams. No other operators except sink are need for this particular business 
case.

> Support 'allowedLateness' in CoGroupedStreams
> -
>
> Key: FLINK-10050
> URL: https://issues.apache.org/jira/browse/FLINK-10050
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.1, 1.6.0
>Reporter: eugen yushin
>Priority: Major
>  Labels: ready-to-commit, windows
>
> WindowedStream supports the 'allowedLateness' feature, while 
> CoGroupedStreams does not. At the same time, WindowedStream is an inner part 
> of CoGroupedStreams, and all main functionality (like evictor/trigger/...) is 
> simply delegated to WindowedStream.
> There's no chance to operate with late arriving data from previous steps in 
> cogroups (and joins). Consider the following flow:
> a. read data from source1 -> aggregate data with allowed lateness
> b. read data from source2 -> aggregate data with allowed lateness
> c. cogroup/join streams a and b, and compare aggregated values
> Step c doesn't accept any late data from steps a/b due to lack of 
> `allowedLateness` API call in CoGroupedStreams.java.
> Scope: add method `WithWindow.allowedLateness` to Java API 
> (flink-streaming-java) and extend scala API (flink-streaming-scala).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams

2018-08-24 Thread eugen yushin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591511#comment-16591511
 ] 

eugen yushin edited comment on FLINK-10050 at 8/24/18 11:29 AM:


[~aljoscha] There's no info about windows for any of operator in Flink. Docs:
 
[https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/windows.html#working-with-window-results]
{code}
 The result of a windowed operation is again a {{DataStream}}, no information 
about the windowed operations is retained in the result elements
{code}

At the same time, coGroup/join keeps element's timestamps and consecutive 
operators can assign elements to respective windows. Docs:
 
[https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/joining.html#window-join]
{code}
 Those elements that do get joined will have as their timestamp the largest 
timestamp that still lies in the respective window. For example a window with 
{{[5, 10)}} as its boundaries would result in the joined elements having 9 as 
their timestamp.
{code}

Business case: 2 streams, 1 for different business metrics, another one - 
similar metrics but from microservices logs, result - reconciliation of these 2 
streams. No other operators except sink are need for this particular business 
case.


was (Author: eyushin):
[~aljoscha] There's no info about windows for any of operator in Flink. Docs:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/windows.html#working-with-window-results
```
The result of a windowed operation is again a {{DataStream}}, no information 
about the windowed operations is retained in the result elements
```

At the same time, coGroup/join keeps element's timestamps and consecutive 
operators can assign elements to respective windows. Docs:
[https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/joining.html#window-join]
```
Those elements that do get joined will have as their timestamp the largest 
timestamp that still lies in the respective window. For example a window with 
{{[5, 10)}} as its boundaries would result in the joined elements having 9 as 
their timestamp.
```

Business case: 2 streams, 1 for different business metrics, another one - 
similar metrics but from microservices logs, result - reconciliation of these 2 
streams. No other operators except sink are need for this particular business 
case.

> Support 'allowedLateness' in CoGroupedStreams
> -
>
> Key: FLINK-10050
> URL: https://issues.apache.org/jira/browse/FLINK-10050
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.1, 1.6.0
>Reporter: eugen yushin
>Priority: Major
>  Labels: ready-to-commit, windows
>
> WindowedStream supports the 'allowedLateness' feature, while 
> CoGroupedStreams does not. At the same time, WindowedStream is an inner part 
> of CoGroupedStreams, and all main functionality (like evictor/trigger/...) is 
> simply delegated to WindowedStream.
> There's no chance to operate with late arriving data from previous steps in 
> cogroups (and joins). Consider the following flow:
> a. read data from source1 -> aggregate data with allowed lateness
> b. read data from source2 -> aggregate data with allowed lateness
> c. cogroup/join streams a and b, and compare aggregated values
> Step c doesn't accept any late data from steps a/b due to lack of 
> `allowedLateness` API call in CoGroupedStreams.java.
> Scope: add method `WithWindow.allowedLateness` to Java API 
> (flink-streaming-java) and extend scala API (flink-streaming-scala).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams

2018-08-24 Thread eugen yushin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591511#comment-16591511
 ] 

eugen yushin commented on FLINK-10050:
--

[~aljoscha] There's no info about windows for any of operator in Flink. Docs:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/windows.html#working-with-window-results
```
The result of a windowed operation is again a {{DataStream}}, no information 
about the windowed operations is retained in the result elements
```

At the same time, coGroup/join keeps element's timestamps and consecutive 
operators can assign elements to respective windows. Docs:
[https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/joining.html#window-join]
```
Those elements that do get joined will have as their timestamp the largest 
timestamp that still lies in the respective window. For example a window with 
{{[5, 10)}} as its boundaries would result in the joined elements having 9 as 
their timestamp.
```

Business case: 2 streams, 1 for different business metrics, another one - 
similar metrics but from microservices logs, result - reconciliation of these 2 
streams. No other operators except sink are need for this particular business 
case.

> Support 'allowedLateness' in CoGroupedStreams
> -
>
> Key: FLINK-10050
> URL: https://issues.apache.org/jira/browse/FLINK-10050
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.1, 1.6.0
>Reporter: eugen yushin
>Priority: Major
>  Labels: ready-to-commit, windows
>
> WindowedStream supports the 'allowedLateness' feature, while 
> CoGroupedStreams does not. At the same time, WindowedStream is an inner part 
> of CoGroupedStreams, and all main functionality (like evictor/trigger/...) is 
> simply delegated to WindowedStream.
> There's no chance to operate with late arriving data from previous steps in 
> cogroups (and joins). Consider the following flow:
> a. read data from source1 -> aggregate data with allowed lateness
> b. read data from source2 -> aggregate data with allowed lateness
> c. cogroup/join streams a and b, and compare aggregated values
> Step c doesn't accept any late data from steps a/b due to lack of 
> `allowedLateness` API call in CoGroupedStreams.java.
> Scope: add method `WithWindow.allowedLateness` to Java API 
> (flink-streaming-java) and extend scala API (flink-streaming-scala).
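
As a rough illustration of the proposed extension, a hedged sketch of how the API could look from user code (`allowedLateness` on the co-grouped window does not exist yet; the surrounding calls follow the existing coGroup API, and the input streams are assumed to be of type DataStream<Tuple2<String, Long>>):

{code:java}
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

// metricsA: business metrics, metricsB: the same metrics derived from service logs.
DataStream<String> reconciled = metricsA
    .coGroup(metricsB)
    .where(a -> a.f0)
    .equalTo(b -> b.f0)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .allowedLateness(Time.seconds(30)) // proposed addition, not available today
    .apply(new CoGroupFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
        @Override
        public void coGroup(
                Iterable<Tuple2<String, Long>> left,
                Iterable<Tuple2<String, Long>> right,
                Collector<String> out) {
            // compare the aggregated values for one key and window here
            out.collect("reconciliation result");
        }
    });
{code}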



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7964) Add Apache Kafka 1.0/1.1 connectors

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591513#comment-16591513
 ] 

ASF GitHub Bot commented on FLINK-7964:
---

yanghua commented on a change in pull request #6577: [FLINK-7964] Add Apache 
Kafka 1.0/1.1 connectors
URL: https://github.com/apache/flink/pull/6577#discussion_r212599001
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10ITCase.java
 ##
 @@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * IT cases for Kafka 1.0 .
+ */
+public class Kafka10ITCase extends KafkaConsumerTestBase {
 
 Review comment:
   @pnowojski I tried to extend from Kafka011ITCase: 
   
   ```java
   public class Kafka10ITCase extends Kafka011ITCase
   ```
   
   but the prepare method: 
   
   ```java
   @BeforeClass
   public static void prepare() throws ClassNotFoundException {
       KafkaProducerTestBase.prepare(); // here
       ((KafkaTestEnvironmentImpl) kafkaServer)
           .setProducerSemantic(FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE);
   }
   ```
   will trigger the method call chain: 
   
   ```java
   protected static void startClusters(boolean secureMode, boolean hideKafkaBehindProxy)
           throws ClassNotFoundException {

       // dynamically load the implementation for the test
       // here will load KafkaTestEnvironmentImpl from connector 0.11, not 1.0
       Class clazz =
           Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");

       kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz);

       LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());
   ```
   
   `startClusters` is a static method, so `clazz` ends up being the 
KafkaTestEnvironmentImpl from the 0.11 module, and we cannot use polymorphism to 
get an instance of the KafkaTestEnvironmentImpl from 1.0.
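
Not the actual Flink test-base API, but a hedged sketch of one way around the hard-coded lookup: if the base exposed a loader that takes the implementation's class name (or Class object) from the concrete test class, each connector module could supply its own KafkaTestEnvironmentImpl instead of whichever single FQCN Class.forName happens to resolve. All names below are invented for illustration.

```java
// Hypothetical helper, not part of the actual Flink test base; names are invented for illustration.
final class TestEnvLoader {

    /** Minimal stand-in for the KafkaTestEnvironment contract used by the test base. */
    interface KafkaTestEnv {
        String getVersion();
    }

    /** Loads whichever implementation the calling test module names, instead of a fixed FQCN. */
    static KafkaTestEnv load(String implementationClassName) throws ReflectiveOperationException {
        Class<?> clazz = Class.forName(implementationClassName);
        return (KafkaTestEnv) clazz.getDeclaredConstructor().newInstance();
    }

    private TestEnvLoader() {}
}
```

With a hook like this, a 1.0 test class could name its own test environment in its own @BeforeClass rather than inheriting a static prepare() that always resolves the 0.11 implementation.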


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Apache Kafka 1.0/1.1 connectors
> ---
>
> Key: FLINK-7964
> URL: https://issues.apache.org/jira/browse/FLINK-7964
> 



[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-08-24 Thread JIN SUN (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591489#comment-16591489
 ] 

JIN SUN commented on FLINK-10205:
-

In my opinion we need determinism: when a task is rerun, its output should be the 
same as in the previous attempt. Today's logic might skip the split that the 
previous attempt processed and assign a different one, which can lead to an 
incorrect result.

Fabian, we don't restrict the InputSplit assignment; instead, by adding a small 
piece of code in Execution.java and ExecutionVertex.java, we can make it 
deterministic.

I'm preparing the code; we can have a further discussion when it is ready. 
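
The following is only a rough, hypothetical sketch of that idea (it is not the actual Execution/ExecutionVertex code; all names below are made up): the vertex caches the splits it has already handed out, so a restarted attempt replays the same sequence instead of asking the global assigner again.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: per-vertex split cache so a rerun attempt sees the same splits as before. */
final class DeterministicSplitAssigner<S> {

    /** Stand-in for the JobManager-side assigner the first attempt pulls from. */
    interface SplitSource<S> {
        S requestSplit(); // returns null when no splits remain
    }

    private final SplitSource<S> source;
    private final List<S> assignedSplits = new ArrayList<>(); // splits already given to this vertex
    private int cursor;                                       // next split to hand to the current attempt

    DeterministicSplitAssigner(SplitSource<S> source) {
        this.source = source;
    }

    /** Called by the (re)started task: replay cached splits first, then pull new ones. */
    synchronized S getNextSplit() {
        if (cursor < assignedSplits.size()) {
            return assignedSplits.get(cursor++); // rerun path: same split as before
        }
        S next = source.requestSplit();          // first-attempt path: pull a fresh split
        if (next != null) {
            assignedSplits.add(next);
            cursor++;
        }
        return next;
    }

    /** On failover, rewind so the new attempt starts from the first cached split. */
    synchronized void resetForNewAttempt() {
        cursor = 0;
    }
}
```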

> Batch Job: InputSplit Fault tolerant for DataSourceTask
> ---
>
> Key: FLINK-10205
> URL: https://issues.apache.org/jira/browse/FLINK-10205
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: JIN SUN
>Priority: Major
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Today the DataSourceTask pulls InputSplits from the JobManager to achieve better 
> performance. However, when a DataSourceTask fails and is rerun, it will not get 
> the same splits as its previous attempt; this can introduce inconsistent results 
> or even data corruption.
> Furthermore, if two executions run at the same time (in the batch scenario), 
> these two executions should process the same splits.
> We need to fix this issue to make the inputs of a DataSourceTask deterministic. 
> The proposal is to save all splits into the ExecutionVertex so that the 
> DataSourceTask will pull splits from there.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-08-24 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591484#comment-16591484
 ] 

Fabian Hueske commented on FLINK-10205:
---

Oh, I didn't notice that this issue was created in the context of FLINK-4256.

Please disregard my previous comment.

> Batch Job: InputSplit Fault tolerant for DataSourceTask
> ---
>
> Key: FLINK-10205
> URL: https://issues.apache.org/jira/browse/FLINK-10205
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: JIN SUN
>Priority: Major
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Today the DataSourceTask pulls InputSplits from the JobManager to achieve better 
> performance. However, when a DataSourceTask fails and is rerun, it will not get 
> the same splits as its previous attempt; this can introduce inconsistent results 
> or even data corruption.
> Furthermore, if two executions run at the same time (in the batch scenario), 
> these two executions should process the same splits.
> We need to fix this issue to make the inputs of a DataSourceTask deterministic. 
> The proposal is to save all splits into the ExecutionVertex so that the 
> DataSourceTask will pull splits from there.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-08-24 Thread JIN SUN (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591472#comment-16591472
 ] 

JIN SUN commented on FLINK-10205:
-

Thanks Vinoyang,

I cannot assign the bug to myself; I've sent an email to Aljoscha Krettek asking 
for permission. 

Jin




> Batch Job: InputSplit Fault tolerant for DataSourceTask
> ---
>
> Key: FLINK-10205
> URL: https://issues.apache.org/jira/browse/FLINK-10205
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: JIN SUN
>Priority: Major
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Today the DataSourceTask pulls InputSplits from the JobManager to achieve better 
> performance. However, when a DataSourceTask fails and is rerun, it will not get 
> the same splits as its previous attempt; this can introduce inconsistent results 
> or even data corruption.
> Furthermore, if two executions run at the same time (in the batch scenario), 
> these two executions should process the same splits.
> We need to fix this issue to make the inputs of a DataSourceTask deterministic. 
> The proposal is to save all splits into the ExecutionVertex so that the 
> DataSourceTask will pull splits from there.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tragicjun edited a comment on issue #6508: [Flink-10079] [table] Automatically register sink table from external catalogs

2018-08-24 Thread GitBox
tragicjun edited a comment on issue #6508: [Flink-10079] [table] Automatically 
register sink table from external catalogs 
URL: https://github.com/apache/flink/pull/6508#issuecomment-415720615
 
 
   CC @zentol @xccui  would you please help review it?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

