[jira] [Updated] (FLINK-12406) Report BLOCKING_PERSISTENT result partition meta back to client

2019-06-17 Thread Ruidong Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-12406:
---
Description: 
After each job finishes, new {{BLOCKING_PERSISTENT}} result partitions are 
generated, and the locations of these result partitions should be reported back 
to the client via {{JobExecutionResult}}; they will later be used for Table 
{{cache()}} and {{invalidateCache()}}

 

Description
- The client uses {{ExecutionEnvironment}} to submit a batch job and waits for 
the {{JobResult}} from the {{JM}}
- When a job finishes, {{BLOCKING_PERSISTENT}} {{ResultPartition}} locations 
will be collected from each {{ExecutionVertex}} in the {{ExecutionGraph}}
- On the JM side, the {{BLOCKING_PERSISTENT}} {{ResultPartition}} locations flow 
along this path ({{ExecutionGraph}} -> {{ArchivedExecutionGraph}} -> {{JobResult}})
- On the client side, a {{JobExecutionResult}} will be created from the returned 
{{JobResult}}
- The {{ExecutionEnvironment}} extracts the locations from the 
{{JobExecutionResult}} and stores them

Failure Handling
- If any error occurs while collecting the locations of {{BLOCKING_PERSISTENT}} 
{{ResultPartition}}s, we do not terminate the process but leave the locations of 
some {{IntermediateDataSet}}s incomplete; we keep a record of these 
IntermediateDataSetIDs and report them back to the client
- The client can then use this information to decide what to do: generally, the 
data can be read if the locations are complete, or a delete request will be 
proposed (in later PRs) if they are incomplete

Brief change log
 - Add a new class {{ResultPartitionDescriptor}}, which stores the location of a 
{{BLOCKING_PERSISTENT}} {{ResultPartition}}; currently only {{ResultPartition}}s 
in a {{TaskManager}} are supported.
 - Add a new class {{BlockingPersistentResultPartitionMeta}}, which contains 
all mappings from {{IntermediateDataSetID}} to its {{BLOCKING_PERSISTENT}} 
{{ResultPartition}} locations (see the sketch after this list)
 - Add a new method {{getBlockingPersistentResultPartitionMeta()}} in 
{{AccessExecutionGraph}}, which returns a 
{{BlockingPersistentResultPartitionMeta}}
 - Add an instance of {{BlockingPersistentResultPartitionMeta}} in 
{{JobExecutionResult}}, {{JobResult}}, {{ArchivedExecutionGraph}} and 
{{ExecutionEnvironment}}
 - When a job finishes, the locations will flow in this path: 
{{ExecutionGraph}} -> {{ArchivedExecutionGraph}} -> {{JobResult}} -> 
{{JobExecutionResult}} -> {{ExecutionEnvironment}}
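
To make the flow concrete, here is a minimal sketch of what the meta container 
could look like ({{ResultPartitionDescriptor}} is the location holder from the 
first bullet; all field and method shapes are illustrative assumptions, not the 
committed implementation):

{code:java}
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
import java.util.Set;

import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;

// Illustrative sketch: field layout and accessors are assumptions based on
// the change log above, not the committed implementation.
public class BlockingPersistentResultPartitionMeta implements Serializable {

    private static final long serialVersionUID = 1L;

    // Locations of BLOCKING_PERSISTENT ResultPartitions, keyed by the
    // IntermediateDataSetID they belong to.
    private final Map<IntermediateDataSetID, Collection<ResultPartitionDescriptor>> locations;

    // Data sets whose location collection failed; the client should not read
    // these but may propose a delete request for them (in later PRs).
    private final Set<IntermediateDataSetID> incompleteDataSets;

    public BlockingPersistentResultPartitionMeta(
            Map<IntermediateDataSetID, Collection<ResultPartitionDescriptor>> locations,
            Set<IntermediateDataSetID> incompleteDataSets) {
        this.locations = locations;
        this.incompleteDataSets = incompleteDataSets;
    }

    // A data set is readable only if its locations were collected completely.
    public boolean isComplete(IntermediateDataSetID id) {
        return locations.containsKey(id) && !incompleteDataSets.contains(id);
    }

    public Collection<ResultPartitionDescriptor> getLocations(IntermediateDataSetID id) {
        return locations.get(id);
    }
}
{code}

On the client side, the {{ExecutionEnvironment}} would only need to consult 
{{isComplete()}} before deciding between reading the data and proposing a 
delete request.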

  was:
After each job finishes, new {{BLOCKING_PERSISTENT}} result partitions are 
generated, and the locations of these result partitions should be reported back 
to the client via {{JobExecutionResult}}; they will later be used for Table 
{{cache()}} and {{invalidateCache()}}

 

Brief Changes:
 - Add a new class {{IntermediateResultDescriptor}}, which stores the location 
of a {{BLOCKING_PERSISTENT}} {{ResultPartition}}; currently only 
{{ResultPartition}}s in a {{TaskManager}} are supported.
 - Add a new method {{getResultPartitionDescriptors()}} in 
{{AccessExecutionGraph}}
 - Add a new field in {{JobExecutionResult}}, {{JobResult}}, 
{{ArchivedExecutionGraph}} and {{ExecutionEnvironment}}, which keeps a mapping 
from {{IntermediateDataSetID}} to its {{ResultPartition}} locations
 - When a job finishes, the metadata will flow in this path: 
{{ExecutionGraph}} -> {{ArchivedExecutionGraph}} -> {{JobResult}} -> 
{{JobExecutionResult}} -> {{ExecutionEnvironment}}


> Report BLOCKING_PERSISTENT result partition meta back to client
> ---
>
> Key: FLINK-12406
> URL: https://issues.apache.org/jira/browse/FLINK-12406
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataSet, Runtime / Coordination
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> After each job finishes, new {{BLOCKING_PERSISTENT}} result partitions 
> are generated, and the locations of these result partitions should be reported 
> back to the client via {{JobExecutionResult}}; they will later be used for 
> Table {{cache()}} and {{invalidateCache()}}
>  
> Description
> - The client uses {{ExecutionEnvironment}} to submit a batch job and waits for 
> the {{JobResult}} from the {{JM}}
> - When a job finishes, {{BLOCKING_PERSISTENT}} {{ResultPartition}} locations 
> will be collected from each {{ExecutionVertex}} in the {{ExecutionGraph}}
> - On the JM side, the {{BLOCKING_PERSISTENT}} {{ResultPartition}} locations 
> flow along this path ({{ExecutionGraph}} -> {{ArchivedExecutionGraph}} -> 
> {{JobResult}})
> - On the client side, a {{JobExecutionResult}} will be created from the 
> returned {{JobResult}}
> - The {{ExecutionEnvironment}} extracts the locations from the 
> {{JobExecutionResult}} and stores them
> Failure Handling

[jira] [Updated] (FLINK-12406) Report BLOCKING_PERSISTENT result partition meta back to client

2019-06-17 Thread Ruidong Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-12406:
---
Description: 
After each job finishes, new {{BLOCKING_PERSISTENT}} result partitions are 
generated, and the locations of these result partitions should be reported back 
to the client via {{JobExecutionResult}}; they will later be used for Table 
{{cache()}} and {{invalidateCache()}}

 

Brief Changes:
 - Add a new class {{IntermediateResultDescriptor}}, which stores the location 
of a {{BLOCKING_PERSISTENT}} {{ResultPartition}}; currently only 
{{ResultPartition}}s in a {{TaskManager}} are supported.
 - Add a new method {{getResultPartitionDescriptors()}} in 
{{AccessExecutionGraph}}
 - Add a new field in {{JobExecutionResult}}, {{JobResult}}, 
{{ArchivedExecutionGraph}} and {{ExecutionEnvironment}}, which keeps a mapping 
from {{IntermediateDataSetID}} to its {{ResultPartition}} locations
 - When a job finishes, the metadata will flow in this path: 
{{ExecutionGraph}} -> {{ArchivedExecutionGraph}} -> {{JobResult}} -> 
{{JobExecutionResult}} -> {{ExecutionEnvironment}}

  was:
After each job finishes, new {{BLOCKING_PERSISTENT}} result partitions are 
generated, and the locations of these result partitions should be reported back 
to the client via {{JobExecutionResult}}; they will later be used for Table 
{{cache()}} and {{invalidateCache()}}

 

Brief Changes:
 - Add a new class {{IntermediateResultDescriptor}}, which stores the location 
of a {{BLOCKING_PERSISTENT}} {{ResultPartition}}; currently only 
{{ResultPartition}}s in a {{TaskManager}} are supported.
 - Add a new method {{getResultPartitionDescriptors()}} in 
{{AccessExecutionGraph}}
 - Add a new field in {{JobExecutionResult}}, {{JobResult}}, 
{{ArchivedExecutionGraph}} and {{ExecutionEnvironment}}, which keeps a mapping 
from {{IntermediateDataSetID}} to its {{ResultPartition}} locations
 - When a job finishes, the metadata will flow in this path: {{ExecutionGraph}} 
-> {{ArchivedExecutionGraph}} -> {{JobResult}} -> {{JobExecutionResult}} -> 
{{ExecutionEnvironment}}


> Report BLOCKING_PERSISTENT result partition meta back to client
> ---
>
> Key: FLINK-12406
> URL: https://issues.apache.org/jira/browse/FLINK-12406
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataSet, Runtime / Coordination
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> After each job finishes, new {{BLOCKING_PERSISTENT}} result partitions 
> are generated, and the locations of these result partitions should be reported 
> back to the client via {{JobExecutionResult}}; they will later be used for 
> Table {{cache()}} and {{invalidateCache()}}
>  
> Brief Changes:
>  - Add a new class {{IntermediateResultDescriptor}}, which stores the location 
> of a {{BLOCKING_PERSISTENT}} {{ResultPartition}}; currently only 
> {{ResultPartition}}s in a {{TaskManager}} are supported.
>  - Add a new method {{getResultPartitionDescriptors()}} in 
> {{AccessExecutionGraph}}
>  - Add a new field in {{JobExecutionResult}}, {{JobResult}}, 
> {{ArchivedExecutionGraph}} and {{ExecutionEnvironment}}, which keeps a 
> mapping from {{IntermediateDataSetID}} to its {{ResultPartition}} locations
>  - When a job finishes, the metadata will flow in this path: 
> {{ExecutionGraph}} -> {{ArchivedExecutionGraph}} -> {{JobResult}} -> 
> {{JobExecutionResult}} -> {{ExecutionEnvironment}}





[jira] [Updated] (FLINK-12406) Report BLOCKING_PERSISTENT result partition meta back to client

2019-06-17 Thread Ruidong Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-12406:
---
Description: 
After each job finishes, new {{BLOCKING_PERSISTENT}} result partitions are 
generated, and the locations of these result partitions should be reported back 
to the client via {{JobExecutionResult}}; they will later be used for Table 
{{cache()}} and {{invalidateCache()}}

 

Brief Changes:
 - Add a new class {{IntermediateResultDescriptor}}, which stores the location 
of a {{BLOCKING_PERSISTENT}} {{ResultPartition}}; currently only 
{{ResultPartition}}s in a {{TaskManager}} are supported.
 - Add a new method {{getResultPartitionDescriptors()}} in 
{{AccessExecutionGraph}}
 - Add a new field in {{JobExecutionResult}}, {{JobResult}}, 
{{ArchivedExecutionGraph}} and {{ExecutionEnvironment}}, which keeps a mapping 
from {{IntermediateDataSetID}} to its {{ResultPartition}} locations
 - When a job finishes, the metadata will flow in this path: {{ExecutionGraph}} 
-> {{ArchivedExecutionGraph}} -> {{JobResult}} -> {{JobExecutionResult}} -> 
{{ExecutionEnvironment}}

  was:
After each job finishes, new {{BLOCKING_PERSISTENT}} result partitions are 
generated, and the locations of these result partitions should be reported back 
to the client via {{JobExecutionResult}}; they will later be used for Table 
{{cache()}} and {{invalidateCache()}}

 

Brief Changes:

- Add a new class {{IntermediateResultDescriptor}}, which stores the location of 
a {{BLOCKING_PERSISTENT}} {{ResultPartition}}; currently only 
{{ResultPartition}}s in a {{TaskManager}} are supported.
 - Add a new method {{getResultPartitionDescriptors()}} in 
{{AccessExecutionGraph}}
 - Add a new field in {{JobExecutionResult}}, {{JobResult}}, 
{{ArchivedExecutionGraph}} and {{ExecutionEnvironment}}, which keeps a mapping 
from {{IntermediateDataSetID}} to its {{ResultPartition}} locations
 - When a job finishes, the metadata will flow in this path: {{ExecutionGraph}} 
-> {{ArchivedExecutionGraph}} -> {{JobExecutionResult}} -> {{JobResult}} -> 
{{ExecutionEnvironment}}


> Report BLOCKING_PERSISTENT result partition meta back to client
> ---
>
> Key: FLINK-12406
> URL: https://issues.apache.org/jira/browse/FLINK-12406
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataSet, Runtime / Coordination
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> After each job finishes, new {{BLOCKING_PERSISTENT}} result partitions 
> are generated, and the locations of these result partitions should be reported 
> back to the client via {{JobExecutionResult}}; they will later be used for 
> Table {{cache()}} and {{invalidateCache()}}
>  
> Brief Changes:
>  - Add a new class {{IntermediateResultDescriptor}}, which stores the location 
> of a {{BLOCKING_PERSISTENT}} {{ResultPartition}}; currently only 
> {{ResultPartition}}s in a {{TaskManager}} are supported.
>  - Add a new method {{getResultPartitionDescriptors()}} in 
> {{AccessExecutionGraph}}
>  - Add a new field in {{JobExecutionResult}}, {{JobResult}}, 
> {{ArchivedExecutionGraph}} and {{ExecutionEnvironment}}, which keeps a 
> mapping from {{IntermediateDataSetID}} to its {{ResultPartition}} locations
>  - When a job finishes, the metadata will flow in this path: 
> {{ExecutionGraph}} -> {{ArchivedExecutionGraph}} -> {{JobResult}} -> 
> {{JobExecutionResult}} -> {{ExecutionEnvironment}}





[jira] [Updated] (FLINK-12406) Report BLOCKING_PERSISTENT result partition meta back to client

2019-06-17 Thread Ruidong Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-12406:
---
Description: 
After each job finishes, new {{BLOCKING_PERSISTENT}} result partitions are 
generated, and the locations of these result partitions should be reported back 
to the client via {{JobExecutionResult}}; they will later be used for Table 
{{cache()}} and {{invalidateCache()}}

 

Brief Changes:

- Add a new class {{IntermediateResultDescriptor}}, which stores the location of 
a {{BLOCKING_PERSISTENT}} {{ResultPartition}}; currently only 
{{ResultPartition}}s in a {{TaskManager}} are supported.
 - Add a new method {{getResultPartitionDescriptors()}} in 
{{AccessExecutionGraph}}
 - Add a new field in {{JobExecutionResult}}, {{JobResult}}, 
{{ArchivedExecutionGraph}} and {{ExecutionEnvironment}}, which keeps a mapping 
from {{IntermediateDataSetID}} to its {{ResultPartition}} locations
 - When a job finishes, the metadata will flow in this path: {{ExecutionGraph}} 
-> {{ArchivedExecutionGraph}} -> {{JobExecutionResult}} -> {{JobResult}} -> 
{{ExecutionEnvironment}}

  was:After each job finishes, new {{BLOCKING_PERSISTENT}} result 
partitions are generated, and the locations of these result partitions should be 
reported back to the client via {{JobExecutionResult}}; they will later be used 
for Table {{cache()}} and {{invalidateCache()}}


> Report BLOCKING_PERSISTENT result partition meta back to client
> ---
>
> Key: FLINK-12406
> URL: https://issues.apache.org/jira/browse/FLINK-12406
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataSet, Runtime / Coordination
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> After each job finishes, new {{BLOCKING_PERSISTENT}} result partitions 
> are generated, and the locations of these result partitions should be reported 
> back to the client via {{JobExecutionResult}}; they will later be used for 
> Table {{cache()}} and {{invalidateCache()}}
>  
> Brief Changes:
> - Add a new class {{IntermediateResultDescriptor}}, which stores the location 
> of a {{BLOCKING_PERSISTENT}} {{ResultPartition}}; currently only 
> {{ResultPartition}}s in a {{TaskManager}} are supported.
>  - Add a new method {{getResultPartitionDescriptors()}} in 
> {{AccessExecutionGraph}}
>  - Add a new field in {{JobExecutionResult}}, {{JobResult}}, 
> {{ArchivedExecutionGraph}} and {{ExecutionEnvironment}}, which keeps a 
> mapping from {{IntermediateDataSetID}} to its {{ResultPartition}} locations
>  - When a job finishes, the metadata will flow in this path: 
> {{ExecutionGraph}} -> {{ArchivedExecutionGraph}} -> {{JobExecutionResult}} -> 
> {{JobResult}} -> {{ExecutionEnvironment}}





[jira] [Updated] (FLINK-12405) Introduce BLOCKING_PERSISTENT result partition type

2019-05-17 Thread Ruidong Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-12405:
---
Description: 
The new {{ResultPartitionType}} {{BLOCKING_PERSISTENT}} is similar to 
{{BLOCKING}}, except that it may be consumed several times and is only released 
on TM shutdown or upon a {{ResultPartition}} removal request.
 This is the basis for Interactive Programming.

Here are the brief changes:
 * Introduce a new {{ResultPartitionType}} called {{BLOCKING_PERSISTENT}}
 * Introduce {{BlockingShuffleOutputFormat}}, which contains a user-specified 
{{IntermediateDataSetID}} (passed from the Table API in a later PR); see the 
sketch after the design notes below
 * When the {{JobGraphGenerator}} sees a {{GenericDataSinkBase}} with a 
{{BlockingShuffleOutputFormat}}, it creates an {{IntermediateDataSet}} with this 
id and adds it to the predecessor; the {{OutputFormatVertex}} for this 
{{GenericDataSinkBase}} is excluded from the {{JobGraph}}
 * As a result, the JobGraph may contain some JobVertex instances that have more 
{{IntermediateDataSet}}s than downstream consumers.

Here are some design notes:
 * Why modify {{DataSet}} and {{JobGraphGenerator}}?
 Since the Blink planner is not ready yet, the Batch Table API still runs on the 
Flink planner (based on DataSet).
 There will be another implementation once the Blink planner is ready.
 * Why use a special {{OutputFormat}} as a placeholder?
 We could add a {{cache()}} method to DataSet, but we do not want to change the 
DataSet API any more, so a special {{OutputFormat}} as a placeholder seems 
reasonable.
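
To illustrate the placeholder idea, here is a minimal sketch under the 
assumptions above (names follow the bullets, but the body is illustrative, not 
the committed code): the format only carries the {{IntermediateDataSetID}} and 
never writes a record, since its vertex is excluded from the {{JobGraph}}.

{code:java}
import java.io.IOException;

import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;

// Illustrative sketch only. The format is a pure placeholder: it carries the
// user-specified IntermediateDataSetID for the JobGraphGenerator to find,
// and it never writes a record because its OutputFormatVertex is excluded
// from the JobGraph.
public class BlockingShuffleOutputFormat<T> implements OutputFormat<T> {

    private static final long serialVersionUID = 1L;

    private final IntermediateDataSetID intermediateDataSetId;

    public BlockingShuffleOutputFormat(IntermediateDataSetID intermediateDataSetId) {
        this.intermediateDataSetId = intermediateDataSetId;
    }

    public IntermediateDataSetID getIntermediateDataSetId() {
        return intermediateDataSetId;
    }

    @Override
    public void configure(Configuration parameters) {
        // nothing to configure for a placeholder
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        // never deployed, so never opened
    }

    @Override
    public void writeRecord(T record) throws IOException {
        throw new UnsupportedOperationException("placeholder format, never invoked");
    }

    @Override
    public void close() throws IOException {
        // nothing to close
    }
}
{code}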

  was:
The new {{ResultPartitionType}} {{BLOCKING_PERSISTENT}} is similar to 
{{BLOCKING}}, except that it may be consumed several times and is only released 
on TM shutdown or upon a {{ResultPartition}} removal request.
 This is the basis for Interactive Programming.


 Here are the brief changes:

* Introduce a new {{ResultPartitionType}} called {{BLOCKING_PERSISTENT}}
* Introduce {{BlockingShuffleOutputFormat}} and {{BlockingShuffleDataSink}}, 
which are used to generate a {{GenericDataSinkBase}} with a user-specified 
{{IntermediateDataSetID}} (passed from the Table API in a later PR)
* When the {{JobGraphGenerator}} sees a {{GenericDataSinkBase}} with an 
{{IntermediateDataSetID}}, it creates an {{IntermediateDataSet}} with this id; 
the {{OutputFormatVertex}} for this {{GenericDataSinkBase}} is then excluded 
from the {{JobGraph}}
* As a result, the JobGraph may contain some JobVertex instances that have more 
{{IntermediateDataSet}}s than downstream consumers.

Here are some design notes:
 * Why modify {{DataSet}} and {{JobGraphGenerator}}?
 Since the Blink planner is not ready yet, the Batch Table API still runs on the 
Flink planner (based on DataSet).
 There will be another implementation once the Blink planner is ready.
 * Why use a special {{OutputFormat}} as a placeholder?
 We could add a {{cache()}} method to DataSet, but we do not want to change the 
DataSet API any more, so a special {{OutputFormat}} as a placeholder seems 
reasonable.


> Introduce BLOCKING_PERSISTENT result partition type
> ---
>
> Key: FLINK-12405
> URL: https://issues.apache.org/jira/browse/FLINK-12405
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataSet
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The new {{ResultPartitionType}} {{BLOCKING_PERSISTENT}} is similar to 
> {{BLOCKING}}, except that it may be consumed several times and is only 
> released on TM shutdown or upon a {{ResultPartition}} removal request.
>  This is the basis for Interactive Programming.
> Here are the brief changes:
>  * Introduce a new {{ResultPartitionType}} called {{BLOCKING_PERSISTENT}}
>  * Introduce {{BlockingShuffleOutputFormat}}, which contains a user-specified 
> {{IntermediateDataSetID}} (passed from the Table API in a later PR)
>  * When the {{JobGraphGenerator}} sees a {{GenericDataSinkBase}} with a 
> {{BlockingShuffleOutputFormat}}, it creates an {{IntermediateDataSet}} with 
> this id and adds it to the predecessor; the {{OutputFormatVertex}} for this 
> {{GenericDataSinkBase}} is excluded from the {{JobGraph}}
>  * As a result, the JobGraph may contain some JobVertex instances that have 
> more {{IntermediateDataSet}}s than downstream consumers.
> Here are some design notes:
>  * Why modify {{DataSet}} and {{JobGraphGenerator}}?
>  Since the Blink planner is not ready yet, the Batch Table API still runs on 
> the Flink planner (based on DataSet).
>  There will be another implementation once the Blink planner is ready.
>  * Why use a special {{OutputFormat}} as a placeholder?
>  We could add a {{cache()}} method to DataSet, but we do not want to change 
> the DataSet API any more, so a special {{OutputFormat}} as a placeholder seems 
> reasonable.





[jira] [Updated] (FLINK-12405) Introduce BLOCKING_PERSISTENT result partition type

2019-05-13 Thread Ruidong Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-12405:
---
Description: 
The new {{ResultPartitionType}} {{BLOCKING_PERSISTENT}} is similar to 
{{BLOCKING}}, except that it may be consumed several times and is only released 
on TM shutdown or upon a {{ResultPartition}} removal request.
 This is the basis for Interactive Programming.


 Here are the brief changes:

* Introduce a new {{ResultPartitionType}} called {{BLOCKING_PERSISTENT}}
* Introduce {{BlockingShuffleOutputFormat}} and {{BlockingShuffleDataSink}}, 
which are used to generate a {{GenericDataSinkBase}} with a user-specified 
{{IntermediateDataSetID}} (passed from the Table API in a later PR)
* When the {{JobGraphGenerator}} sees a {{GenericDataSinkBase}} with an 
{{IntermediateDataSetID}}, it creates an {{IntermediateDataSet}} with this id; 
the {{OutputFormatVertex}} for this {{GenericDataSinkBase}} is then excluded 
from the {{JobGraph}}
* As a result, the JobGraph may contain some JobVertex instances that have more 
{{IntermediateDataSet}}s than downstream consumers.

Here are some design notes:
 * Why modify {{DataSet}} and {{JobGraphGenerator}}?
 Since the Blink planner is not ready yet, the Batch Table API still runs on the 
Flink planner (based on DataSet).
 There will be another implementation once the Blink planner is ready.
 * Why use a special {{OutputFormat}} as a placeholder?
 We could add a {{cache()}} method to DataSet, but we do not want to change the 
DataSet API any more, so a special {{OutputFormat}} as a placeholder seems 
reasonable.

  was:
The new {{ResultPartitionType}} {{BLOCKING_PERSISTENT}} is similar to 
{{BLOCKING}}, except that it may be consumed several times and is only released 
on TM shutdown or upon a {{ResultPartition}} removal request.
This is the basis for Interactive Programming.


> Introduce BLOCKING_PERSISTENT result partition type
> ---
>
> Key: FLINK-12405
> URL: https://issues.apache.org/jira/browse/FLINK-12405
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataSet
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The new {{ResultPartitionType}} {{BLOCKING_PERSISTENT}} is similar to 
> {{BLOCKING}}, except that it may be consumed several times and is only 
> released on TM shutdown or upon a {{ResultPartition}} removal request.
>  This is the basis for Interactive Programming.
>  Here are the brief changes:
> * Introduce a new {{ResultPartitionType}} called {{BLOCKING_PERSISTENT}}
> * Introduce {{BlockingShuffleOutputFormat}} and {{BlockingShuffleDataSink}}, 
> which are used to generate a {{GenericDataSinkBase}} with a user-specified 
> {{IntermediateDataSetID}} (passed from the Table API in a later PR)
> * When the {{JobGraphGenerator}} sees a {{GenericDataSinkBase}} with an 
> {{IntermediateDataSetID}}, it creates an {{IntermediateDataSet}} with this 
> id; the {{OutputFormatVertex}} for this {{GenericDataSinkBase}} is then 
> excluded from the {{JobGraph}}
> * As a result, the JobGraph may contain some JobVertex instances that have 
> more {{IntermediateDataSet}}s than downstream consumers.
> Here are some design notes:
>  * Why modify {{DataSet}} and {{JobGraphGenerator}}?
>  Since the Blink planner is not ready yet, the Batch Table API still runs on 
> the Flink planner (based on DataSet).
>  There will be another implementation once the Blink planner is ready.
>  * Why use a special {{OutputFormat}} as a placeholder?
>  We could add a {{cache()}} method to DataSet, but we do not want to change 
> the DataSet API any more, so a special {{OutputFormat}} as a placeholder seems 
> reasonable.





[jira] [Updated] (FLINK-12406) Report BLOCKING_PERSISTENT result partition meta back to client

2019-05-07 Thread Ruidong Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-12406:
---
Component/s: API / DataSet

> Report BLOCKING_PERSISTENT result partition meta back to client
> ---
>
> Key: FLINK-12406
> URL: https://issues.apache.org/jira/browse/FLINK-12406
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataSet, Runtime / Coordination
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>
> After each job finishes, new {{BLOCKING_PERSISTENT}} result partitions 
> are generated, and the locations of these result partitions should be reported 
> back to the client via {{JobExecutionResult}}; they will later be used for 
> Table {{cache()}} and {{invalidateCache()}}





[jira] [Updated] (FLINK-12420) Add support of cache/invalidateCache for TableAPI

2019-05-06 Thread Ruidong Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-12420:
---
Component/s: Table SQL / API

> Add support of cache/invalidateCache for TableAPI
> -
>
> Key: FLINK-12420
> URL: https://issues.apache.org/jira/browse/FLINK-12420
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>
> Add the cache/invalidateCache API and its implementations for Table





[jira] [Created] (FLINK-12420) Add support of cache/invalidateCache for TableAPI

2019-05-06 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-12420:
--

 Summary: Add support of cache/invalidateCache for TableAPI
 Key: FLINK-12420
 URL: https://issues.apache.org/jira/browse/FLINK-12420
 Project: Flink
  Issue Type: Sub-task
Reporter: Ruidong Li
Assignee: Ruidong Li


Add the cache/invalidateCache API and its implementations for Table





[jira] [Created] (FLINK-12419) Add support for consuming BLOCKING_PERSISTENT ResultPartition

2019-05-06 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-12419:
--

 Summary: Add support for consuming BLOCKING_PERSISTENT 
ResultPartition
 Key: FLINK-12419
 URL: https://issues.apache.org/jira/browse/FLINK-12419
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Ruidong Li
Assignee: Ruidong Li


Add support for deploying jobs which can consume BLOCKING_PERSISTENT 
ResultPartitions generated by previous jobs





[jira] [Created] (FLINK-12406) Report BLOCKING_PERSISTENT result partition meta back to client

2019-05-05 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-12406:
--

 Summary: Report BLOCKING_PERSISTENT result partition meta back to 
client
 Key: FLINK-12406
 URL: https://issues.apache.org/jira/browse/FLINK-12406
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Ruidong Li
Assignee: Ruidong Li


After each job finishes, new {{BLOCKING_PERSISTENT}} result partitions are 
generated, and the locations of these result partitions should be reported back 
to the client via {{JobExecutionResult}}; they will later be used for Table 
{{cache()}} and {{invalidateCache()}}





[jira] [Created] (FLINK-12405) Introduce BLOCKING_PERSISTENT result partition type

2019-05-05 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-12405:
--

 Summary: Introduce BLOCKING_PERSISTENT result partition type
 Key: FLINK-12405
 URL: https://issues.apache.org/jira/browse/FLINK-12405
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataSet
Reporter: Ruidong Li
Assignee: Ruidong Li


The new {{ResultPartitionType}} {{BLOCKING_PERSISTENT}} is similar to 
{{BLOCKING}}, except that it may be consumed several times and is only released 
on TM shutdown or upon a {{ResultPartition}} removal request.
This is the basis for Interactive Programming.





[jira] [Created] (FLINK-12000) Introduce TableServiceManager to TableEnvironment

2019-03-24 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-12000:
--

 Summary: Introduce TableServiceManager to TableEnvironment
 Key: FLINK-12000
 URL: https://issues.apache.org/jira/browse/FLINK-12000
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Ruidong Li
Assignee: Ruidong Li








[jira] [Created] (FLINK-11999) Introduce cache and invalidateCache API to Table

2019-03-24 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-11999:
--

 Summary: Introduce cache and invalidateCache API to Table
 Key: FLINK-11999
 URL: https://issues.apache.org/jira/browse/FLINK-11999
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Ruidong Li
Assignee: Ruidong Li








[jira] [Created] (FLINK-11998) Flink Interactive Programming (Umbrella JIRA)

2019-03-24 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-11998:
--

 Summary: Flink Interactive Programming (Umbrella JIRA)
 Key: FLINK-11998
 URL: https://issues.apache.org/jira/browse/FLINK-11998
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API, Table SQL / Planner, Table SQL / Runtime
Reporter: Ruidong Li
Assignee: Ruidong Li


This is the Umbrella JIRA for 
[FLIP-36|https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink].





[jira] [Commented] (FLINK-6441) Improve the UDTF

2018-12-02 Thread Ruidong Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706587#comment-16706587
 ] 

Ruidong Li commented on FLINK-6441:
---

Thanks to [~twalthr] and [~hequn8128]. This issue has not been fixed yet; I'll 
fix it in the coming days.

> Improve the UDTF
> 
>
> Key: FLINK-6441
> URL: https://issues.apache.org/jira/browse/FLINK-6441
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>
> According to [FLINK-6334], a UDTF's apply method returns an unbounded Table 
> which consists of a LogicalTableFunctionCall and only supports the Alias 
> transformation; this issue focuses on adding expression evaluation in Select, 
> e.g. table.join(split('c) as ('a, 'b) select ('a * 2 as 'a, 'b + 1 as 'b))





[jira] [Assigned] (FLINK-8921) Split code generated call expression

2018-10-24 Thread Ruidong Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li reassigned FLINK-8921:
-

Assignee: xueyu  (was: Ruidong Li)

> Split code generated call expression 
> -
>
> Key: FLINK-8921
> URL: https://issues.apache.org/jira/browse/FLINK-8921
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: xueyu
>Priority: Major
>  Labels: pull-request-available
>
> In FLINK-8274 we introduced the possibility of splitting the generated code 
> into multiple methods in order to avoid exceeding the JVM's maximum method 
> size (see also 
> https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-4.html#jvms-4.9).
> At the moment we only split methods by fields; however, this is not enough in 
> all cases. We should also split expressions. I suggest splitting the operands 
> of a {{RexCall}} in 
> {{org.apache.flink.table.codegen.CodeGenerator#visitCall}} if we reach a 
> certain threshold. However, this should happen as lazily as possible to keep 
> the runtime overhead low.
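
For illustration, a rough sketch of the lazy splitting idea (class name and 
threshold are assumptions; the actual {{CodeGenerator}} differs in detail):

{code:java}
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: when the code generated for a RexCall operand grows
// past a threshold, move it into its own method and reference it via a call,
// so that no single generated method exceeds the JVM's 64 KB bytecode limit.
public class OperandSplitter {

    private static final int SPLIT_THRESHOLD = 4000; // characters, assumed

    private final List<String> splitMethods = new ArrayList<>();

    // Returns the code to embed at the call site: either the operand code
    // itself, or a call to a freshly generated method containing it.
    public String maybeSplit(String operandCode, String resultType) {
        if (operandCode.length() < SPLIT_THRESHOLD) {
            return operandCode; // small enough: keep inline (split lazily)
        }
        String methodName = "split$" + splitMethods.size();
        splitMethods.add(
            "private " + resultType + " " + methodName + "() {\n"
            + "  return " + operandCode + ";\n"
            + "}");
        return methodName + "()";
    }

    // Collected method bodies, to be appended to the generated class.
    public List<String> getSplitMethods() {
        return splitMethods;
    }
}
{code}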





[jira] [Commented] (FLINK-8921) Split code generated call expression

2018-10-24 Thread Ruidong Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662436#comment-16662436
 ] 

Ruidong Li commented on FLINK-8921:
---

[~xueyu] thanks for your contribution; I'll assign this issue to you if you want 
to fix it.

> Split code generated call expression 
> -
>
> Key: FLINK-8921
> URL: https://issues.apache.org/jira/browse/FLINK-8921
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Ruidong Li
>Priority: Major
>  Labels: pull-request-available
>
> In FLINK-8274 we introduced the possibility of splitting the generated code 
> into multiple methods in order to avoid exceeding the JVM's maximum method 
> size (see also 
> https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-4.html#jvms-4.9).
> At the moment we only split methods by fields; however, this is not enough in 
> all cases. We should also split expressions. I suggest splitting the operands 
> of a {{RexCall}} in 
> {{org.apache.flink.table.codegen.CodeGenerator#visitCall}} if we reach a 
> certain threshold. However, this should happen as lazily as possible to keep 
> the runtime overhead low.





[jira] [Created] (FLINK-10670) Fix Correlate codegen error

2018-10-24 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-10670:
--

 Summary: Fix Correlate codegen error
 Key: FLINK-10670
 URL: https://issues.apache.org/jira/browse/FLINK-10670
 Project: Flink
  Issue Type: Bug
  Components: Table API  SQL
Reporter: Ruidong Li
Assignee: Ruidong Li


TableFunctionCollector should handle reuseInitCode and reuseMemberCode





[jira] [Created] (FLINK-10451) TableFunctionCollector should handle the life cycle of ScalarFunction

2018-09-27 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-10451:
--

 Summary: TableFunctionCollector should handle the life cycle of 
ScalarFunction
 Key: FLINK-10451
 URL: https://issues.apache.org/jira/browse/FLINK-10451
 Project: Flink
  Issue Type: Bug
  Components: Table API  SQL
Reporter: Ruidong Li
Assignee: Ruidong Li


Consider the following query:

table.join(udtf('a)).where(udf('b))

The filter will be pushed into DataSetCorrelate/DataStreamCorrelate without 
triggering open() and close()





[jira] [Commented] (FLINK-8868) Support Table Function as Table for Stream Sql

2018-08-20 Thread Ruidong Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16586971#comment-16586971
 ] 

Ruidong Li commented on FLINK-8868:
---

[~hequn8128] I've tried it; the SQL still works without 'Lateral'. I'll update 
this issue. Best.

> Support Table Function as Table for Stream Sql
> --
>
> Key: FLINK-8868
> URL: https://issues.apache.org/jira/browse/FLINK-8868
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>  Labels: pull-request-available
>
> for stream sql:
> support SQL like:  SELECT * FROM Lateral TABLE(tf("a"))
> for batch sql:
> a udtf might produce infinite records; this needs to be discussed





[jira] [Updated] (FLINK-8868) Support Table Function as Table for Stream Sql

2018-08-20 Thread Ruidong Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-8868:
--
Description: 
for stream sql:
support SQL like:  SELECT * FROM TABLE(tf("a"))
for batch sql:
a udtf might produce infinite records; this needs to be discussed

  was:
for stream sql:
support SQL like:  SELECT * FROM Lateral TABLE(tf("a"))
for batch sql:
a udtf might produce infinite records; this needs to be discussed


> Support Table Function as Table for Stream Sql
> --
>
> Key: FLINK-8868
> URL: https://issues.apache.org/jira/browse/FLINK-8868
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>  Labels: pull-request-available
>
> for stream sql:
> support SQL like:  SELECT * FROM TABLE(tf("a"))
> for batch sql:
> a udtf might produce infinite records; this needs to be discussed





[jira] [Comment Edited] (FLINK-9716) Support scans from table version function

2018-08-19 Thread Ruidong Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16585380#comment-16585380
 ] 

Ruidong Li edited comment on FLINK-9716 at 8/20/18 3:25 AM:


https://issues.apache.org/jira/browse/FLINK-8868
does the Table Function scan work?


was (Author: ruidongli):
https://issues.apache.org/jira/browse/FLINK-8868
does the Table Function scan suits?

> Support scans from table version function
> -
>
> Key: FLINK-9716
> URL: https://issues.apache.org/jira/browse/FLINK-9716
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Piotr Nowojski
>Assignee: Ruidong Li
>Priority: Major
>
> Given a TVF of {{Rates}}, this should work:
>  
> {code:java}
> SELECT * FROM Rates(2016-06-27 10:10:42.123)
> {code}





[jira] [Commented] (FLINK-9716) Support scans from table version function

2018-08-19 Thread Ruidong Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16585380#comment-16585380
 ] 

Ruidong Li commented on FLINK-9716:
---

https://issues.apache.org/jira/browse/FLINK-8868
does the Table Function scan suit?

> Support scans from table version function
> -
>
> Key: FLINK-9716
> URL: https://issues.apache.org/jira/browse/FLINK-9716
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Piotr Nowojski
>Assignee: Ruidong Li
>Priority: Major
>
> Given a TVF of {{Rates}}, this should work:
>  
> {code:java}
> SELECT * FROM Rates(2016-06-27 10:10:42.123)
> {code}





[jira] [Assigned] (FLINK-9716) Support scans from table version function

2018-08-19 Thread Ruidong Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li reassigned FLINK-9716:
-

Assignee: Ruidong Li

> Support scans from table version function
> -
>
> Key: FLINK-9716
> URL: https://issues.apache.org/jira/browse/FLINK-9716
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Piotr Nowojski
>Assignee: Ruidong Li
>Priority: Major
>
> Given a TVF of {{Rates}}, this should work:
>  
> {code:java}
> SELECT * FROM Rates(2016-06-27 10:10:42.123)
> {code}





[jira] [Updated] (FLINK-8868) Support Table Function as Table for Stream Sql

2018-08-17 Thread Ruidong Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-8868:
--
Description: 
for stream sql:
support SQL like:  SELECT * FROM Lateral TABLE(tf("a"))
for batch sql:
a udtf might produce infinite records; this needs to be discussed

  was:support SQL like:  SELECT * FROM TABLE(tf("a"))


> Support Table Function as Table for Stream Sql
> --
>
> Key: FLINK-8868
> URL: https://issues.apache.org/jira/browse/FLINK-8868
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>  Labels: pull-request-available
>
> for stream sql:
> support SQL like:  SELECT * FROM Lateral TABLE(tf("a"))
> for batch sql:
> a udtf might produce infinite records; this needs to be discussed





[jira] [Updated] (FLINK-8868) Support Table Function as Table for Stream Sql

2018-08-16 Thread Ruidong Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-8868:
--
Summary: Support Table Function as Table for Stream Sql  (was: Support 
Table Function as Table for stream sql)

> Support Table Function as Table for Stream Sql
> --
>
> Key: FLINK-8868
> URL: https://issues.apache.org/jira/browse/FLINK-8868
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>
> support SQL like:  SELECT * FROM TABLE(tf("a"))





[jira] [Updated] (FLINK-8868) Support Table Function as Table for stream sql

2018-08-16 Thread Ruidong Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-8868:
--
Summary: Support Table Function as Table for stream sql  (was: Support 
Table Function as Table)

> Support Table Function as Table for stream sql
> --
>
> Key: FLINK-8868
> URL: https://issues.apache.org/jira/browse/FLINK-8868
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>
> support SQL like:  SELECT * FROM TABLE(tf("a"))





[jira] [Comment Edited] (FLINK-9433) SystemProcessingTimeService does not work properly

2018-08-09 Thread Ruidong Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16574704#comment-16574704
 ] 

Ruidong Li edited comment on FLINK-9433 at 8/9/18 12:07 PM:


How about adding a clearing Thread with a DelayQueue for AsyncWaitOperator? 
[~StephanEwen]
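
For illustration, a minimal sketch of that suggestion (all names here are 
assumptions, not Flink code): registered timeouts go into a {{DelayQueue}} and 
a dedicated thread fires them as they expire, independently of the 
single-threaded {{SystemProcessingTimeService}}.

{code:java}
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of the DelayQueue-based cleanup thread idea.
public class AsyncTimeoutCleaner {

    static final class TimeoutEntry implements Delayed {
        private final long deadlineNanos;
        private final Runnable callback;

        TimeoutEntry(long timeoutMillis, Runnable callback) {
            this.deadlineNanos = System.nanoTime()
                + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
            this.callback = callback;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(
                getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    private final DelayQueue<TimeoutEntry> queue = new DelayQueue<>();

    public void registerTimeout(long timeoutMillis, Runnable callback) {
        queue.put(new TimeoutEntry(timeoutMillis, callback));
    }

    public void start() {
        Thread cleaner = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    queue.take().callback.run(); // blocks until next expiry
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "async-wait-timeout-cleaner");
        cleaner.setDaemon(true);
        cleaner.start();
    }
}
{code}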


was (Author: ruidongli):
How about adding a clear thread with a Java DelayQueue for AsyncWaitOperator? 
[~StephanEwen]

> SystemProcessingTimeService does not work properly
> --
>
> Key: FLINK-9433
> URL: https://issues.apache.org/jira/browse/FLINK-9433
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Critical
> Attachments: log.txt
>
>
> If (WindowOperator --> AsyncWaitOperator) are chained together, then when the 
> queue of the AsyncWaitOperator is full and the timeTrigger of the 
> WindowOperator fires and calls collect(), it will wait until the queue of the 
> AsyncWaitOperator is no longer full; meanwhile, the timeTrigger of the 
> AsyncWaitOperator will not fire, because the SystemProcessingTimeService has 
> a capacity of only one.





[jira] [Commented] (FLINK-9433) SystemProcessingTimeService does not work properly

2018-08-09 Thread Ruidong Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16574704#comment-16574704
 ] 

Ruidong Li commented on FLINK-9433:
---

How about adding a clear thread with a Java DelayQueue for AsyncWaitOperator? 
[~StephanEwen]

> SystemProcessingTimeService does not work properly
> --
>
> Key: FLINK-9433
> URL: https://issues.apache.org/jira/browse/FLINK-9433
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Critical
> Attachments: log.txt
>
>
> If (WindowOperator --> AsyncWaitOperator) are chained together, then when the 
> queue of the AsyncWaitOperator is full and the timeTrigger of the 
> WindowOperator fires and calls collect(), it will wait until the queue of the 
> AsyncWaitOperator is no longer full; meanwhile, the timeTrigger of the 
> AsyncWaitOperator will not fire, because the SystemProcessingTimeService has 
> a capacity of only one.





[jira] [Assigned] (FLINK-10108) DATE_FORMAT function in sql test throws a NumberFormatException

2018-08-09 Thread Ruidong Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li reassigned FLINK-10108:
--

Assignee: Ruidong Li

> DATE_FORMAT function in sql test throws a NumberFormatException
> ---
>
> Key: FLINK-10108
> URL: https://issues.apache.org/jira/browse/FLINK-10108
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Reporter: Xingcan Cui
>Assignee: Ruidong Li
>Priority: Minor
>
> {{testSqlApi("DATE_FORMAT(TIMESTAMP '1991-01-02 03:04:06', '%m/%d/%Y')", 
> "01/02/1991")}} will throw a {{NumberFormatException}}, whereas the function 
> works fine in {{testAllApis()}}.





[jira] [Commented] (FLINK-9997) Improve Expression Reduce

2018-08-07 Thread Ruidong Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16571818#comment-16571818
 ] 

Ruidong Li commented on FLINK-9997:
---

Adding the Calc normalization to the logical optimization rules causes a 
long-running optimization; I'm trying to resolve it.

> Improve Expression Reduce
> -
>
> Key: FLINK-9997
> URL: https://issues.apache.org/jira/browse/FLINK-9997
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>  Labels: pull-request-available
>
> There's no Calc node in the normalized plans, so ReduceExpressionRule.CALC 
> does not match anything.
> If we add calc-related rules to the normalize phase, then project-related 
> rules and filter-related rules do not match anything at the logical 
> optimization phase. If we add ReduceExpressionRule.CALC to the logical 
> optimization phase, it will increase the search time. Therefore, adding a new 
> phase after logical optimization may be an option.





[jira] [Updated] (FLINK-9997) Improve Expression Reduce

2018-08-03 Thread Ruidong Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-9997:
--
Description: 
There's no Calc node at normalize plans, so ReduceExpressionRule.CALC does not 
match any thing.
If adding calc-related rules to normalize phase, then project-related rules and 
filter-related rules do not match any thing at logical opt phase. If adding 
ReduceExpressionRule.CALC to logical opt phase, it will increase the the search 
time. Therefore, adding a new phase after logical opt may be an option.

  was:ExpressionReduce does not reduce some expressions.


> Improve Expression Reduce
> -
>
> Key: FLINK-9997
> URL: https://issues.apache.org/jira/browse/FLINK-9997
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>  Labels: pull-request-available
>
> There's no Calc node in the normalized plans, so ReduceExpressionRule.CALC 
> does not match anything.
> If we add calc-related rules to the normalize phase, then project-related 
> rules and filter-related rules do not match anything at the logical 
> optimization phase. If we add ReduceExpressionRule.CALC to the logical 
> optimization phase, it will increase the search time. Therefore, adding a new 
> phase after logical optimization may be an option.





[jira] [Updated] (FLINK-9963) Add a single value table format factory

2018-08-01 Thread Ruidong Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-9963:
--
Description: 
Sometimes it might be useful to just read or write a single value into Kafka or 
other connectors. We should add a single-value {{SerializationSchemaFactory}} 
and a single-value {{DeserializationSchemaFactory}}; the types below and their 
array types shall be considered.

{{byte, short, int, long, float, double, string}}

For the numeric types, we might want to specify the endian format.
A {{string}} type single-value format will be added with this issue for future 
reference.
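
For the {{string}} case, a minimal sketch of such a single-value schema 
(illustrative; the factory wiring through {{SerializationSchemaFactory}} and 
{{DeserializationSchemaFactory}} is omitted, and for the numeric types one 
would additionally fix a byte order):

{code:java}
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

// Illustrative sketch: each record is exactly one value, here a UTF-8 string.
public class SingleValueStringSchema
        implements SerializationSchema<String>, DeserializationSchema<String> {

    @Override
    public byte[] serialize(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(byte[] message) throws IOException {
        return new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false; // a single-value stream has no in-band end marker
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}
{code}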

  was:
Sometimes it might be useful to just read or write a single value into Kafka or 
other connectors. We should add a single-value {{SerializationSchemaFactory}} 
and a single-value {{DeserializationSchemaFactory}}; the types below and their 
array types shall be considered.

{{byte, short, int, long, float, double, string}}

For the numeric types, we might want to specify the endian format.


> Add a single value table format factory
> ---
>
> Key: FLINK-9963
> URL: https://issues.apache.org/jira/browse/FLINK-9963
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Ruidong Li
>Priority: Major
>  Labels: pull-request-available
>
> Sometimes it might be useful to just read or write a single value into Kafka 
> or other connectors. We should add a single-value 
> {{SerializationSchemaFactory}} and a single-value 
> {{DeserializationSchemaFactory}}; the types below and their array types shall 
> be considered.
> {{byte, short, int, long, float, double, string}}
> For the numeric types, we might want to specify the endian format.
> A {{string}} type single-value format will be added with this issue for 
> future reference.





[jira] [Updated] (FLINK-9963) Add a single value table format factory

2018-08-01 Thread Ruidong Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-9963:
--
Description: 
Sometimes it might be useful to just read or write a single value into Kafka or 
other connectors. We should add a single-value {{SerializationSchemaFactory}} 
and a single-value {{DeserializationSchemaFactory}}; the types below and their 
array types shall be considered.

{{byte, short, int, long, float, double, string}}

For the numeric types, we might want to specify the endian format.

  was:
Sometimes it might be useful to just read or write a string into Kafka or other 
connectors. We should add a simple string {{SerializationSchemaFactory}} and 
{{DeserializationSchemaFactory}}. How we want to represent all data types and 
nested types is still up for discussion. We could also support just a single 
string field?

Schema derivation should be supported by the factories.

See {{org.apache.flink.formats.json.JsonRowFormatFactory}} for a reference.


> Add a single value table format factory
> ---
>
> Key: FLINK-9963
> URL: https://issues.apache.org/jira/browse/FLINK-9963
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Ruidong Li
>Priority: Major
>  Labels: pull-request-available
>
> Sometimes it might be useful to just read or write a single value into Kafka 
> or other connectors. We should add a single-value 
> {{SerializationSchemaFactory}} and a single-value 
> {{DeserializationSchemaFactory}}; the types below and their array types shall 
> be considered.
> {{byte, short, int, long, float, double, string}}
> For the numeric types, we might want to specify the endian format.





[jira] [Updated] (FLINK-9963) Add a single value table format factory

2018-08-01 Thread Ruidong Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-9963:
--
Summary: Add a single value table format factory  (was: Add a string table 
format factory)

> Add a single value table format factory
> ---
>
> Key: FLINK-9963
> URL: https://issues.apache.org/jira/browse/FLINK-9963
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Ruidong Li
>Priority: Major
>  Labels: pull-request-available
>
> Sometimes it might be useful to just read or write a string into Kafka or 
> other connectors. We should add a simple string 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> We could also support just a single string field?
> Schema derivation should be supported by the factories.
> See {{org.apache.flink.formats.json.JsonRowFormatFactory}} for a reference.





[jira] [Assigned] (FLINK-10008) Improve the LOG function in Table to support bases less than 1

2018-07-31 Thread Ruidong Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li reassigned FLINK-10008:
--

Assignee: Ruidong Li

> Improve the LOG function in Table to support bases less than 1
> --
>
> Key: FLINK-10008
> URL: https://issues.apache.org/jira/browse/FLINK-10008
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Xingcan Cui
>Assignee: Ruidong Li
>Priority: Major
>
> Currently, the {{LOG}} function in SQL/Table API restricts the base to be 
> greater than 1. We should extend it to support bases less than 1 (e.g., 
> {{LOG(0.1, 0.01)}} should return 2.0).
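
For reference, the change-of-base formula already covers bases in (0, 1): for 
any base b > 0 with b != 1 and x > 0, log_b(x) = ln(x) / ln(b). A standalone 
check of the example from the issue (illustrative snippet, not Flink code):

{code:java}
// Change-of-base: log_b(x) = ln(x) / ln(b), valid for b > 0, b != 1, x > 0,
// which includes bases less than 1.
public class LogBaseCheck {

    static double log(double base, double x) {
        if (base <= 0 || base == 1 || x <= 0) {
            throw new IllegalArgumentException("base must be > 0 and != 1, x must be > 0");
        }
        return Math.log(x) / Math.log(base);
    }

    public static void main(String[] args) {
        System.out.println(log(0.1, 0.01)); // prints 2.0, since 0.1^2 = 0.01
    }
}
{code}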





[jira] [Assigned] (FLINK-10009) Fix the casting problem for function TIMESTAMPADD in Table

2018-07-31 Thread Ruidong Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li reassigned FLINK-10009:
--

Assignee: Ruidong Li

> Fix the casting problem for function TIMESTAMPADD in Table
> --
>
> Key: FLINK-10009
> URL: https://issues.apache.org/jira/browse/FLINK-10009
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Reporter: Xingcan Cui
>Assignee: Ruidong Li
>Priority: Major
>
> There seems to be a bug in {{TIMESTAMPADD}} function. For example, 
> {{TIMESTAMPADD(MINUTE, 1, DATE '2016-06-15')}} throws a 
> {{ClassCastException}} ( java.lang.Integer cannot be cast to java.lang.Long). 
> Actually, it tries to cast an integer date to a long timestamp in 
> RexBuilder.java:1524 - {{return TimestampString.fromMillisSinceEpoch((Long) 
> o)}}.





[jira] [Commented] (FLINK-9905) ClassCastException: org.apache.flink.streaming.runtime.streamrecord.LatencyMarker

2018-07-31 Thread Ruidong Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16563218#comment-16563218
 ] 

Ruidong Li commented on FLINK-9905:
---

Hi [~gfee-lyft], can you share the program of your job?

> ClassCastException: 
> org.apache.flink.streaming.runtime.streamrecord.LatencyMarker
> -
>
> Key: FLINK-9905
> URL: https://issues.apache.org/jira/browse/FLINK-9905
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.4.2
>Reporter: Gregory Fee
>Priority: Minor
>
> The example program feeds from a couple of data sources into a SQL 
> transformation which then sinks out via a GRPC call. I'm not sure what 
> context is relevant but I can provide additional context as necessary. The 
> stack trace below is what is reported in the Flink UI as the exception.
>  
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
> at 
> com.lyft.streamingplatform.BetterBuffer$OutputThread.run(BetterBuffer.java:316)
> Caused by: 
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
> at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at 
> com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:67)
> at 
> com.lyft.dsp.functions.Labeler$UnlabelerFunction.processElement(Labeler.java:48)
> at 
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
> ... 5 more
> Caused by: 
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
> at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at 
> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
> at 
> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
> at DataStreamSourceConversion$14.processElement(Unknown Source)
> at 
> org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:67)
> at 
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
> ... 14 more
> Caused by: java.lang.RuntimeException: 
> org.apache.flink.streaming.runtime.streamrecord.LatencyMarker cannot be cast 
> to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
> at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)
> at 
> 

[jira] [Updated] (FLINK-9997) Improve Expression Reduce

2018-07-30 Thread Ruidong Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-9997:
--
Summary: Improve Expression Reduce  (was: Improve ExpressionReduce)

> Improve Expression Reduce
> -
>
> Key: FLINK-9997
> URL: https://issues.apache.org/jira/browse/FLINK-9997
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>
> ExpressionReduce does not reduce some expressions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9997) Improve ExpressionReduce

2018-07-30 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-9997:
-

 Summary: Improve ExpressionReduce
 Key: FLINK-9997
 URL: https://issues.apache.org/jira/browse/FLINK-9997
 Project: Flink
  Issue Type: Improvement
  Components: Table API  SQL
Reporter: Ruidong Li
Assignee: Ruidong Li


ExpressionReduce does not reduce some expressions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9984) Add a byte array table format factory

2018-07-28 Thread Ruidong Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li reassigned FLINK-9984:
-

Assignee: Ruidong Li

> Add a byte array table format factory
> -
>
> Key: FLINK-9984
> URL: https://issues.apache.org/jira/browse/FLINK-9984
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Ruidong Li
>Priority: Major
>
> Sometimes it might be useful to just read or write a plain byte array into 
> Kafka or other connectors. We should add a simple byte array 
> SerializationSchemaFactory and DeserializationSchemaFactory.
> See {{org.apache.flink.formats.json.JsonRowFormatFactory}} for a reference.
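
A minimal sketch of the schema such a factory would produce (the class name is 
illustrative; the factory wiring itself is not shown):

{code}
import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, TypeInformation}

// Hypothetical identity schema for raw byte[] payloads.
class ByteArrayPassThroughSchema
    extends SerializationSchema[Array[Byte]]
    with DeserializationSchema[Array[Byte]] {

  override def serialize(element: Array[Byte]): Array[Byte] = element
  override def deserialize(message: Array[Byte]): Array[Byte] = message

  // Unbounded connector input: never signal end of stream.
  override def isEndOfStream(nextElement: Array[Byte]): Boolean = false

  override def getProducedType: TypeInformation[Array[Byte]] =
    PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
}
{code}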



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9965) Support schema derivation in Avro table format factory

2018-07-26 Thread Ruidong Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li reassigned FLINK-9965:
-

Assignee: Ruidong Li

> Support schema derivation in Avro table format factory
> --
>
> Key: FLINK-9965
> URL: https://issues.apache.org/jira/browse/FLINK-9965
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Ruidong Li
>Priority: Major
>
> Currently, only the {{org.apache.flink.formats.json.JsonRowFormatFactory}} is 
> able to use the information provided by the table schema to derive the JSON 
> format schema. Avro should support this as well in order to avoid specifying 
> a schema twice. This requires the inverse operation that 
> {{org.apache.flink.formats.avro.typeutils.AvroSchemaConverter}} does. Instead 
> of avroschema-to-typeinfo we need a typeinfo-to-avroschema converter.
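
A hedged sketch of the inverse direction for a few flat field types (object 
and method names are made up; a real converter must also cover nested rows, 
arrays, and nullability):

{code}
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo

object TypeInfoToAvroSketch {
  def convert(tpe: TypeInformation[_]): Schema = tpe match {
    case BasicTypeInfo.STRING_TYPE_INFO  => Schema.create(Schema.Type.STRING)
    case BasicTypeInfo.INT_TYPE_INFO     => Schema.create(Schema.Type.INT)
    case BasicTypeInfo.LONG_TYPE_INFO    => Schema.create(Schema.Type.LONG)
    case BasicTypeInfo.DOUBLE_TYPE_INFO  => Schema.create(Schema.Type.DOUBLE)
    case BasicTypeInfo.BOOLEAN_TYPE_INFO => Schema.create(Schema.Type.BOOLEAN)
    case other => throw new UnsupportedOperationException(s"Type not covered: $other")
  }

  // Map a flat RowTypeInfo to an Avro record schema, field by field.
  def convertRow(rowType: RowTypeInfo): Schema = {
    var fields = SchemaBuilder.record("Row").fields()
    for (i <- 0 until rowType.getArity) {
      fields = fields.name(rowType.getFieldNames()(i))
        .`type`(convert(rowType.getTypeAt(i))).noDefault()
    }
    fields.endRecord()
  }
}
{code}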



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9963) Add a string table format factory

2018-07-26 Thread Ruidong Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li reassigned FLINK-9963:
-

Assignee: Ruidong Li

> Add a string table format factory
> -
>
> Key: FLINK-9963
> URL: https://issues.apache.org/jira/browse/FLINK-9963
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Ruidong Li
>Priority: Major
>
> Sometimes it might be useful to just read or write a string into Kafka or 
> other connectors. We should add a simple string 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> We could also support just a single string field?
> Schema derivation should be supported by the factories.
> See {{org.apache.flink.formats.json.JsonRowFormatFactory}} for a reference.
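
For the single-field case, a sketch in the spirit of the existing 
SimpleStringSchema (UTF-8 and a single string column are assumptions; the 
multi-field representation is the open question above):

{code}
import java.nio.charset.StandardCharsets
import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}

// Hypothetical single-column string schema.
class Utf8StringSchema
    extends SerializationSchema[String]
    with DeserializationSchema[String] {

  override def serialize(element: String): Array[Byte] =
    element.getBytes(StandardCharsets.UTF_8)

  override def deserialize(message: Array[Byte]): String =
    new String(message, StandardCharsets.UTF_8)

  override def isEndOfStream(nextElement: String): Boolean = false

  override def getProducedType: TypeInformation[String] =
    BasicTypeInfo.STRING_TYPE_INFO
}
{code}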



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9958) Fix potential NPE for delta iteration of DataSet

2018-07-25 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-9958:
-

 Summary: Fix potential NPE for delta iteration of DataSet
 Key: FLINK-9958
 URL: https://issues.apache.org/jira/browse/FLINK-9958
 Project: Flink
  Issue Type: Bug
  Components: DataSet API
Reporter: Ruidong Li
Assignee: Ruidong Li






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9422) Dedicated operator for UNION on streaming tables with time attributes

2018-05-29 Thread Ruidong Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493560#comment-16493560
 ] 

Ruidong Li commented on FLINK-9422:
---

[~fhueske], I'm working on the issue and have some questions about the first 
implementation.
1. How does a user specify the implementation? It may work with rowtime, but 
the user may want to materialize all distinct values.
2. What if the user wants to hold the data for a custom duration, possibly 
longer than two watermarks?

> Dedicated operator for UNION on streaming tables with time attributes
> -
>
> Key: FLINK-9422
> URL: https://issues.apache.org/jira/browse/FLINK-9422
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Fabian Hueske
>Assignee: Ruidong Li
>Priority: Minor
>
> We can implement a dedicated operator for {{UNION}} on tables with time 
> attributes. Currently, {{UNION}} is translated into a {{UNION ALL}} and a 
> subsequent {{GROUP BY}} on all attributes without aggregation functions. 
> The state of the grouping operator is only cleaned up using state retention 
> timers. 
> The dedicated operator would leverage the monotonicity property of the time 
> attribute and watermarks to automatically clean up its state.
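
The current translation, spelled out in Table API terms (a sketch; the table 
contents are made up):

{code}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val t1 = tEnv.fromDataStream(env.fromElements((1, "a")), 'id, 'name)
val t2 = tEnv.fromDataStream(env.fromElements((2, "b")), 'id, 'name)

// t1.union(t2) is currently planned roughly like this: a UNION ALL plus a
// GROUP BY over all attributes, whose state only expires via retention timers.
val deduplicated = t1
  .unionAll(t2)
  .groupBy('id, 'name)
  .select('id, 'name)
{code}

The proposed operator would instead drop a stored row's state as soon as the 
watermark passes its time attribute.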



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9344) Support INTERSECT and INTERSECT ALL for streaming

2018-05-25 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-9344:
--
Description: Support non-window INTERSECT and INTERSECT ALL for both SQL and 
the Table API  (was: support intersect and intersect all for both SQL 
and TableAPI)

> Support INTERSECT and INTERSECT ALL for streaming
> -
>
> Key: FLINK-9344
> URL: https://issues.apache.org/jira/browse/FLINK-9344
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>
> Support non-window INTERSECT and INTERSECT ALL for both SQL and the Table 
> API.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9433) SystemProcessingTimeService does not work properly

2018-05-25 Thread Ruidong Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16490484#comment-16490484
 ] 

Ruidong Li commented on FLINK-9433:
---

One thread per task will not work; I think it would be better if each operator 
had its own time service.

> SystemProcessingTimeService does not work properly
> --
>
> Key: FLINK-9433
> URL: https://issues.apache.org/jira/browse/FLINK-9433
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Critical
> Attachments: log.txt
>
>
> If a WindowOperator is chained to an AsyncWaitOperator, and the queue of the 
> AsyncWaitOperator is full when the WindowOperator's time trigger fires and 
> calls collect(), the trigger thread blocks until the queue has room again. At 
> that moment the AsyncWaitOperator's own time trigger can no longer fire, 
> because the SystemProcessingTimeService has a capacity of only one.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9433) SystemProcessingTimeService does not work properly

2018-05-24 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-9433:
-

 Summary: SystemProcessingTimeService does not work properly
 Key: FLINK-9433
 URL: https://issues.apache.org/jira/browse/FLINK-9433
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Reporter: Ruidong Li
Assignee: Ruidong Li


If a WindowOperator is chained to an AsyncWaitOperator, and the queue of the 
AsyncWaitOperator is full when the WindowOperator's time trigger fires and 
calls collect(), the trigger thread blocks until the queue has room again. At 
that moment the AsyncWaitOperator's own time trigger can no longer fire, 
because the SystemProcessingTimeService has a capacity of only one.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9420) Add tests for SQL IN sub-query operator in streaming

2018-05-23 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li reassigned FLINK-9420:
-

Assignee: Ruidong Li

> Add tests for SQL IN sub-query operator in streaming
> 
>
> Key: FLINK-9420
> URL: https://issues.apache.org/jira/browse/FLINK-9420
> Project: Flink
>  Issue Type: Test
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Ruidong Li
>Priority: Major
>
> In FLINK-6094 we implemented non-windowed inner joins. The Table API & SQL 
> should now support the {{IN}} operator for sub-queries in streaming. Batch 
> support was added in FLINK-4565. We need to add unit tests, an IT case, and 
> update the docs accordingly.
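
A hedged sketch of the kind of IT case this asks for (table names and data are 
made up):

{code}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

tEnv.registerTable("Orders",
  tEnv.fromDataStream(env.fromElements((1, "beer"), (2, "diaper")), 'id, 'product))
tEnv.registerTable("Promotions",
  tEnv.fromDataStream(env.fromElements(Tuple1("beer")), 'product))

// The planner rewrites IN as a non-windowed join against the sub-query.
val result = tEnv.sqlQuery(
  "SELECT id FROM Orders WHERE product IN (SELECT product FROM Promotions)")
result.toRetractStream[Row].print()
env.execute()
{code}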



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9422) Dedicated operator for UNION on streaming tables with time attributes

2018-05-23 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9422?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li reassigned FLINK-9422:
-

Assignee: Ruidong Li

> Dedicated operator for UNION on streaming tables with time attributes
> -
>
> Key: FLINK-9422
> URL: https://issues.apache.org/jira/browse/FLINK-9422
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Fabian Hueske
>Assignee: Ruidong Li
>Priority: Minor
>
> We can implement a dedicated operator for {{UNION}} on tables with time 
> attributes. Currently, {{UNION}} is translated into a {{UNION ALL}} and a 
> subsequent {{GROUP BY}} on all attributes without aggregation functions. 
> The state of the grouping operator is only cleaned up using state retention 
> timers. 
> The dedicated operator would leverage the monotonicity property of the time 
> attribute and watermarks to automatically clean up its state.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-7814) Add BETWEEN and NOT BETWEEN expression to Table API

2018-05-16 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-7814:
--
Summary: Add BETWEEN and NOT BETWEEN expression to Table API  (was: Add 
BETWEEN expression to Table API )

> Add BETWEEN and NOT BETWEEN expression to Table API
> ---
>
> Key: FLINK-7814
> URL: https://issues.apache.org/jira/browse/FLINK-7814
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Ruidong Li
>Priority: Minor
>
> * The Table API does not have a BETWEEN expression. BETWEEN is quite handy 
> when defining join predicates for window joins.
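
A sketch of the target syntax (the method names between/notBetween are the 
natural candidates; the final names were settled in the implementation):

{code}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val t = tEnv.fromDataStream(env.fromElements((1, "a"), (5, "b"), (12, "c")), 'a, 'b)

val inRange  = t.filter('a.between(1, 10))     // sugar for 'a >= 1 && 'a <= 10
val outRange = t.filter('a.notBetween(1, 10))  // sugar for 'a < 1 || 'a > 10
{code}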



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-7814) Add BETWEEN expression to Table API

2018-05-15 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li reassigned FLINK-7814:
-

Assignee: Ruidong Li

> Add BETWEEN expression to Table API 
> 
>
> Key: FLINK-7814
> URL: https://issues.apache.org/jira/browse/FLINK-7814
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Ruidong Li
>Priority: Minor
>
> * The Table API does not have a BETWEEN expression. BETWEEN is quite handy 
> when defining join predicates for window joins.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9346) Update documents for supporting intersect/except in streaming

2018-05-12 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-9346:
-

 Summary: Update documents for supporting intersect/except in 
streaming
 Key: FLINK-9346
 URL: https://issues.apache.org/jira/browse/FLINK-9346
 Project: Flink
  Issue Type: Improvement
  Components: Table API  SQL
Reporter: Ruidong Li
Assignee: Ruidong Li


Update documents for supporting intersect/except in streaming



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9345) Support EXCEPT and EXCEPT ALL for streaming

2018-05-12 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-9345:
-

 Summary: Support EXCEPT and EXCEPT ALL for streaming
 Key: FLINK-9345
 URL: https://issues.apache.org/jira/browse/FLINK-9345
 Project: Flink
  Issue Type: Improvement
  Components: Table API  SQL
Reporter: Ruidong Li
Assignee: Ruidong Li


Support EXCEPT and EXCEPT ALL for SQL, and minus / minusAll for the Table API.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9344) Support INTERSECT and INTERSECT ALL for streaming

2018-05-12 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-9344:
-

 Summary: Support INTERSECT and INTERSECT ALL for streaming
 Key: FLINK-9344
 URL: https://issues.apache.org/jira/browse/FLINK-9344
 Project: Flink
  Issue Type: Improvement
  Components: Table API  SQL
Reporter: Ruidong Li
Assignee: Ruidong Li


Support INTERSECT and INTERSECT ALL for both SQL and the Table API.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9332) Fix Codegen error of CallGenerator

2018-05-10 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-9332:
-

 Summary: Fix Codegen error of CallGenerator  
 Key: FLINK-9332
 URL: https://issues.apache.org/jira/browse/FLINK-9332
 Project: Flink
  Issue Type: Bug
  Components: Table API  SQL
Reporter: Ruidong Li
Assignee: Ruidong Li


A function call may return null, but {{nullTerm}} does not handle this case 
correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9216) Fix comparator violation

2018-04-19 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-9216:
-

 Summary: Fix comparator violation
 Key: FLINK-9216
 URL: https://issues.apache.org/jira/browse/FLINK-9216
 Project: Flink
  Issue Type: Bug
  Components: Streaming
 Environment: {{JSONGenerator}} uses an improper {{Comparator}} for 
sorting operator IDs, which might cause 
{{java.lang.IllegalArgumentException: Comparison method violates its general 
contract!}}
Reporter: Ruidong Li
Assignee: Ruidong Li
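
The ticket does not show the faulty logic; as an illustration, the classic way 
a comparator violates the contract, and the safe form:

{code}
import java.util.Comparator

// Illustrative only: a subtraction-based compare overflows for operands far
// apart (e.g. Integer.MIN_VALUE and 1), flipping the sign and breaking
// transitivity -- exactly what makes TimSort throw the exception above.
val broken: Comparator[Integer] = new Comparator[Integer] {
  override def compare(a: Integer, b: Integer): Int = a - b  // can overflow
}

// Contract-safe variant: Integer.compare never overflows.
val safe: Comparator[Integer] = new Comparator[Integer] {
  override def compare(a: Integer, b: Integer): Int = Integer.compare(a, b)
}
{code}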






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-8921) Split code generated call expression

2018-03-12 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li reassigned FLINK-8921:
-

Assignee: Ruidong Li

> Split code generated call expression 
> -
>
> Key: FLINK-8921
> URL: https://issues.apache.org/jira/browse/FLINK-8921
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Ruidong Li
>Priority: Major
>
> In FLINK-8274 we introduced the possibility of splitting the generated code 
> into multiple methods in order to stay below the JVM's maximum method size 
> (see also 
> https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-4.html#jvms-4.9).
> At the moment we only split methods by fields; however, this is not enough in 
> all cases. We should also split expressions. I suggest splitting the operands 
> of a {{RexCall}} in 
> {{org.apache.flink.table.codegen.CodeGenerator#visitCall}} once we reach a 
> certain threshold. However, this should happen as lazily as possible to keep 
> the runtime overhead low.
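
A conceptual sketch of the splitting idea in plain Scala (the actual output is 
generated Java; the names and the threshold are illustrative):

{code}
// When an expression tree gets too large, hoist each operand into its own
// method so that no single method exceeds the JVM's 64KB bytecode limit.
object SplitSketch {
  type Ctx = Array[Int] // stand-in for the input row / context

  // Hoisted operand evaluators (in real codegen these bodies are huge).
  def operand0(ctx: Ctx): Int = ctx(0) * 2
  def operand1(ctx: Ctx): Int = ctx(1) + 40

  // The call site of the RexCall then only composes small method calls.
  def call(ctx: Ctx): Int = math.max(operand0(ctx), operand1(ctx))

  def main(args: Array[String]): Unit =
    println(call(Array(1, 2))) // prints 42
}
{code}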



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8868) Support Table Function as Table

2018-03-05 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-8868:
-

 Summary: Support Table Function as Table
 Key: FLINK-8868
 URL: https://issues.apache.org/jira/browse/FLINK-8868
 Project: Flink
  Issue Type: Improvement
  Components: Table API  SQL
Reporter: Ruidong Li
Assignee: Ruidong Li


Support SQL like: SELECT * FROM TABLE(tf('a'))
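
A sketch of a function the proposed syntax would query directly (the 
registration name and function body are illustrative):

{code}
import org.apache.flink.table.functions.TableFunction

// Emits one row per comma-separated token.
class Split extends TableFunction[String] {
  def eval(s: String): Unit = s.split(",").foreach(collect)
}

// Intended usage once the ticket lands:
//   tEnv.registerFunction("tf", new Split)
//   tEnv.sql("SELECT * FROM TABLE(tf('a,b,c'))")
{code}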



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8821) Fix non-terminating decimal error

2018-03-01 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-8821:
--
Summary: Fix non-terminating decimal error  (was: Fix BigDecimal divide in 
AvgAggFunction)

> Fix non-terminating decimal error
> -
>
> Key: FLINK-8821
> URL: https://issues.apache.org/jira/browse/FLINK-8821
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>
> The DecimalAvgAggFunction lacks precision protection



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8821) Fix BigDecimal divide in AvgAggFunction

2018-03-01 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-8821:
-

 Summary: Fix BigDecimal divide in AvgAggFunction
 Key: FLINK-8821
 URL: https://issues.apache.org/jira/browse/FLINK-8821
 Project: Flink
  Issue Type: Bug
  Components: Table API  SQL
Reporter: Ruidong Li
Assignee: Ruidong Li


The DecimalAvgAggFunction lacks precision protection
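
The failure mode in plain BigDecimal terms (a minimal sketch; whether the fix 
uses a MathContext or an explicit scale is not stated here):

{code}
import java.math.{BigDecimal, MathContext, RoundingMode}

val one   = new BigDecimal(1)
val three = new BigDecimal(3)

// 1 / 3 has a non-terminating decimal expansion, so divide() without a
// rounding rule throws ArithmeticException -- the error the avg function hit.
// val bad = one.divide(three)

// Bounding the precision (or fixing the scale) makes the division total:
val viaContext = one.divide(three, MathContext.DECIMAL128)
val viaScale   = one.divide(three, 10, RoundingMode.HALF_UP)
{code}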



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8617) Fix code generation bug while accessing Map type

2018-02-08 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-8617:
-

 Summary: Fix code generation bug while accessing Map type
 Key: FLINK-8617
 URL: https://issues.apache.org/jira/browse/FLINK-8617
 Project: Flink
  Issue Type: Bug
  Components: Table API  SQL
Reporter: Ruidong Li
Assignee: Ruidong Li


There is a code generation bug in {{ScalarOperators.generateMapGet}}, and two 
more bugs were found in {{ScalarOperators.generateIsNull}} and 
{{ScalarOperators.generateIsNotNull}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8555) Fix TableFunction varargs length exceeds 254 for SQL

2018-02-03 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-8555:
-

 Summary: Fix TableFunction varargs length exceeds 254 for SQL
 Key: FLINK-8555
 URL: https://issues.apache.org/jira/browse/FLINK-8555
 Project: Flink
  Issue Type: Improvement
  Components: Table API  SQL
Reporter: Ruidong Li
Assignee: Ruidong Li


With varargs, the Table API can already handle table function calls with more 
than 254 parameters correctly.
This issue intends to add the same support for long parameter lists in SQL.
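
For reference, the varargs pattern on the Table API side (a sketch; the 
function body is illustrative):

{code}
import org.apache.flink.table.functions.TableFunction

// Scala varargs let the Table API accept calls with more than 254 arguments,
// which a plain JVM method signature cannot; this ticket brings that to SQL.
class ConcatAll extends TableFunction[String] {
  def eval(parts: String*): Unit = collect(parts.mkString)
}
{code}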



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8312) Fix ScalarFunction varargs length exceeds 254 for SQL

2017-12-23 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-8312:
-

 Summary: Fix ScalarFunction varargs length exceeds 254 for SQL
 Key: FLINK-8312
 URL: https://issues.apache.org/jira/browse/FLINK-8312
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: Ruidong Li
Assignee: Ruidong Li


With varargs, the Table API can already handle scalar function calls with more 
than 254 parameters correctly.
This issue intends to add the same support for long parameter lists in SQL.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-8277) Optimize code generation by using local references

2017-12-22 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li reassigned FLINK-8277:
-

Assignee: Ruidong Li

> Optimize code generation by using local references
> --
>
> Key: FLINK-8277
> URL: https://issues.apache.org/jira/browse/FLINK-8277
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Ruidong Li
>
> By default, Flink uses {{org.apache.calcite.rex.RexProgram#expandLocalRef}} 
> to remove local references which reverses the effect of common subexpression 
> elimination. For a more performant execution and smaller generated code we 
> should leverage common subexpressions.
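
A plain-Scala analogy of what keeping local references buys (illustrative; the 
real subject is generated code, not hand-written code):

{code}
def expensive(x: Int): Int = { Thread.sleep(1); x * x }

// expandLocalRef effectively inlines the expression at both use sites:
def withoutCse(x: Int): Int = expensive(x) + expensive(x) // evaluated twice

// Keeping the local reference preserves common subexpression elimination:
def withCse(x: Int): Int = {
  val t = expensive(x) // evaluated once
  t + t
}
{code}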



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8301) Support Unicode in codegen for SQL && TableAPI

2017-12-21 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-8301:
--
Summary: Support Unicode in codegen for SQL && TableAPI  (was: Support 
Unicode in codegen for SQL)

> Support Unicode in codegen for SQL && TableAPI
> --
>
> Key: FLINK-8301
> URL: https://issues.apache.org/jira/browse/FLINK-8301
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> The current code generation does not support Unicode: "\u0001" will be 
> generated as "\\u0001", so a function call like concat(str, "\u0001") will 
> lead to a wrong result.
> This issue intends to handle char/varchar literals correctly; some examples 
> follow below.
> literal: '\u0001abc'  ->  codegen: "\u0001abc"
> literal: '\u0022\'    ->  codegen: "\"\\"
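
A hedged repro sketch (the table setup is made up): the filter below only 
matches if the single control character U+0001 survives code generation rather 
than being escaped into the six-character sequence \u0001.

{code}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val t = tEnv.fromDataStream(env.fromElements(Tuple1("abc\u0001")), 'str)

// Fails under the bug: the generated literal becomes "\\u0001" (six chars),
// so the comparison never matches the real control character.
val matched = t.filter('str === "abc\u0001")
{code}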



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8301) Support Unicode in codegen for SQL

2017-12-21 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-8301:
--
Summary: Support Unicode in codegen for SQL  (was: Support Unicode in 
codegen for TableAPI && SQL)

> Support Unicode in codegen for SQL
> --
>
> Key: FLINK-8301
> URL: https://issues.apache.org/jira/browse/FLINK-8301
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> The current code generation does not support Unicode: "\u0001" will be 
> generated as "\\u0001", so a function call like concat(str, "\u0001") will 
> lead to a wrong result.
> This issue intends to handle char/varchar literals correctly; some examples 
> follow below.
> literal: '\u0001abc'  ->  codegen: "\u0001abc"
> literal: '\u0022\'    ->  codegen: "\"\\"



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8301) Support Unicode in codegen for TableAPI && SQL

2017-12-21 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-8301:
--
Description: 
The current code generation does not support Unicode: "\u0001" will be 
generated as "\\u0001", so a function call like concat(str, "\u0001") will 
lead to a wrong result.

This issue intends to handle char/varchar literals correctly; some examples 
follow below.
literal: '\u0001abc'  ->  codegen: "\u0001abc"
literal: '\u0022\'    ->  codegen: "\"\\"


  was:
The current code generation do not support Unicode, "\u0001" will be generated 
to "\\u0001", function call like concat(str, "\u0001") will lead to wrong 
result.

This issue intend to handle char/varchar literal correctly, some examples 
followed as below.
literal  codegen
'\u0001abc' "\u0001abc"
'\u0022\'  "\"\\"



> Support Unicode in codegen for TableAPI && SQL
> --
>
> Key: FLINK-8301
> URL: https://issues.apache.org/jira/browse/FLINK-8301
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> The current code generation does not support Unicode: "\u0001" will be 
> generated as "\\u0001", so a function call like concat(str, "\u0001") will 
> lead to a wrong result.
> This issue intends to handle char/varchar literals correctly; some examples 
> follow below.
> literal: '\u0001abc'  ->  codegen: "\u0001abc"
> literal: '\u0022\'    ->  codegen: "\"\\"



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8301) Support Unicode in codegen for TableAPI && SQL

2017-12-21 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-8301:
-

 Summary: Support Unicode in codegen for TableAPI && SQL
 Key: FLINK-8301
 URL: https://issues.apache.org/jira/browse/FLINK-8301
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Ruidong Li
Assignee: Ruidong Li


The current code generation does not support Unicode: "\u0001" will be 
generated as "\\u0001", so a function call like concat(str, "\u0001") will 
lead to a wrong result.

This issue intends to handle char/varchar literals correctly; some examples 
follow below.
literal: '\u0001abc'  ->  codegen: "\u0001abc"
literal: '\u0022\'    ->  codegen: "\"\\"




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8274) Fix Java 64K method compiling limitation for CommonCalc

2017-12-17 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-8274:
-

 Summary: Fix Java 64K method compiling limitation for CommonCalc
 Key: FLINK-8274
 URL: https://issues.apache.org/jira/browse/FLINK-8274
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Ruidong Li
Assignee: Ruidong Li


For complex SQL queries, the generated code for {{DataStreamCalc}} and 
{{DataSetCalc}} may exceed Java's 64KB method size limit.
 
This issue will split such long methods into several sub-method calls.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8095) Introduce ProjectSetOpTransposeRule to Flink

2017-11-16 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-8095:
-

 Summary: Introduce ProjectSetOpTransposeRule to Flink
 Key: FLINK-8095
 URL: https://issues.apache.org/jira/browse/FLINK-8095
 Project: Flink
  Issue Type: Improvement
Reporter: Ruidong Li
Assignee: Ruidong Li


ProjectSetOpTransposeRule is similar to FilterSetOpTransposeRule; adding 
ProjectSetOpTransposeRule is necessary as well.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7986) Introduce FilterSetOpTransposeRule to Flink

2017-11-04 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-7986:
-

 Summary: Introduce FilterSetOpTransposeRule to Flink
 Key: FLINK-7986
 URL: https://issues.apache.org/jira/browse/FLINK-7986
 Project: Flink
  Issue Type: Improvement
Reporter: Ruidong Li
Assignee: Ruidong Li
Priority: Trivial


A.unionAll(B).where.groupBy.select
=>
A.where.unionAll(B.where).groupBy.select

This rule will reduce network IO; see the sketch below.
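
The rewrite in Table API terms (a sketch; the tables and predicate are made 
up):

{code}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val a = tEnv.fromDataStream(env.fromElements((1, "x")), 'k, 'v)
val b = tEnv.fromDataStream(env.fromElements((2, "y")), 'k, 'v)

// Before: the filter sits above the union, so every input row is shipped.
val before = a.unionAll(b).where('k > 0).groupBy('v).select('v, 'k.count)

// After the rule: the filter is pushed into both inputs, reducing network IO.
val after = a.where('k > 0).unionAll(b.where('k > 0)).groupBy('v).select('v, 'k.count)
{code}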



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7971) Fix potential NPE with inconsistent state

2017-11-03 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-7971:
--
Description: 
In {{GroupAggProcessFunction}}, the statuses of {{state}} and {{cntState}} may 
become inconsistent, which can cause an NPE when {{state}} is not null but 
{{cntState}} is null.



  was:
In {{GroupAggProcessFunction}}, the status of  {{state}} and {{cntState}} is 
not consistent, which may cause NPE when {{state}} is not null but {{cntState}} 
is null.




> Fix potential NPE with inconsistent state
> -
>
> Key: FLINK-7971
> URL: https://issues.apache.org/jira/browse/FLINK-7971
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>
> In {{GroupAggProcessFunction}}, the statuses of {{state}} and {{cntState}} 
> may become inconsistent, which can cause an NPE when {{state}} is not null 
> but {{cntState}} is null.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7971) Fix potential NPE with inconsistent state

2017-11-03 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-7971:
-

 Summary: Fix potential NPE with inconsistent state
 Key: FLINK-7971
 URL: https://issues.apache.org/jira/browse/FLINK-7971
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: Ruidong Li
Assignee: Ruidong Li
Priority: Major


In {{GroupAggProcessFunction}}, the statuses of {{state}} and {{cntState}} may 
become inconsistent, which can cause an NPE when {{state}} is not null but 
{{cntState}} is null.





--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7759) Fix Bug that fieldName with Boolean prefix can't be parsed by ExpressionParser.

2017-10-16 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-7759:
--
Summary: Fix Bug that fieldName with Boolean prefix can't be parsed by 
ExpressionParser.  (was: Fix Bug that fieldName with prefix equals "true" or 
"false" can't be parsed by ExpressionParser.)

> Fix Bug that fieldName with Boolean prefix can't be parsed by 
> ExpressionParser.
> ---
>
> Key: FLINK-7759
> URL: https://issues.apache.org/jira/browse/FLINK-7759
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> Just call {{ExpressionParser.parseExpression}} with a field name whose 
> prefix equals "true" or "false", e.g. 
> {{ExpressionParser.parseExpression("true_target")}} or 
> {{ExpressionParser.parseExpression("falsex")}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7759) Fix Bug that fieldName with prefix equals "true" or "false" can't be parsed by ExpressionParser.

2017-10-16 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-7759:
--
Summary: Fix Bug that fieldName with prefix equals "true" or "false" can't 
be parsed by ExpressionParser.  (was: Fix Bug that fieldName/functionName with 
prefix equals "true" or "false" can't be parsed by ExpressionParser.)

> Fix Bug that fieldName with prefix equals "true" or "false" can't be parsed 
> by ExpressionParser.
> 
>
> Key: FLINK-7759
> URL: https://issues.apache.org/jira/browse/FLINK-7759
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> Just call {{ExpressionParser.parseExpression}} with a field name whose 
> prefix equals "true" or "false", e.g. 
> {{ExpressionParser.parseExpression("true_target")}} or 
> {{ExpressionParser.parseExpression("falsex")}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7776) do not emit duplicated records in group aggregation

2017-10-08 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-7776:
--
Description: 
The current group aggregation in {{GroupAggProcessFunction}} compares the last 
{{Row}} with the current {{Row}} only when {{generateRetraction}} is {{true}} 
and {{firstRow}} is {{false}}; this logic should be applied to all cases where 
{{firstRow}} is false: if the current {{Row}} is the same as the previous 
{{Row}}, we do not emit any records.

  was:
the current group aggregation will compare the last {{Row}} and current {{Row}} 
when {{generateRetraction}} is {{true}} and {{firstRow}} is {{false}} in 
{{GroupAggProcessFunction}},
this logic should be applied to all cases when {{firstRow}} is false


> do not emit duplicated records in group aggregation
> ---
>
> Key: FLINK-7776
> URL: https://issues.apache.org/jira/browse/FLINK-7776
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> The current group aggregation in {{GroupAggProcessFunction}} compares the 
> last {{Row}} with the current {{Row}} only when {{generateRetraction}} is 
> {{true}} and {{firstRow}} is {{false}}; this logic should be applied to all 
> cases where {{firstRow}} is false: if the current {{Row}} is the same as the 
> previous {{Row}}, we do not emit any records.
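
The intended emission rule, as a sketch (the names mirror the ticket; this is 
not the actual GroupAggProcessFunction code):

{code}
import org.apache.flink.types.Row

// `prev` models the previously emitted row stored in state.
def shouldEmit(firstRow: Boolean, prev: Option[Row], curr: Row): Boolean =
  firstRow || !prev.contains(curr)

// Emit (and, with retraction enabled, retract the previous row) only when
// shouldEmit(...) is true; otherwise the duplicate update is dropped.
{code}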



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7776) do not emit duplicated records in group aggregation

2017-10-08 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-7776:
--
Summary: do not emit duplicated records in group aggregation  (was: remove 
duplicated records collecting in group aggregation)

> do not emit duplicated records in group aggregation
> ---
>
> Key: FLINK-7776
> URL: https://issues.apache.org/jira/browse/FLINK-7776
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> The current group aggregation in {{GroupAggProcessFunction}} compares the 
> last {{Row}} with the current {{Row}} only when {{generateRetraction}} is 
> {{true}} and {{firstRow}} is {{false}}; this logic should be applied to all 
> cases where {{firstRow}} is false.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7776) remove duplicated records collecting in group aggregation

2017-10-08 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-7776:
--
Summary: remove duplicated records collecting in group aggregation  (was: 
remove duplicate records collecting in group aggregation)

> remove duplicated records collecting in group aggregation
> -
>
> Key: FLINK-7776
> URL: https://issues.apache.org/jira/browse/FLINK-7776
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> The current group aggregation in {{GroupAggProcessFunction}} compares the 
> last {{Row}} with the current {{Row}} only when {{generateRetraction}} is 
> {{true}} and {{firstRow}} is {{false}}; this logic should be applied to all 
> cases where {{firstRow}} is false.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7776) remove duplicate records collecting in group aggregation

2017-10-08 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-7776:
--
Summary: remove duplicate records collecting in group aggregation  (was: 
remove duplicate records in group aggregation)

> remove duplicate records collecting in group aggregation
> 
>
> Key: FLINK-7776
> URL: https://issues.apache.org/jira/browse/FLINK-7776
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> The current group aggregation in {{GroupAggProcessFunction}} compares the 
> last {{Row}} with the current {{Row}} only when {{generateRetraction}} is 
> {{true}} and {{firstRow}} is {{false}}; this logic should be applied to all 
> cases where {{firstRow}} is false.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7776) remove duplicate records in group aggregation

2017-10-08 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-7776:
-

 Summary: remove duplicate records in group aggregation
 Key: FLINK-7776
 URL: https://issues.apache.org/jira/browse/FLINK-7776
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Ruidong Li
Assignee: Ruidong Li


The current group aggregation in {{GroupAggProcessFunction}} compares the last 
{{Row}} with the current {{Row}} only when {{generateRetraction}} is {{true}} 
and {{firstRow}} is {{false}}; this logic should be applied to all cases where 
{{firstRow}} is false.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7759) Fix Bug that fieldName/functionName with prefix equals "true" or "false" can't be parsed by ExpressionParser.

2017-10-04 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-7759:
-

 Summary: Fix Bug that fieldName/functionName with prefix equals 
"true" or "false" can't be parsed by ExpressionParser.
 Key: FLINK-7759
 URL: https://issues.apache.org/jira/browse/FLINK-7759
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: Ruidong Li
Assignee: Ruidong Li


Just call {{ExpressionParser.parseExpression}} with a field name whose prefix 
equals "true" or "false", e.g. 
{{ExpressionParser.parseExpression("true_target")}} or 
{{ExpressionParser.parseExpression("falsex")}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7621) Fix Inconsistency of CaseSensitive Configuration

2017-09-14 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-7621:
--
Summary: Fix Inconsistency of CaseSensitive Configuration  (was: Fix 
inconsistency of CaseSensitive Configuration)

> Fix Inconsistency of CaseSensitive Configuration
> 
>
> Key: FLINK-7621
> URL: https://issues.apache.org/jira/browse/FLINK-7621
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> The default case-sensitivity config of Calcite ({{Lex.java}}) is different 
> from that of the Table API.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7621) Fix inconsistency of CaseSensitive Configuration

2017-09-14 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-7621:
-

 Summary: Fix inconsistency of CaseSensitive Configuration
 Key: FLINK-7621
 URL: https://issues.apache.org/jira/browse/FLINK-7621
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: Ruidong Li
Assignee: Ruidong Li


The default case-sensitivity config of Calcite ({{Lex.java}}) is different 
from that of the Table API.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7596) fix bug when Set Operation handles ANY type

2017-09-08 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-7596:
--
Summary: fix bug when Set Operation handles ANY type  (was: Fix bug during 
Set Operation (Union, Minus ... ) with Any(GenericRelDataType) )

> fix bug when Set Operation handles ANY type
> ---
>
> Key: FLINK-7596
> URL: https://issues.apache.org/jira/browse/FLINK-7596
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> If two inputs have ANY(GenericRelDataType) types, applying a set operation 
> ({{UNION}}, {{MINUS}}, ...) to them causes a {{TableException}} with the 
> message "Type is not supported: ANY".
> Here is the test case:
> {code}
> @Test
>   def testUnion(): Unit = {
> val list = List((1, new NODE), (2, new NODE))
> val list2 = List((3, new NODE), (4, new NODE))
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val s1 = tEnv.fromDataStream(env.fromCollection(list))
> val s2 = tEnv.fromDataStream(env.fromCollection(list2))
> val result = s1.unionAll(s2).toAppendStream[Row]
> result.addSink(new StreamITCase.StringSink[Row])
> env.execute()
>   }
>   class NODE {
>   val x = new util.HashMap[String, String]()
> }
> {code}
> This bug happens because Flink doesn't handle {{createSqlType(ANY)}} and 
> Calcite doesn't know the difference between {{ANY}} and 
> {{ANY(GenericRelDataType)}}, so the {{createSqlType(ANY)}} of Calcite will 
> return a {{BasicSqlType}} instead.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7596) Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType)

2017-09-06 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-7596:
--
Description: 
If two inputs have ANY(GenericRelDataType) types, applying a set operation 
(Union, Minus, ...) to them causes a {{TableException}} with the message 
"Type is not supported: ANY"
Here is the test case:

`
@Test
  def testUnion(): Unit = {
val list = List((1, new NODE), (2, new NODE))
val list2 = List((3, new NODE), (4, new NODE))
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val s1 = tEnv.fromDataStream(env.fromCollection(list))
val s2 = tEnv.fromDataStream(env.fromCollection(list2))
val result = s1.unionAll(s2).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
  }

  class NODE {
  val x = new util.HashMap[String, String]()
}
`

This bug happens because Flink didn't handle {{createSqlType(ANY)}}, and 
Calcite doesn't know the difference between {{ANY}} and 
{{ANY(GenericRelDataType)}}, so Calcite's {{createSqlType(ANY)}} returns a 
BasicSqlType instead

  was:
If two inputs with Any(GenericRelDataType), when they comes to Set 
Operation(Union, minus...), it will cause a {{TableException}} with info is 
"Type is not supported: ANY"
Here is the test case:
```
@Test
  def testUnion(): Unit = {
val list = List((1, new NODE), (2, new NODE))
val list2 = List((3, new NODE), (4, new NODE))
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val s1 = tEnv.fromDataStream(env.fromCollection(list))
val s2 = tEnv.fromDataStream(env.fromCollection(list2))
val result = s1.unionAll(s2).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
  }

  class NODE {
  val x = new util.HashMap[String, String]()
}
```
this bug happens because flink did't handle createSqlType(ANY) and Calcite 
does't know the differences between {{ANY}} and {{ANY(GenericRelDataType)}}, so 
the {{createSqlType(ANY)}} of Calcite will return a BasicSqlType instead


> Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType) 
> --
>
> Key: FLINK-7596
> URL: https://issues.apache.org/jira/browse/FLINK-7596
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> If two inputs have ANY(GenericRelDataType) types, applying a set operation 
> (Union, Minus, ...) to them causes a {{TableException}} with the message 
> "Type is not supported: ANY"
> Here is the test case:
> `
> @Test
>   def testUnion(): Unit = {
> val list = List((1, new NODE), (2, new NODE))
> val list2 = List((3, new NODE), (4, new NODE))
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val s1 = tEnv.fromDataStream(env.fromCollection(list))
> val s2 = tEnv.fromDataStream(env.fromCollection(list2))
> val result = s1.unionAll(s2).toAppendStream[Row]
> result.addSink(new StreamITCase.StringSink[Row])
> env.execute()
>   }
>   class NODE {
>   val x = new util.HashMap[String, String]()
> }
> `
> This bug happens because Flink didn't handle {{createSqlType(ANY)}}, and 
> Calcite doesn't know the difference between {{ANY}} and 
> {{ANY(GenericRelDataType)}}, so Calcite's {{createSqlType(ANY)}} returns a 
> BasicSqlType instead



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7596) Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType)

2017-09-06 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-7596:
--
Description: 
If two inputs have ANY(GenericRelDataType) types, applying a set operation 
(Union, Minus, ...) to them causes a {{TableException}} with the message 
"Type is not supported: ANY"
Here is the test case:
```
@Test
  def testUnion(): Unit = {
val list = List((1, new NODE), (2, new NODE))
val list2 = List((3, new NODE), (4, new NODE))
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val s1 = tEnv.fromDataStream(env.fromCollection(list))
val s2 = tEnv.fromDataStream(env.fromCollection(list2))
val result = s1.unionAll(s2).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
  }

  class NODE {
  val x = new util.HashMap[String, String]()
}
```
This bug happens because Flink didn't handle {{createSqlType(ANY)}}, and 
Calcite doesn't know the difference between {{ANY}} and 
{{ANY(GenericRelDataType)}}, so Calcite's {{createSqlType(ANY)}} returns a 
BasicSqlType instead

  was:
If two inputs with Any(GenericRelDataType), when they comes to Set 
Operation(Union, minus...), it will cause a {{TableException}} with info is 
"Type is not supported: ANY"
Here is the test case:
`
@Test
  def testUnion(): Unit = {
val list = List((1, new NODE), (2, new NODE))
val list2 = List((3, new NODE), (4, new NODE))
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val s1 = tEnv.fromDataStream(env.fromCollection(list))
val s2 = tEnv.fromDataStream(env.fromCollection(list2))
val result = s1.unionAll(s2).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
  }

  class NODE {
  val x = new util.HashMap[String, String]()
}
`
this bug happens because flink did't handle createSqlType(ANY) and Calcite 
does't know the differences between {{ANY}} and {{ANY(GenericRelDataType)}}, so 
the {{createSqlType(ANY)}} of Calcite will return a BasicSqlType instead


> Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType) 
> --
>
> Key: FLINK-7596
> URL: https://issues.apache.org/jira/browse/FLINK-7596
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> If two inputs have ANY(GenericRelDataType) types, applying a set operation 
> (Union, Minus, ...) to them causes a {{TableException}} with the message 
> "Type is not supported: ANY"
> Here is the test case:
> ```
> @Test
>   def testUnion(): Unit = {
> val list = List((1, new NODE), (2, new NODE))
> val list2 = List((3, new NODE), (4, new NODE))
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val s1 = tEnv.fromDataStream(env.fromCollection(list))
> val s2 = tEnv.fromDataStream(env.fromCollection(list2))
> val result = s1.unionAll(s2).toAppendStream[Row]
> result.addSink(new StreamITCase.StringSink[Row])
> env.execute()
>   }
>   class NODE {
>   val x = new util.HashMap[String, String]()
> }
> ```
> This bug happens because Flink didn't handle {{createSqlType(ANY)}}, and 
> Calcite doesn't know the difference between {{ANY}} and 
> {{ANY(GenericRelDataType)}}, so Calcite's {{createSqlType(ANY)}} returns a 
> BasicSqlType instead



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7596) Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType)

2017-09-06 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-7596:
--
Description: 
If two inputs have ANY(GenericRelDataType) types, applying a set operation 
(Union, Minus, ...) to them causes a {{TableException}} with the message 
"Type is not supported: ANY"
Here is the test case:
`
@Test
  def testUnion(): Unit = {
val list = List((1, new NODE), (2, new NODE))
val list2 = List((3, new NODE), (4, new NODE))
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val s1 = tEnv.fromDataStream(env.fromCollection(list))
val s2 = tEnv.fromDataStream(env.fromCollection(list2))
val result = s1.unionAll(s2).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
  }

  class NODE {
  val x = new util.HashMap[String, String]()
}
`
This bug happens because Flink didn't handle {{createSqlType(ANY)}}, and 
Calcite doesn't know the difference between {{ANY}} and 
{{ANY(GenericRelDataType)}}, so Calcite's {{createSqlType(ANY)}} returns a 
BasicSqlType instead

  was:
If two inputs with Any(GenericRelDataType), when they comes to Set 
Operation(Union, minus...), it will cause a {{TableException}} with info is 
"Type is not supported: ANY"
Here is the test case:

@Test
  def testUnion(): Unit = {
val list = List((1, new NODE), (2, new NODE))
val list2 = List((3, new NODE), (4, new NODE))
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val s1 = tEnv.fromDataStream(env.fromCollection(list))
val s2 = tEnv.fromDataStream(env.fromCollection(list2))
val result = s1.unionAll(s2).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
  }

  class NODE {
  val x = new util.HashMap[String, String]()
}

this bug happens because flink did't handle createSqlType(ANY) and Calcite 
does't know the differences between {{ANY}} and {{ANY(GenericRelDataType)}}, so 
the {{createSqlType(ANY)}} of Calcite will return a BasicSqlType instead


> Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType) 
> --
>
> Key: FLINK-7596
> URL: https://issues.apache.org/jira/browse/FLINK-7596
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> If two inputs have ANY(GenericRelDataType) types, applying a set operation 
> (Union, Minus, ...) to them causes a {{TableException}} with the message 
> "Type is not supported: ANY"
> Here is the test case:
> `
> @Test
>   def testUnion(): Unit = {
> val list = List((1, new NODE), (2, new NODE))
> val list2 = List((3, new NODE), (4, new NODE))
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val s1 = tEnv.fromDataStream(env.fromCollection(list))
> val s2 = tEnv.fromDataStream(env.fromCollection(list2))
> val result = s1.unionAll(s2).toAppendStream[Row]
> result.addSink(new StreamITCase.StringSink[Row])
> env.execute()
>   }
>   class NODE {
>   val x = new util.HashMap[String, String]()
> }
> `
> This bug happens because Flink didn't handle {{createSqlType(ANY)}}, and 
> Calcite doesn't know the difference between {{ANY}} and 
> {{ANY(GenericRelDataType)}}, so Calcite's {{createSqlType(ANY)}} returns a 
> BasicSqlType instead



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7596) Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType)

2017-09-06 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-7596:
--
Description: 
If two inputs have ANY(GenericRelDataType) types, applying a set operation 
(Union, Minus, ...) to them causes a {{TableException}} with the message 
"Type is not supported: ANY"
Here is the test case:

@Test
  def testUnion(): Unit = {
val list = List((1, new NODE), (2, new NODE))
val list2 = List((3, new NODE), (4, new NODE))
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val s1 = tEnv.fromDataStream(env.fromCollection(list))
val s2 = tEnv.fromDataStream(env.fromCollection(list2))
val result = s1.unionAll(s2).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
  }

  class NODE {
  val x = new util.HashMap[String, String]()
}

This bug happens because Flink didn't handle {{createSqlType(ANY)}}, and 
Calcite doesn't know the difference between {{ANY}} and 
{{ANY(GenericRelDataType)}}, so Calcite's {{createSqlType(ANY)}} returns a 
BasicSqlType instead

  was:
If two inputs with Any(GenericRelDataType), when they comes to Set 
Operation(Union, minus...), it will cause a {{TableException}} with info is 
"Type is not supported: ANY"
Here is the test case:
{{
@Test
  def testUnion(): Unit = {
val list = List((1, new NODE), (2, new NODE))
val list2 = List((3, new NODE), (4, new NODE))
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val s1 = tEnv.fromDataStream(env.fromCollection(list))
val s2 = tEnv.fromDataStream(env.fromCollection(list2))
val result = s1.unionAll(s2).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
  }

  class NODE {
  val x = new util.HashMap[String, String]()
}
}}

this bug happens because flink did't handle createSqlType(ANY) and Calcite 
does't know the differences between {{ANY}} and {{ANY(GenericRelDataType)}}, so 
the {{createSqlType(ANY)}} of Calcite will return a BasicSqlType instead


> Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType) 
> --
>
> Key: FLINK-7596
> URL: https://issues.apache.org/jira/browse/FLINK-7596
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> If two inputs have ANY(GenericRelDataType) types, applying a set operation 
> (Union, Minus, ...) to them causes a {{TableException}} with the message 
> "Type is not supported: ANY"
> Here is the test case:
> @Test
>   def testUnion(): Unit = {
> val list = List((1, new NODE), (2, new NODE))
> val list2 = List((3, new NODE), (4, new NODE))
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val s1 = tEnv.fromDataStream(env.fromCollection(list))
> val s2 = tEnv.fromDataStream(env.fromCollection(list2))
> val result = s1.unionAll(s2).toAppendStream[Row]
> result.addSink(new StreamITCase.StringSink[Row])
> env.execute()
>   }
>   class NODE {
>   val x = new util.HashMap[String, String]()
> }
> This bug happens because Flink didn't handle {{createSqlType(ANY)}}, and 
> Calcite doesn't know the difference between {{ANY}} and 
> {{ANY(GenericRelDataType)}}, so Calcite's {{createSqlType(ANY)}} returns a 
> BasicSqlType instead





[jira] [Updated] (FLINK-7596) Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType)

2017-09-06 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-7596:
--
Description: 
If two inputs carry Any(GenericRelDataType) types, a set operation (Union, 
Minus, ...) on them causes a {{TableException}} with the message 
"Type is not supported: ANY".
Here is the test case:
{{
@Test
def testUnion(): Unit = {
  val list = List((1, new NODE), (2, new NODE))
  val list2 = List((3, new NODE), (4, new NODE))
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env)
  val s1 = tEnv.fromDataStream(env.fromCollection(list))
  val s2 = tEnv.fromDataStream(env.fromCollection(list2))
  val result = s1.unionAll(s2).toAppendStream[Row]
  result.addSink(new StreamITCase.StringSink[Row])
  env.execute()
}

class NODE {
  val x = new util.HashMap[String, String]()
}
}}

This bug happens because Flink does not handle {{createSqlType(ANY)}} itself, 
and Calcite does not know the difference between {{ANY}} and 
{{ANY(GenericRelDataType)}}, so Calcite's {{createSqlType(ANY)}} returns a 
{{BasicSqlType}} instead.

  was:
If two inputs carry Any(GenericRelDataType) types, a set operation (Union, 
Minus, ...) on them causes a `TableException` with the message "Type 
is not supported: ANY".
Here is the test case:
`
@Test
def testUnion(): Unit = {
  val list = List((1, new NODE), (2, new NODE))
  val list2 = List((3, new NODE), (4, new NODE))
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env)
  val s1 = tEnv.fromDataStream(env.fromCollection(list))
  val s2 = tEnv.fromDataStream(env.fromCollection(list2))
  val result = s1.unionAll(s2).toAppendStream[Row]
  result.addSink(new StreamITCase.StringSink[Row])
  env.execute()
}

class NODE {
  val x = new util.HashMap[String, String]()
}
`

This bug happens because Flink does not handle `createSqlType(ANY)` itself, and 
Calcite does not know the difference between `ANY` and 
`ANY(GenericRelDataType)`, so Calcite's `createSqlType(ANY)` returns a 
`BasicSqlType` instead.


> Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType) 
> --
>
> Key: FLINK-7596
> URL: https://issues.apache.org/jira/browse/FLINK-7596
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> If two inputs carry Any(GenericRelDataType) types, a set operation (Union, 
> Minus, ...) on them causes a {{TableException}} with the message 
> "Type is not supported: ANY".
> Here is the test case:
> {{
> @Test
> def testUnion(): Unit = {
>   val list = List((1, new NODE), (2, new NODE))
>   val list2 = List((3, new NODE), (4, new NODE))
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   val tEnv = TableEnvironment.getTableEnvironment(env)
>   val s1 = tEnv.fromDataStream(env.fromCollection(list))
>   val s2 = tEnv.fromDataStream(env.fromCollection(list2))
>   val result = s1.unionAll(s2).toAppendStream[Row]
>   result.addSink(new StreamITCase.StringSink[Row])
>   env.execute()
> }
> class NODE {
>   val x = new util.HashMap[String, String]()
> }
> }}
> This bug happens because Flink does not handle {{createSqlType(ANY)}} itself, 
> and Calcite does not know the difference between {{ANY}} and 
> {{ANY(GenericRelDataType)}}, so Calcite's {{createSqlType(ANY)}} returns a 
> {{BasicSqlType}} instead.





[jira] [Created] (FLINK-7596) Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType)

2017-09-06 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-7596:
-

 Summary: Fix bug during Set Operation (Union, Minus ... ) with 
Any(GenericRelDataType) 
 Key: FLINK-7596
 URL: https://issues.apache.org/jira/browse/FLINK-7596
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: Ruidong Li
Assignee: Ruidong Li


If two inputs carry Any(GenericRelDataType) types, a set operation (Union, 
Minus, ...) on them causes a `TableException` with the message "Type 
is not supported: ANY".
Here is the test case:
`
@Test
def testUnion(): Unit = {
  val list = List((1, new NODE), (2, new NODE))
  val list2 = List((3, new NODE), (4, new NODE))
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env)
  val s1 = tEnv.fromDataStream(env.fromCollection(list))
  val s2 = tEnv.fromDataStream(env.fromCollection(list2))
  val result = s1.unionAll(s2).toAppendStream[Row]
  result.addSink(new StreamITCase.StringSink[Row])
  env.execute()
}

class NODE {
  val x = new util.HashMap[String, String]()
}
`

This bug happens because Flink does not handle `createSqlType(ANY)` itself, and 
Calcite does not know the difference between `ANY` and 
`ANY(GenericRelDataType)`, so Calcite's `createSqlType(ANY)` returns a 
`BasicSqlType` instead.





[jira] [Created] (FLINK-7385) Fix ArrayIndexOutOfBoundsException when object-reuse is enabled

2017-08-07 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-7385:
-

 Summary: Fix ArrayIndexOutOfBoundsException when object-reuse is 
enabled  
 Key: FLINK-7385
 URL: https://issues.apache.org/jira/browse/FLINK-7385
 Project: Flink
  Issue Type: Bug
Reporter: Ruidong Li
Assignee: Ruidong Li


In OperatorChain.java, there is a potential ArrayIndexOutOfBoundsException when 
object-reuse is enabled.
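
The report does not pin down the exact code path; as a hedged illustration of 
how this class of bug typically arises (the collector below is hypothetical, 
not the actual OperatorChain code), consider a broadcasting collector that, 
with object reuse enabled, copies the record for every output except the last:

import org.apache.flink.util.Collector

// Hypothetical sketch of the failure mode: all outputs but the last receive a
// copy; the last receives the original instance to save one copy.
class CopyingBroadcastCollector[T](outputs: Array[Collector[T]], copy: T => T)
    extends Collector[T] {

  override def collect(record: T): Unit = {
    var i = 0
    while (i < outputs.length - 1) {
      outputs(i).collect(copy(record))
      i += 1
    }
    // Bug: with zero outputs, outputs.length - 1 is -1 and this access throws
    // ArrayIndexOutOfBoundsException; a guard on outputs.nonEmpty avoids it.
    outputs(outputs.length - 1).collect(record)
  }

  override def close(): Unit = outputs.foreach(_.close())
}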





[jira] [Commented] (FLINK-7194) Add getResultType and getAccumulatorType to AggregateFunction

2017-07-17 Thread Ruidong Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16091036#comment-16091036
 ] 

Ruidong Li commented on FLINK-7194:
---

{{ScalarFunction.getResultType()}} has parameters while the {{TableFunction}} 
and {{AggregateFunction}} counterparts do not. Users can implement multiple 
{{ScalarFunction.eval()}} overloads with different signatures, such as {{def 
eval(x: Int): Boolean}} or {{def eval(x: String): String}}, so the return value 
of {{ScalarFunction.getResultType()}} is determined by the parameter types, as 
in the sketch below.
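
A minimal sketch of that situation, assuming the {{getResultType(signature: 
Array[Class[_]])}} contract of the {{ScalarFunction}} base class at the time 
(the class {{MyScalar}} is illustrative only):

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.table.functions.ScalarFunction

// Two eval() overloads: the result type depends on which one is called, so it
// can only be derived from the actual call signature.
class MyScalar extends ScalarFunction {
  def eval(x: Int): Boolean = x > 0
  def eval(x: String): String = x.trim

  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
    if (signature.headOption.exists(c =>
        c == classOf[Int] || c == classOf[java.lang.Integer])) {
      BasicTypeInfo.BOOLEAN_TYPE_INFO // eval(Int) returns Boolean
    } else {
      BasicTypeInfo.STRING_TYPE_INFO // eval(String) returns String
    }
}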

> Add getResultType and getAccumulatorType to AggregateFunction
> -
>
> Key: FLINK-7194
> URL: https://issues.apache.org/jira/browse/FLINK-7194
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>
> FLINK-6725 and FLINK-6457 proposed to remove methods with default 
> implementations such as {{getResultType()}}, {{toString()}}, or 
> {{requiresOver()}} from the base classes of user-defined methods (UDF, UDTF, 
> UDAGG) and instead offer them as contract methods which are dynamically 
> extracted via reflection.
> In PR [#3993|https://github.com/apache/flink/pull/3993] I argued that these 
> methods have a fixed signature (in contrast to the {{eval()}}, 
> {{accumulate()}} and {{retract()}} methods) and should be kept in the 
> classes. For users that don't need these methods, this doesn't make a 
> difference because the methods are not abstract and have a default 
> implementation. For users that need to override the methods it makes a 
> difference, because they get IDE and compiler support when overriding them 
> and they cannot get the signature wrong.
> Consequently, I propose to add {{getResultType()}} and 
> {{getAccumulatorType()}} as methods with default implementation to 
> {{AggregateFunction}}. This will make the interface of {{AggregateFunction}} 
> more consistent with {{ScalarFunction}} and {{TableFunction}}.
> What do you think [~shaoxuan], [~RuidongLi] and [~jark]?
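
A hedged sketch of what the proposed contract could look like for a UDAGG (the 
parameterless {{getResultType}}/{{getAccumulatorType}} signatures are 
assumptions, since the methods are only being proposed here):

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.table.functions.AggregateFunction

class WeightedAvgAcc { var sum: Long = 0L; var count: Int = 0 }

// The fixed-signature type methods are overridden with IDE/compiler support,
// while accumulate() stays a reflectively resolved contract method.
class WeightedAvg extends AggregateFunction[java.lang.Long, WeightedAvgAcc] {

  override def createAccumulator(): WeightedAvgAcc = new WeightedAvgAcc

  override def getValue(acc: WeightedAvgAcc): java.lang.Long =
    if (acc.count == 0) null else java.lang.Long.valueOf(acc.sum / acc.count)

  def accumulate(acc: WeightedAvgAcc, value: Long, weight: Int): Unit = {
    acc.sum += value * weight
    acc.count += weight
  }

  override def getResultType: TypeInformation[java.lang.Long] =
    BasicTypeInfo.LONG_TYPE_INFO
  override def getAccumulatorType: TypeInformation[WeightedAvgAcc] =
    TypeInformation.of(classOf[WeightedAvgAcc])
}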





[jira] [Commented] (FLINK-7194) Add getResultType and getAccumulatorType to AggregateFunction

2017-07-17 Thread Ruidong Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16089419#comment-16089419
 ] 

Ruidong Li commented on FLINK-7194:
---

[~fhueske] I agree with your suggestion, but {{getResultType()}} in 
{{ScalarFunction}} is different from the one in {{TableFunction}} and 
{{AggregateFunction}}. What if we banned overloading {{eval()}} in 
{{ScalarFunction}}? That would give better consistency.

> Add getResultType and getAccumulatorType to AggregateFunction
> -
>
> Key: FLINK-7194
> URL: https://issues.apache.org/jira/browse/FLINK-7194
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>
> FLINK-6725 and FLINK-6457 proposed to remove methods with default 
> implementations such as {{getResultType()}}, {{toString()}}, or 
> {{requiresOver()}} from the base classes of user-defined methods (UDF, UDTF, 
> UDAGG) and instead offer them as contract methods which are dynamically 
> extracted via reflection.
> In PR [#3993|https://github.com/apache/flink/pull/3993] I argued that these 
> methods have a fixed signature (in contrast to the {{eval()}}, 
> {{accumulate()}} and {{retract()}} methods) and should be kept in the 
> classes. For users that don't need these methods, this doesn't make a 
> difference because the methods are not abstract and have a default 
> implementation. For users that need to override the methods it makes a 
> difference, because they get IDE and compiler support when overriding them 
> and they cannot get the signature wrong.
> Consequently, I propose to add {{getResultType()}} and 
> {{getAccumulatorType()}} as methods with default implementation to 
> {{AggregateFunction}}. This will make the interface of {{AggregateFunction}} 
> more consistent with {{ScalarFunction}} and {{TableFunction}}.
> What do you think [~shaoxuan], [~RuidongLi] and [~jark]?





[jira] [Closed] (FLINK-7168) Add support for Table API & SQL stream-stream inner/left join

2017-07-12 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li closed FLINK-7168.
-
Resolution: Duplicate

> Add support for Table API & SQL stream-stream inner/left join 
> -
>
> Key: FLINK-7168
> URL: https://issues.apache.org/jira/browse/FLINK-7168
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>






[jira] [Created] (FLINK-7168) Add support for Table API & SQL stream-stream inner/left join

2017-07-12 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-7168:
-

 Summary: Add support for Table API & SQL stream-stream inner/left 
join 
 Key: FLINK-7168
 URL: https://issues.apache.org/jira/browse/FLINK-7168
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Ruidong Li
Assignee: Ruidong Li








[jira] [Created] (FLINK-7163) Make LogicalTableFunctionCall immutable

2017-07-12 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-7163:
-

 Summary: Make LogicalTableFunctionCall immutable
 Key: FLINK-7163
 URL: https://issues.apache.org/jira/browse/FLINK-7163
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Ruidong Li
Assignee: Ruidong Li


All subclasses of LogicalNode are immutable except `LogicalTableFunctionCall`; 
it would be better to make `LogicalTableFunctionCall` immutable as well, as 
sketched below.
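
A hypothetical, simplified sketch of the refactoring direction (`Expression` 
and `LogicalNode` here are stand-ins for the real planner types):

trait Expression
trait LogicalNode

// Immutable variant: all state is fixed at construction, and a "mutation"
// produces a new node instead of updating a var in place.
case class LogicalTableFunctionCall(
    functionName: String,
    parameters: Seq[Expression],
    child: Option[LogicalNode]) {

  def withChild(newChild: LogicalNode): LogicalTableFunctionCall =
    copy(child = Some(newChild))
}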





[jira] [Assigned] (FLINK-7014) Expose isDeterministic interface to ScalarFunction and TableFunction

2017-06-27 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li reassigned FLINK-7014:
-

Assignee: Ruidong Li

> Expose isDeterministic interface to ScalarFunction and TableFunction
> 
>
> Key: FLINK-7014
> URL: https://issues.apache.org/jira/browse/FLINK-7014
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> Currently, the `isDeterministic` method of `SqlFunction` implementations 
> always returns true, which causes inappropriate optimizations in Calcite, 
> such as treating a user's stateful UDF as a pure function.





[jira] [Created] (FLINK-7014) Expose isDeterministic interface to ScalarFunction and TableFunction

2017-06-27 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-7014:
-

 Summary: Expose isDeterministic interface to ScalarFunction and 
TableFunction
 Key: FLINK-7014
 URL: https://issues.apache.org/jira/browse/FLINK-7014
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Ruidong Li


Currently, the `isDeterministic` method of `SqlFunction` implementations always 
returns true, which causes inappropriate optimizations in Calcite, such as 
treating a user's stateful UDF as a pure function.
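
A hedged sketch of what the exposed flag could look like for users, assuming 
it becomes an overridable method on `ScalarFunction` (that is what is proposed 
here, not yet the shipped API):

import org.apache.flink.table.functions.ScalarFunction

// A stateful/random UDF that must not be treated as a pure function.
class RandomTag extends ScalarFunction {
  private val rnd = new scala.util.Random()

  def eval(prefix: String): String = prefix + rnd.nextInt(1000)

  // Declaring the function non-deterministic keeps the planner from
  // constant-folding it or reusing one result across calls.
  override def isDeterministic: Boolean = false
}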




