[GitHub] flink issue #3314: [FLINK-3679] DeserializationSchema should handle zero or ...

2017-03-08 Thread haohui
Github user haohui commented on the issue:

https://github.com/apache/flink/pull/3314
  
Yes, totally agree. Thanks very much for taking the time to review the PRs. 
Will do it next time.




[jira] [Commented] (FLINK-3679) Allow Kafka consumer to skip corrupted messages

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902597#comment-15902597
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user haohui commented on the issue:

https://github.com/apache/flink/pull/3314
  
Yes, totally agree. Thanks very much for taking the time to review the PRs. 
Will do it next time.


> Allow Kafka consumer to skip corrupted messages
> ---
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Kafka Connector
>Reporter: Jamie Grier
>Assignee: Haohui Mai
> Fix For: 1.3.0
>
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between inputs and outputs. In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.
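
For illustration, a minimal sketch of the null-skipping contract this thread converges on, assuming the Flink 1.2-era org.apache.flink.streaming.util.serialization.DeserializationSchema interface; the class name and the toy "UTF-8 integer" payload format are invented for this example, not taken from the PR:

{code}
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

/**
 * Treats every Kafka record as a UTF-8 encoded integer; anything that does not
 * parse is considered corrupt and is skipped (by returning null) instead of
 * failing the job.
 */
public class IntOrSkipSchema implements DeserializationSchema<Integer> {

    @Override
    public Integer deserialize(byte[] message) throws IOException {
        try {
            return Integer.valueOf(new String(message, StandardCharsets.UTF_8).trim());
        } catch (NumberFormatException e) {
            return null; // corrupted record: the consumer drops null elements
        }
    }

    @Override
    public boolean isEndOfStream(Integer nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Integer> getProducedType() {
        return BasicTypeInfo.INT_TYPE_INFO;
    }
}
{code}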





[jira] [Commented] (FLINK-5991) Expose Broadcast Operator State through public APIs

2017-03-08 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902577#comment-15902577
 ] 

Aljoscha Krettek commented on FLINK-5991:
-

I prefer to only expose it on the operator state store and keep the 
{{ListCheckpointed}} interface as simple as it is now.

> Expose Broadcast Operator State through public APIs
> ---
>
> Key: FLINK-5991
> URL: https://issues.apache.org/jira/browse/FLINK-5991
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing, Streaming
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.3.0
>
>
> The broadcast operator state functionality was added in FLINK-5265; it just 
> hasn't been exposed through any public APIs yet.
> Currently, we have 2 streaming connector features for 1.3 that are pending on 
> broadcast state: rescalable Kinesis / Kafka consumers with shard / partition 
> discovery (FLINK-4821 & FLINK-4022). We should consider exposing broadcast 
> state for the 1.3 release also.
> This JIRA also serves to discuss how we want to expose it.
> To initiate the discussion, I propose:
> 1. For the more powerful {{CheckpointedFunction}}, add the following to the 
> {{OperatorStateStore}} interface:
> {code}
> <T> ListState<T> getBroadcastOperatorState(ListStateDescriptor<T> stateDescriptor);
>
> <T extends Serializable> ListState<T> getBroadcastSerializableListState(String stateName);
> {code}
> 2. For a simpler {{ListCheckpointed}} variant, we probably should have a 
> separate {{BroadcastListCheckpointed}} interface.
> Extending {{ListCheckpointed}} to let the user define the list state 
> type as either {{PARTITIONABLE}} or {{BROADCAST}} might also be possible, if 
> we can rely on a contract that the value doesn't change. Or we expose a 
> defining method (e.g. {{getListStateType()}}) that is called only once in the 
> operator. This would break user code, but can be considered because it is 
> marked as {{PublicEvolving}}.
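
Purely as a sketch of how option 1 could look from a user function, under the assumption that the proposed {{getBroadcastOperatorState}} method were added to {{OperatorStateStore}} (it does not exist yet; the offset-tracking logic is a made-up example):

{code}
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Collector;

public class OffsetTrackingMapper extends RichFlatMapFunction<String, String>
        implements CheckpointedFunction {

    // broadcast semantics: on restore, every subtask sees the full list
    private transient ListState<Long> offsets;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Long> descriptor =
                new ListStateDescriptor<>("offsets", BasicTypeInfo.LONG_TYPE_INFO);
        // Proposed method from this ticket -- not part of OperatorStateStore today:
        offsets = context.getOperatorStateStore().getBroadcastOperatorState(descriptor);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // write this subtask's view of the offsets into 'offsets'
    }

    @Override
    public void flatMap(String value, Collector<String> out) {
        out.collect(value);
    }
}
{code}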





[GitHub] flink issue #3487: [FLINK-5980] Expose max-parallelism value in RuntimeConte...

2017-03-08 Thread ifndef-SleePy
Github user ifndef-SleePy commented on the issue:

https://github.com/apache/flink/pull/3487
  
Thank you, Stephan and zentol.
Good question and suggestion. I didn't consider it much for batch jobs.
Also, I think naming the variable "numberOfKeyGroups" in TaskInfo would be a bad 
idea. Keeping max-parallelism is better: it's more common and makes much more 
sense for other scenarios.
I will make sure it can work with batch jobs, and update this PR later.




[jira] [Commented] (FLINK-5832) Support for simple hive UDF

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902572#comment-15902572
 ] 

ASF GitHub Bot commented on FLINK-5832:
---

Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3456#discussion_r105095839
  
--- Diff: flink-libraries/flink-table/pom.xml ---
@@ -92,6 +92,11 @@ under the License.


 
+   
+   org.apache.hive
--- End diff --

When I rebased the patch onto #3389, the issue disappeared.


> Support for simple hive UDF
> ---
>
> Key: FLINK-5832
> URL: https://issues.apache.org/jira/browse/FLINK-5832
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Zhuoluo Yang
>Assignee: Zhuoluo Yang
>
> The first step of FLINK-5802 is to support simple Hive UDF.





[GitHub] flink pull request #3456: [FLINK-5832] [table] Support for simple hive UDF

2017-03-08 Thread clarkyzl
Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3456#discussion_r105095839
  
--- Diff: flink-libraries/flink-table/pom.xml ---
@@ -92,6 +92,11 @@ under the License.


 
+   
+   org.apache.hive
--- End diff --

When I rebased the patch onto #3389, the issue disappeared.




[jira] [Updated] (FLINK-5995) Get an Exception when creating the ListStateDescriptor with a TypeInformation

2017-03-08 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-5995:

Component/s: (was: Core)
 State Backends, Checkpointing
 DataStream API

> Get an Exception when creating the ListStateDescriptor with a TypeInformation 
> -
>
> Key: FLINK-5995
> URL: https://issues.apache.org/jira/browse/FLINK-5995
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> When using OperatorState and creating the ListStateDescriptor with a 
> TypeInformation, I got an exception. The exception info is:
> {code}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:915)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:858)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:858)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalStateException: Serializer not yet initialized.
>   at 
> org.apache.flink.api.common.state.StateDescriptor.getSerializer(StateDescriptor.java:169)
>   at 
> org.apache.flink.api.common.state.ListStateDescriptor.getElementSerializer(ListStateDescriptor.java:93)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getOperatorState(DefaultOperatorStateBackend.java:110)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getOperatorState(DefaultOperatorStateBackend.java:91)
>   at 
> org.apache.flink.table.runtime.aggregate.UnboundedNonPartitionedProcessingOverProcessFunction.initializeState(UnboundedNonPartitionedProcessingOverProcessFunction.scala:104)
>   at 
> org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:106)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:242)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:681)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:669)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:670)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> So, I'll add a `stateDescriptor.initializeSerializerUnlessSet()` call in the 
> `getOperatorState` method. I'd appreciate it if anyone could give me some advice.
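
A small, self-contained illustration of the failure and of what the proposed call does; the demo class is invented for illustration only, while the actual fix would live inside {{DefaultOperatorStateBackend#getOperatorState}}:

{code}
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;

public class DescriptorInitDemo {
    public static void main(String[] args) {
        ListStateDescriptor<Long> descriptor =
                new ListStateDescriptor<>("counts", BasicTypeInfo.LONG_TYPE_INFO);

        // Without this call, descriptor.getElementSerializer() throws
        // "IllegalStateException: Serializer not yet initialized." -- the failure in
        // the stack trace above. The proposal is to have getOperatorState() perform
        // this initialization internally instead of relying on the caller.
        descriptor.initializeSerializerUnlessSet(new ExecutionConfig());

        System.out.println(descriptor.getElementSerializer());
    }
}
{code}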





[jira] [Updated] (FLINK-6004) Allow FlinkKinesisConsumer to skip corrupted messages

2017-03-08 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-6004:

Component/s: Streaming Connectors

> Allow FlinkKinesisConsumer to skip corrupted messages
> -
>
> Key: FLINK-6004
> URL: https://issues.apache.org/jira/browse/FLINK-6004
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>
> It is quite clear from the fix of FLINK-3679 that in reality, users might 
> encounter corrupted messages from Kafka / Kinesis / generally external 
> sources when deserializing them.
> The consumers should support simply skipping those messages, by letting the 
> deserialization schema return {{null}}, and checking {{null}} values within 
> the consumer.
> This has been done for the Kafka consumer already. This ticket tracks the 
> improvement for the Kinesis consumer.
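
A sketch of the consumer-side half of that contract; the helper below is hypothetical and only illustrates the null check, it is not Kinesis connector code:

{code}
import java.io.IOException;
import java.util.List;

import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public final class SkipNulls {

    /** Deserializes one raw record and silently drops it when the schema signals corruption. */
    public static <T> void deserializeAndCollect(
            DeserializationSchema<T> schema, byte[] rawBytes, List<T> out) throws IOException {
        T record = schema.deserialize(rawBytes);
        if (record == null) {
            return; // corrupted message: skip it instead of failing the job
        }
        out.add(record);
    }
}
{code}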





[GitHub] flink issue #3314: [FLINK-3679] DeserializationSchema should handle zero or ...

2017-03-08 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3314
  
@haohui - one suggestion for future contributions for easier reviews:
We usually use follow-up commits that address review comments, instead of 
force-pushing the whole branch. For reviewers, this makes it much easier to track 
what has been addressed and fixed since the original PR.




[jira] [Commented] (FLINK-3679) Allow Kafka consumer to skip corrupted messages

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902563#comment-15902563
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3314
  
@haohui - one suggestion for future contributions for easier reviews:
We usually use follow-up commits that address review comments, instead of 
force-pushing the whole branch. For reviewers, this makes it much easier to track 
what has been addressed and fixed since the original PR.


> Allow Kafka consumer to skip corrupted messages
> ---
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Kafka Connector
>Reporter: Jamie Grier
>Assignee: Haohui Mai
> Fix For: 1.3.0
>
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between inputs and outputs. In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.





[jira] [Updated] (FLINK-6004) Allow FlinkKinesisConsumer to skip corrupted messages

2017-03-08 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-6004:
---
Description: 
It is quite clear from the fix of FLINK-3679 that in reality, users might 
encounter corrupted messages from Kafka / Kinesis / generally external sources 
when deserializing them.

The consumers should support simply skipping those messages, by letting the 
deserialization schema return {{null}}, and checking {{null}} values within the 
consumer.

This has been done for the Kafka consumer already. This ticket tracks the 
improvement for the Kinesis consumer.

  was:
It is quite clear from the fix of FLINK-3679 that in reality, users might 
encounter corrupted messages from Kafka / Kinesis / generally external sources 
when deserializing them.

The consumers should support simply skipping those messages, by letting the 
deserialization schema return `null`, and checking `null` values within the 
consumer.

This has been done for the Kafka consumer already. This ticket tracks the 
improvement for the Kinesis consumer.


> Allow FlinkKinesisConsumer to skip corrupted messages
> -
>
> Key: FLINK-6004
> URL: https://issues.apache.org/jira/browse/FLINK-6004
> Project: Flink
>  Issue Type: Improvement
>Reporter: Tzu-Li (Gordon) Tai
>
> It is quite clear from the fix of FLINK-3679 that in reality, users might 
> encounter corrupted messages from Kafka / Kinesis / generally external 
> sources when deserializing them.
> The consumers should support simply skipping those messages, by letting the 
> deserialization schema return {{null}}, and checking {{null}} values within 
> the consumer.
> This has been done for the Kafka consumer already. This ticket tracks the 
> improvement for the Kinesis consumer.





[jira] [Created] (FLINK-6004) Allow FlinkKinesisConsumer to skip corrupted messages

2017-03-08 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-6004:
--

 Summary: Allow FlinkKinesisConsumer to skip corrupted messages
 Key: FLINK-6004
 URL: https://issues.apache.org/jira/browse/FLINK-6004
 Project: Flink
  Issue Type: Improvement
Reporter: Tzu-Li (Gordon) Tai


It is quite clear from the fix of FLINK-3679 that in reality, users might 
encounter corrupted messages from Kafka / Kinesis / generally external sources 
when deserializing them.

The consumers should support simply skipping those messages, by letting the 
deserialization schema return `null`, and checking `null` values within the 
consumer.

This has been done for the Kafka consumer already. This ticket tracks the 
improvement for the Kinesis consumer.





[jira] [Commented] (FLINK-5980) Expose max-parallelism value in RuntimeContext

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902558#comment-15902558
 ] 

ASF GitHub Bot commented on FLINK-5980:
---

Github user ifndef-SleePy commented on the issue:

https://github.com/apache/flink/pull/3487
  
Thank you, Stephan and zentol.
Good question and suggestion. I didn't consider it much for batch jobs.
Also, I think naming the variable "numberOfKeyGroups" in TaskInfo would be a bad 
idea. Keeping max-parallelism is better: it's more common and makes much more 
sense for other scenarios.
I will make sure it can work with batch jobs, and update this PR later.


> Expose max-parallelism value in RuntimeContext
> --
>
> Key: FLINK-5980
> URL: https://issues.apache.org/jira/browse/FLINK-5980
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Minor
>
> I am implementing a custom source function. I want to keep all progress in 
> a ListState to support dynamic scaling, just like what FlinkKafkaConsumer 
> does. And I think the max-parallelism value is the ideal length for the ListState 
> in my scenario. But I realize that currently the max-parallelism value is not 
> visible to UDFs. 
> I propose exposing the max-parallelism value in RuntimeContext. It will be 
> useful in dynamic scaling scenarios.
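
A sketch of how the proposal could be used from a source function; the getter below stands in for the kind of accessor being proposed and did not exist on RuntimeContext at the time of this thread, and the progress array is a made-up example:

{code}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class ProgressTrackingSource extends RichParallelSourceFunction<Long> {

    private transient long[] progressPerKeyGroup;

    @Override
    public void open(Configuration parameters) {
        // Hypothetical accessor along the lines proposed in this ticket.
        int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
        // max-parallelism is a natural, rescale-stable size for per-key-group progress
        progressPerKeyGroup = new long[maxParallelism];
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        // ... emit records and update progressPerKeyGroup ...
    }

    @Override
    public void cancel() {
    }
}
{code}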





[jira] [Commented] (FLINK-3679) Allow Kafka consumer to skip corrupted messages

2017-03-08 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902554#comment-15902554
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3679:


I'm a bit hesitant about whether or not we want to backport this fix to 
{{release-1.1}} and {{release-1.2}}.

I think it's ok, since it doesn't break any user code.

[~rmetzger] [~jgrier] what do you think?

> Allow Kafka consumer to skip corrupted messages
> ---
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Kafka Connector
>Reporter: Jamie Grier
>Assignee: Haohui Mai
> Fix For: 1.3.0
>
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between inputs and outputs. In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.





[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-03-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3314




[jira] [Resolved] (FLINK-5583) Support flexible error handling in the Kafka consumer

2017-03-08 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-5583.

   Resolution: Fixed
Fix Version/s: 1.3.0

Resolved for {{master}} with 
http://git-wip-us.apache.org/repos/asf/flink/commit/c39ad31.

Thanks a lot for your contribution [~wheat9]!

> Support flexible error handling in the Kafka consumer
> -
>
> Key: FLINK-5583
> URL: https://issues.apache.org/jira/browse/FLINK-5583
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Haohui Mai
>Assignee: Haohui Mai
> Fix For: 1.3.0
>
>
> We found that it is valuable to allow the applications to handle errors and 
> exceptions in the Kafka consumer in order to build a robust application in 
> production.
> The context is the following:
> (1) We have schematized, Avro records flowing through Kafka.
> (2) The decoder implements the DeserializationSchema to decode the records.
> (3) Occasionally there are corrupted records (e.g., schema issues). The 
> streaming pipeline might want to bail out (which is the current behavior) or 
> to skip the corrupted records depending on the applications.
> Two options are available:
> (1) Have a variant of DeserializationSchema to return a FlatMap like 
> structure as suggested in FLINK-3679.
> (2) Allow the applications to catch and handle the exception by exposing some 
> APIs that are similar to the {{ExceptionProxy}}.
> Thoughts?





[jira] [Resolved] (FLINK-3679) Allow Kafka consumer to skip corrupted messages

2017-03-08 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-3679.

   Resolution: Fixed
Fix Version/s: 1.3.0

Resolved for {{master}} with 
http://git-wip-us.apache.org/repos/asf/flink/commit/c39ad31.

Thanks a lot for your contribution [~wheat9]!

> Allow Kafka consumer to skip corrupted messages
> ---
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Kafka Connector
>Reporter: Jamie Grier
>Assignee: Haohui Mai
> Fix For: 1.3.0
>
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between inputs and outputs. In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.





[jira] [Commented] (FLINK-3679) Allow Kafka consumer to skip corrupted messages

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902546#comment-15902546
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3314


> Allow Kafka consumer to skip corrupted messages
> ---
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Kafka Connector
>Reporter: Jamie Grier
>Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between inputs and outputs. In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.





[jira] [Commented] (FLINK-4810) Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful checkpoints

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902515#comment-15902515
 ] 

ASF GitHub Bot commented on FLINK-4810:
---

Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3334
  
@StephanEwen 
I saw in another JIRA one of your comments where you talked about 
refactoring CheckpointCoordinator and PendingCheckpoint. So would you like this PR 
to wait till then?


> Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful 
> checkpoints
> 
>
> Key: FLINK-4810
> URL: https://issues.apache.org/jira/browse/FLINK-4810
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>
> The Checkpoint coordinator should track the number of consecutive 
> unsuccessful checkpoints.
> If more than {{n}} (configured value) checkpoints fail in a row, it should 
> call {{fail()}} on the execution graph to trigger a recovery.
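
A toy sketch of the counting behaviour described above; all names are invented, and the real change would live in the checkpoint coordinator itself:

{code}
/** Tracks consecutive checkpoint failures; the caller fails the ExecutionGraph when told to. */
public class ConsecutiveFailureTracker {

    private final int maxConsecutiveFailures;
    private int consecutiveFailures;

    public ConsecutiveFailureTracker(int maxConsecutiveFailures) {
        this.maxConsecutiveFailures = maxConsecutiveFailures;
    }

    public void onCheckpointCompleted() {
        consecutiveFailures = 0;
    }

    /** Returns true when more than the configured number of checkpoints failed in a row. */
    public boolean onCheckpointFailed() {
        consecutiveFailures++;
        return consecutiveFailures > maxConsecutiveFailures;
    }
}
{code}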





[GitHub] flink issue #3334: FLINK-4810 Checkpoint Coordinator should fail ExecutionGr...

2017-03-08 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3334
  
@StephanEwen 
I saw in another JIRA one of your comments where you talked about 
refactoring CheckpointCoordinator and PendingCheckpoint. So would you like this PR 
to wait till then?




[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902429#comment-15902429
 ] 

ASF GitHub Bot commented on FLINK-4545:
---

Github user wenlong88 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3467#discussion_r105081191
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ---
@@ -265,11 +281,15 @@ public String toString() {
// 

 
private void returnMemorySegment(MemorySegment segment) {
+   assert Thread.holdsLock(availableMemorySegments);
--- End diff --

Hi, I have a question about the assert: I found that assertions are 
disabled in Java by default. Why not use an explicit 
`synchronized(availableMemorySegments)` block, which may be the more common usage?
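
To make the question concrete, a toy contrast of the two idioms (not LocalBufferPool code): the assert variant only documents and, when assertions are enabled with -ea, checks that the caller already holds the lock, whereas the synchronized variant acquires the lock itself.

{code}
public class LockIdioms {

    private final Object availableMemorySegments = new Object();

    // Variant used in the PR: a caller-side precondition. With assertions disabled
    // (the JVM default), the check is a no-op.
    private void returnSegmentAssertingLockHeld() {
        assert Thread.holdsLock(availableMemorySegments);
        // ... mutate state guarded by availableMemorySegments ...
    }

    // Variant asked about in the review: acquires the monitor itself. Always enforced,
    // but redundant (and re-entrant) when every caller already synchronizes on it.
    private void returnSegmentSynchronizing() {
        synchronized (availableMemorySegments) {
            // ... mutate state guarded by availableMemorySegments ...
        }
    }
}
{code}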


> Flink automatically manages TM network buffer
> -
>
> Key: FLINK-4545
> URL: https://issues.apache.org/jira/browse/FLINK-4545
> Project: Flink
>  Issue Type: Wish
>  Components: Network
>Reporter: Zhenzhong Xu
>
> Currently, the number of network buffers per task manager is preconfigured and 
> the memory is pre-allocated through the taskmanager.network.numberOfBuffers 
> config. In a Job DAG with a shuffle phase, this number can go up very high 
> depending on the TM cluster size. The formula for calculating the buffer count 
> is documented here 
> (https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers).
>   
> #slots-per-TM^2 * #TMs * 4
> In a standalone deployment, we may need to control the task manager cluster 
> size dynamically and then leverage the upcoming Flink feature to support 
> scaling job parallelism/rescaling at runtime. 
> If the buffer count config is static at runtime and cannot be changed without 
> restarting the task manager process, this may add latency and complexity to the 
> scaling process. I am wondering if there is already any discussion around 
> whether network buffers should be automatically managed by Flink, or whether at 
> least some API should be exposed to allow them to be reconfigured. Let me know if 
> there is any existing JIRA that I should follow.
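
To make the documented formula concrete: with, say, 8 slots per TM and 10 TMs it works out to 8^2 * 10 * 4 = 2,560 buffers per task manager, which at the default 32 KB buffer segment size comes to roughly 80 MB of pre-allocated network memory on each TM.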





[GitHub] flink pull request #3467: [FLINK-4545] preparations for removing the network...

2017-03-08 Thread wenlong88
Github user wenlong88 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3467#discussion_r105081191
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ---
@@ -265,11 +281,15 @@ public String toString() {
// 

 
private void returnMemorySegment(MemorySegment segment) {
+   assert Thread.holdsLock(availableMemorySegments);
--- End diff --

Hi, I have a question about the assert: I found that assertions are 
disabled in Java by default. Why not use an explicit 
`synchronized(availableMemorySegments)` block, which may be the more common usage?




[jira] [Commented] (FLINK-5781) Generation HTML from ConfigOption

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902401#comment-15902401
 ] 

ASF GitHub Bot commented on FLINK-5781:
---

Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3495#discussion_r105077753
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java ---
@@ -137,6 +153,26 @@ public boolean hasDeprecatedKeys() {
return deprecatedKeys == EMPTY ? 
Collections.emptyList() : Arrays.asList(deprecatedKeys);
}
 
+   public String shortDescription() {
+   return shortDescription;
+   }
+
+   public String description() {
+   return description;
+   }
+
+   String toHTMLString(boolean includeShort) {
+   String stringBuilder = "" +
--- End diff --

I think we should use `java.lang.StringBuilder`.

```java
StringBuilder sb = new StringBuilder();
sb.append("")
.append("").append(key).append("");
```


> Generation HTML from ConfigOption
> -
>
> Key: FLINK-5781
> URL: https://issues.apache.org/jira/browse/FLINK-5781
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Ufuk Celebi
>Assignee: Dawid Wysakowicz
>
> Use the ConfigOption instances to generate a HTML page that we can use to 
> include in the docs configuration page.





[GitHub] flink pull request #3495: [FLINK-5781] Generation HTML from ConfigOption

2017-03-08 Thread clarkyzl
Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3495#discussion_r105077753
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java ---
@@ -137,6 +153,26 @@ public boolean hasDeprecatedKeys() {
return deprecatedKeys == EMPTY ? 
Collections.emptyList() : Arrays.asList(deprecatedKeys);
}
 
+   public String shortDescription() {
+   return shortDescription;
+   }
+
+   public String description() {
+   return description;
+   }
+
+   String toHTMLString(boolean includeShort) {
+   String stringBuilder = "" +
--- End diff --

I think we should use `java.lang.StringBuilder`.

```java
StringBuilder sb = new StringBuilder();
sb.append("")
.append("").append(key).append("");
```




[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-08 Thread sunjincheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902390#comment-15902390
 ] 

sunjincheng commented on FLINK-5658:


Hi [~Yuhong_kyo], sounds good to me. The [partitioned] PR [#3397 | 
https://github.com/apache/flink/pull/3397] has already been merged. The [non-partitioned] 
PR [#3491 | https://github.com/apache/flink/pull/3491] should be mergeable today. 
Then you can rebase your code on master. 

Thanks,
SunJincheng

> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates event time mode.
> - bounded PRECEDING is not supported (see FLINK-5655)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).





[jira] [Assigned] (FLINK-5998) Un-fat Hadoop from Flink fat jar

2017-03-08 Thread Haohui Mai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Haohui Mai reassigned FLINK-5998:
-

Assignee: Haohui Mai

> Un-fat Hadoop from Flink fat jar
> 
>
> Key: FLINK-5998
> URL: https://issues.apache.org/jira/browse/FLINK-5998
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Robert Metzger
>Assignee: Haohui Mai
>
> As a first step towards FLINK-2268, I would suggest putting all Hadoop 
> dependencies into a jar separate from Flink's fat jar.
> This would allow users to put a custom Hadoop jar in there, or even deploy 
> Flink without a Hadoop fat jar at all in environments where Hadoop is 
> provided (EMR).





[jira] [Commented] (FLINK-5995) Get an Exception when creating the ListStateDescriptor with a TypeInformation

2017-03-08 Thread sunjincheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902363#comment-15902363
 ] 

sunjincheng commented on FLINK-5995:


Hi [~srichter], I would be very grateful if you could give me some suggestions.

> Get an Exception when creating the ListStateDescriptor with a TypeInformation 
> -
>
> Key: FLINK-5995
> URL: https://issues.apache.org/jira/browse/FLINK-5995
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> When using OperatorState and creating the ListStateDescriptor with a 
> TypeInformation, I got an exception. The exception info is:
> {code}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:915)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:858)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:858)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalStateException: Serializer not yet initialized.
>   at 
> org.apache.flink.api.common.state.StateDescriptor.getSerializer(StateDescriptor.java:169)
>   at 
> org.apache.flink.api.common.state.ListStateDescriptor.getElementSerializer(ListStateDescriptor.java:93)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getOperatorState(DefaultOperatorStateBackend.java:110)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getOperatorState(DefaultOperatorStateBackend.java:91)
>   at 
> org.apache.flink.table.runtime.aggregate.UnboundedNonPartitionedProcessingOverProcessFunction.initializeState(UnboundedNonPartitionedProcessingOverProcessFunction.scala:104)
>   at 
> org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:106)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:242)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:681)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:669)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:670)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> So, I'll add a `stateDescriptor.initializeSerializerUnlessSet()` call in the 
> `getOperatorState` method. I'd appreciate it if anyone could give me some advice.





[jira] [Assigned] (FLINK-5053) Incremental / lightweight snapshots for checkpoints

2017-03-08 Thread Xiaogang Shi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaogang Shi reassigned FLINK-5053:
---

Assignee: Xiaogang Shi

> Incremental / lightweight snapshots for checkpoints
> ---
>
> Key: FLINK-5053
> URL: https://issues.apache.org/jira/browse/FLINK-5053
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Xiaogang Shi
>
> There is currently basically no difference between savepoints and checkpoints 
> in Flink and both are created through exactly the same process.
> However, savepoints and checkpoints have a slightly different meaning which 
> we should take into account to keep Flink efficient:
> - Savepoints are (typically infrequently) triggered by the user to create a 
> state from which the application can be restarted, e.g. because Flink, some 
> code, or the parallelism needs to be changed.
> - Checkpoints are (typically frequently) triggered by the System to allow for 
> fast recovery in case of failure, but keeping the job/system unchanged.
> This means that savepoints and checkpoints can have different properties in 
> that:
> - Savepoint should represent a state of the application, where 
> characteristics of the job (e.g. parallelism) can be adjusted for the next 
> restart. One example for things that savepoints need to be aware of are 
> key-groups. Savepoints can potentially be a little more expensive than 
> checkpoints, because they are usually created a lot less frequently by 
> the user.
> - Checkpoints are frequently triggered by the system to allow for fast 
> failure recovery. However, failure recovery leaves all characteristics of the 
> job unchanged. Thus, checkpoints do not have to be aware of those, e.g. think 
> again of key groups. Checkpoints should run faster than creating savepoints, 
> in particular it would be nice to have incremental checkpoints.
> For a first approach, I would suggest the following steps/changes:
> - In checkpoint coordination: differentiate between triggering checkpoints 
> and savepoints. Introduce properties for checkpoints that describe their set 
> of abilities, e.g. "is-key-group-aware", "is-incremental".
> - In state handle infrastructure: introduce state handles that reflect 
> incremental checkpoints and drop full key-group awareness, i.e. covering 
> folders instead of files and not having keygroup_id -> file/offset mapping, 
> but keygroup_range -> folder?
> - Backend side: We should start with RocksDB by reintroducing something 
> similar to semi-async snapshots, but using 
> BackupableDBOptions::setShareTableFiles(true) and transferring only new 
> incremental outputs to HDFS. Notice that using RocksDB's internal backup 
> mechanism is giving up on the information about individual key-groups. But as 
> explained above, this should be totally acceptable for checkpoints, while 
> savepoints should use the key-group-aware fully async mode. Of course we also 
> need to implement the ability to restore from both types of snapshots.
> One problem in the suggested approach is still that even checkpoints should 
> support scale-down, in case that only a smaller number of instances is left 
> available in a recovery case.





[GitHub] flink issue #3497: [FLINK-6002] Documentation: 'MacOS X' section in Quicksta...

2017-03-08 Thread clarkyzl
Github user clarkyzl commented on the issue:

https://github.com/apache/flink/pull/3497
  
+1 LGTM. Thanks @phoenixjiangnan 




[jira] [Commented] (FLINK-6002) Documentation: 'MacOS X' under 'Download and Start Flink' in Quickstart page is not rendered correctly

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902328#comment-15902328
 ] 

ASF GitHub Bot commented on FLINK-6002:
---

Github user clarkyzl commented on the issue:

https://github.com/apache/flink/pull/3497
  
+1 LGTM. Thanks @phoenixjiangnan 


> Documentation: 'MacOS X' under 'Download and Start Flink' in Quickstart page 
> is not rendered correctly
> --
>
> Key: FLINK-6002
> URL: https://issues.apache.org/jira/browse/FLINK-6002
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.2.0
>Reporter: Bowen Li
>Priority: Trivial
> Fix For: 1.2.1
>
>
> On 
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/setup_quickstart.html#setup-download-and-start-flink
>  , command lines in "MacOS X" part are not rendered correctly.
> This is because the markdown is misformatted - it doesn't leave a blank line 
> between text and code block.
> So the fix is simple - add a blank line between text and code block.





[jira] [Commented] (FLINK-5976) Refactoring duplicate Tokenizer in flink-test

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902310#comment-15902310
 ] 

ASF GitHub Bot commented on FLINK-5976:
---

Github user liuyuzhong7 commented on the issue:

https://github.com/apache/flink/pull/3485
  
@StephanEwen It's a good idea. We can remove a lot of duplicated code in 
flink-tests this way.
But I found the same duplication in other modules. How should we solve it 
there?

![image](https://cloud.githubusercontent.com/assets/24708126/23732447/1f3cdd08-04ae-11e7-866e-a5a0e6d5e242.png)





> Refactoring duplicate Tokenizer in flink-test
> -
>
> Key: FLINK-5976
> URL: https://issues.apache.org/jira/browse/FLINK-5976
> Project: Flink
>  Issue Type: Improvement
>  Components: Examples
>Affects Versions: 1.2.0
>Reporter: liuyuzhong7
>Priority: Minor
>  Labels: test
> Fix For: 1.2.0
>
>
> There is some duplicated code like this in flink-tests; I think refactoring it 
> will be better. 
> ```
> public final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
>   @Override
>   public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
>     // normalize and split the line
>     String[] tokens = value.toLowerCase().split("\\W+");
>     // emit the pairs
>     for (String token : tokens) {
>       if (token.length() > 0) {
>         out.collect(new Tuple2<String, Integer>(token, 1));
>       }
>     }
>   }
> }
> ```





[GitHub] flink issue #3485: FLINK-5976 [tests] Deduplicate Tokenizer in tests

2017-03-08 Thread liuyuzhong7
Github user liuyuzhong7 commented on the issue:

https://github.com/apache/flink/pull/3485
  
@StephanEwen It's a good idea. We can remove a lot of duplicated code in 
flink-tests this way.
But I found the same duplication in other modules. How should we solve it 
there?

![image](https://cloud.githubusercontent.com/assets/24708126/23732447/1f3cdd08-04ae-11e7-866e-a5a0e6d5e242.png)







[GitHub] flink pull request #3488: [FLINK-5971] [flip-6] Add timeout for registered j...

2017-03-08 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3488#discussion_r105068068
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
 ---
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * The set of configuration options relating to the ResourceManager
+ */
+@PublicEvolving
+public class ResourceManagerOptions {
+
+   public static final ConfigOption JOB_TIMEOUT = ConfigOptions
+   .key("resourcemanager.job.timeout")
--- End diff --

Can we have a more specific name like "inactive_job.timeout"? It looks like a 
normal job will also time out after 5 minutes.




[jira] [Comment Edited] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()

2017-03-08 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15822346#comment-15822346
 ] 

Ted Yu edited comment on FLINK-5486 at 3/9/17 1:28 AM:
---

Lock on State.bucketStates should be held in the following method:

{code}
  private void handleRestoredBucketState(State restoredState) {
Preconditions.checkNotNull(restoredState);

for (BucketState bucketState : restoredState.bucketStates.values()) {
{code}


was (Author: yuzhih...@gmail.com):
Lock on State.bucketStates should be held in the following method:
{code}
  private void handleRestoredBucketState(State restoredState) {
Preconditions.checkNotNull(restoredState);

for (BucketState bucketState : restoredState.bucketStates.values()) {
{code}

> Lack of synchronization in BucketingSink#handleRestoredBucketState()
> 
>
> Key: FLINK-5486
> URL: https://issues.apache.org/jira/browse/FLINK-5486
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>
> Here is related code:
> {code}
>   
> handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
>   synchronized (bucketState.pendingFilesPerCheckpoint) {
> bucketState.pendingFilesPerCheckpoint.clear();
>   }
> {code}
> The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside 
> the synchronization block. Otherwise during the processing of 
> handlePendingFilesForPreviousCheckpoints(), some entries of the map may be 
> cleared.
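
A sketch of the reordering being suggested, reusing the identifiers from the snippet above (illustrative only, not the actual BucketingSink source):

{code}
synchronized (bucketState.pendingFilesPerCheckpoint) {
    // process and clear under the same lock, so no entries can be removed mid-processing
    handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
    bucketState.pendingFilesPerCheckpoint.clear();
}
{code}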





[GitHub] flink pull request #3488: [FLINK-5971] [flip-6] Add timeout for registered j...

2017-03-08 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3488#discussion_r105068133
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java ---
@@ -28,6 +28,10 @@
 @PublicEvolving
 public class AkkaOptions {
 
+   public static final ConfigOption AKKA_ASK_TIMEOUT = 
ConfigOptions
--- End diff --

Maybe add a comment to be consistent with the other options in this class?




[jira] [Commented] (FLINK-5971) JobLeaderIdService should time out registered jobs

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902280#comment-15902280
 ] 

ASF GitHub Bot commented on FLINK-5971:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3488#discussion_r105068133
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java ---
@@ -28,6 +28,10 @@
 @PublicEvolving
 public class AkkaOptions {
 
+   public static final ConfigOption AKKA_ASK_TIMEOUT = 
ConfigOptions
--- End diff --

Maybe add a comment to keep it consistent with the other options in this class?


> JobLeaderIdService should time out registered jobs
> --
>
> Key: FLINK-5971
> URL: https://issues.apache.org/jira/browse/FLINK-5971
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> The {{JobLeaderIdService}} has no mechanism to time out inactive jobs. At the 
> moment it relies on the {{RunningJobsRegistry}} which only gives a heuristic 
> answer.
> We should remove the {{RunningJobsRegistry}} and register instead a timeout 
> for each job which does not have a job leader associated.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5971) JobLeaderIdService should time out registered jobs

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902279#comment-15902279
 ] 

ASF GitHub Bot commented on FLINK-5971:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3488#discussion_r105068068
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
 ---
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * The set of configuration options relating to the ResourceManager
+ */
+@PublicEvolving
+public class ResourceManagerOptions {
+
+   public static final ConfigOption JOB_TIMEOUT = ConfigOptions
+   .key("resourcemanager.job.timeout")
--- End diff --

Can we have a more specific name like "inactive_job.timeout"? As it is, it looks like 
normal jobs would also time out after 5 minutes.
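
Purely for illustration, the renamed option could look like this (the generic type and 
the 5 minute default are assumptions based on the discussion above):

```java
public static final ConfigOption<String> JOB_TIMEOUT = ConfigOptions
    .key("resourcemanager.inactive_job.timeout")  // instead of "resourcemanager.job.timeout"
    .defaultValue("5 minutes");
```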


> JobLeaderIdService should time out registered jobs
> --
>
> Key: FLINK-5971
> URL: https://issues.apache.org/jira/browse/FLINK-5971
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> The {{JobLeaderIdService}} has no mechanism to time out inactive jobs. At the 
> moment it relies on the {{RunningJobsRegistry}} which only gives a heuristic 
> answer.
> We should remove the {{RunningJobsRegistry}} and register instead a timeout 
> for each job which does not have a job leader associated.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5971) JobLeaderIdService should time out registered jobs

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902273#comment-15902273
 ] 

ASF GitHub Bot commented on FLINK-5971:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3488#discussion_r105067829
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
 ---
@@ -283,5 +302,31 @@ public void handleError(Exception exception) {

JobLeaderIdListener.class.getSimpleName(), exception);
}
}
+
+   private void activateTimeout() {
+   if (timeoutId != null) {
+   cancelTimeout();
+   }
+
+   final UUID newTimeoutId = UUID.randomUUID();
+
+   timeoutId = newTimeoutId;
+
+   timeoutFuture = scheduledExecutor.schedule(new 
Runnable() {
+   @Override
+   public void run() {
+   
listenerJobLeaderIdActions.notifyJobTimeout(jobId, newTimeoutId);
--- End diff --

I think this thread is different from the leader retrieval thread that will 
call "notifyLeaderAddress", right?
So is it possible that, when a job is close to timing out and the leader address 
notification arrives at the same time, both threads try to `cancelTimeout`? An NPE 
would be thrown if one of them calls `timeoutFuture.cancel(true)` while the other 
one has already set `timeoutFuture = null`.


> JobLeaderIdService should time out registered jobs
> --
>
> Key: FLINK-5971
> URL: https://issues.apache.org/jira/browse/FLINK-5971
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> The {{JobLeaderIdService}} has no mechanism to time out inactive jobs. At the 
> moment it relies on the {{RunningJobsRegistry}} which only gives a heuristic 
> answer.
> We should remove the {{RunningJobsRegistry}} and register instead a timeout 
> for each job which does not have a job leader associated.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3488: [FLINK-5971] [flip-6] Add timeout for registered j...

2017-03-08 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3488#discussion_r105067829
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
 ---
@@ -283,5 +302,31 @@ public void handleError(Exception exception) {

JobLeaderIdListener.class.getSimpleName(), exception);
}
}
+
+   private void activateTimeout() {
+   if (timeoutId != null) {
+   cancelTimeout();
+   }
+
+   final UUID newTimeoutId = UUID.randomUUID();
+
+   timeoutId = newTimeoutId;
+
+   timeoutFuture = scheduledExecutor.schedule(new 
Runnable() {
+   @Override
+   public void run() {
+   
listenerJobLeaderIdActions.notifyJobTimeout(jobId, newTimeoutId);
--- End diff --

I think this thread is different from the leader retrieval thread that will 
call "notifyLeaderAddress", right?
So is it possible that, when a job is close to timing out and the leader address 
notification arrives at the same time, both threads try to `cancelTimeout`? An NPE 
would be thrown if one of them calls `timeoutFuture.cancel(true)` while the other 
one has already set `timeoutFuture = null`.
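
One possible way to rule out that NPE, sketched under the assumption that 
`timeoutFuture` is a `ScheduledFuture<?>` field (only the guarding pattern is 
suggested here, not the actual fix):

```java
private void cancelTimeout() {
    // copy the field into a local so a concurrent call cannot null it between
    // the null check and the cancel; alternatively guard the whole method with
    // the service's lock
    final ScheduledFuture<?> future = timeoutFuture;
    if (future != null) {
        future.cancel(true);
    }
    timeoutFuture = null;
    timeoutId = null;
}
```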


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-08 Thread Yuhong Hong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902261#comment-15902261
 ] 

Yuhong Hong commented on FLINK-5658:


Hi [~sunjincheng121], I agree with you. I have already rewritten this using a 
ProcessFunction. In order to stay consistent with the unbounded proctime case, 
maybe I should wait for your merge, or I can push my implementation first for 
review?
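
For reference, a rough, hypothetical skeleton of the ProcessFunction-based approach 
mentioned above (class name and method bodies are placeholders, not the actual 
implementation):

{code}
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class UnboundedEventTimeOverProcessFunction extends ProcessFunction<Row, Row> {

  @Override
  public void processElement(Row input, Context ctx, Collector<Row> out) throws Exception {
    // buffer the row in keyed state and register an event-time timer for its timestamp
    ctx.timerService().registerEventTimeTimer(ctx.timestamp());
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx, Collector<Row> out) throws Exception {
    // once the watermark passes, fold the buffered rows up to 'timestamp' into the
    // accumulators and emit the aggregated result rows
  }
}
{code}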

> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - bounded PRECEDING is not supported (see FLINK-5655)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3494: [FLINK-5635] [docker] Improve Docker tooling

2017-03-08 Thread jgrier
Github user jgrier commented on a diff in the pull request:

https://github.com/apache/flink/pull/3494#discussion_r105065007
  
--- Diff: flink-contrib/docker-flink/Dockerfile ---
@@ -36,22 +31,24 @@ ENV PATH $PATH:$FLINK_HOME/bin
 EXPOSE 8081
 EXPOSE 6123
 
+# flink-dist can point to a directory or a tarball on the local system
+ARG flink_dist=NOT_SET
+
 # Install build dependencies and flink
+ADD $flink_dist $FLINK_INSTALL_PATH
 RUN set -x && \
-  mkdir -p $FLINK_INSTALL_PATH && \
-  apk --update add --virtual build-dependencies curl && \
-  curl -s $(curl -s 
https://www.apache.org/dyn/closer.cgi\?preferred\=true)flink/flink-${FLINK_VERSION}/flink-${FLINK_VERSION}-bin-hadoop${HADOOP_VERSION}-scala_${SCALA_VERSION}.tgz
 | \
--- End diff --

Hi guys, thanks for picking up this PR and running with it.  I think we 
should separate the "official" Docker images issue from this PR.  This PR was 
simply about improving the existing Docker tooling and making it easier for 
Flink developers to build their own images, perhaps with their own tweaks or for 
specific Flink releases.

On the "official" images topic I'm totally fine with either approach.  dA 
could do this and maintain those official images or we could do it as part of 
Flink and the community can maintain them.  The main thing for me is just 
seeing that it's super easy for people to get Flink running with Docker and 
there is some accepted "official" image for people to grab rather than wading 
through a bunch of Flink images generated by various people.  That's bad for 
the user experience.

Regardless, let's figure out separately how best to get an official image 
on DockerHub and get this change finished up because I think the changes here 
are useful as is.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5635) Improve Docker tooling to make it easier to build images and launch Flink via Docker tools

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902248#comment-15902248
 ] 

ASF GitHub Bot commented on FLINK-5635:
---

Github user jgrier commented on a diff in the pull request:

https://github.com/apache/flink/pull/3494#discussion_r105065007
  
--- Diff: flink-contrib/docker-flink/Dockerfile ---
@@ -36,22 +31,24 @@ ENV PATH $PATH:$FLINK_HOME/bin
 EXPOSE 8081
 EXPOSE 6123
 
+# flink-dist can point to a directory or a tarball on the local system
+ARG flink_dist=NOT_SET
+
 # Install build dependencies and flink
+ADD $flink_dist $FLINK_INSTALL_PATH
 RUN set -x && \
-  mkdir -p $FLINK_INSTALL_PATH && \
-  apk --update add --virtual build-dependencies curl && \
-  curl -s $(curl -s 
https://www.apache.org/dyn/closer.cgi\?preferred\=true)flink/flink-${FLINK_VERSION}/flink-${FLINK_VERSION}-bin-hadoop${HADOOP_VERSION}-scala_${SCALA_VERSION}.tgz
 | \
--- End diff --

Hi guys, thanks for picking up this PR and running with it.  I think we 
should separate the "official" Docker images issue from this PR.  This PR was 
simply about improving the existing Docker tooling and making it easier for 
Flink developers to build their own images, perhaps with their own tweaks or for 
specific Flink releases.

On the "official" images topic I'm totally fine with either approach.  dA 
could do this and maintain those official images or we could do it as part of 
Flink and the community can maintain them.  The main thing for me is just 
seeing that it's super easy for people to get Flink running with Docker and 
there is some accepted "official" image for people to grab rather than wading 
through a bunch of Flink images generated by various people.  That's bad for 
the user experience.

Regardless, let's figure out separately how best to get an official image 
on DockerHub and get this change finished up because I think the changes here 
are useful as is.



> Improve Docker tooling to make it easier to build images and launch Flink via 
> Docker tools
> --
>
> Key: FLINK-5635
> URL: https://issues.apache.org/jira/browse/FLINK-5635
> Project: Flink
>  Issue Type: Improvement
>  Components: Docker
>Affects Versions: 1.2.0
>Reporter: Jamie Grier
>Assignee: Patrick Lucas
>
> This is a bit of a catch-all ticket for general improvements to the Flink on 
> Docker experience.
> Things to improve:
>   - Make it possible to build a Docker image from your own flink-dist 
> directory as well as official releases.
>   - Make it possible to override the image name so a user can more easily 
> publish these images to their Docker repository
>   - Provide scripts that show how to properly run on Docker Swarm or similar 
> environments with overlay networking (Kubernetes) without using host 
> networking.
>   - Log to stdout rather than to files.
>   - Work properly with docker-compose for local deployment as well as 
> production deployments (Swarm/k8s)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-3026) Publish the flink docker container to the docker registry

2017-03-08 Thread JIRA

[ 
https://issues.apache.org/jira/browse/FLINK-3026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902136#comment-15902136
 ] 

Ismaël Mejía edited comment on FLINK-3026 at 3/8/17 10:59 PM:
--

I am reassigning this to you after the discussion we had, however I would be 
glad to follow the evolution of the process, so please ping me once you have 
created the repository for the docker images.



was (Author: iemejia):
I am reassigning this to you after the discussion we have, however I would be 
glad to follow the evolution of the process, so please ping me once you have 
created the repository for the docker images.


> Publish the flink docker container to the docker registry
> -
>
> Key: FLINK-3026
> URL: https://issues.apache.org/jira/browse/FLINK-3026
> Project: Flink
>  Issue Type: Task
>  Components: Build System, Docker
>Reporter: Omer Katz
>Assignee: Patrick Lucas
>  Labels: Deployment, Docker
>
> There's a dockerfile that can be used to build a docker container already in 
> the repository. It'd be awesome to just be able to pull it instead of 
> building it ourselves.
> The dockerfile can be found at 
> https://github.com/apache/flink/tree/master/flink-contrib/docker-flink
> It also doesn't point to the latest version of Flink which I fixed in 
> https://github.com/apache/flink/pull/1366



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (FLINK-3026) Publish the flink docker container to the docker registry

2017-03-08 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/FLINK-3026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismaël Mejía reassigned FLINK-3026:
---

Assignee: Patrick Lucas  (was: Ismaël Mejía)

I am reassigning this to you after the discussion we have, however I would be 
glad to follow the evolution of the process, so please ping me once you have 
created the repository for the docker images.


> Publish the flink docker container to the docker registry
> -
>
> Key: FLINK-3026
> URL: https://issues.apache.org/jira/browse/FLINK-3026
> Project: Flink
>  Issue Type: Task
>  Components: Build System, Docker
>Reporter: Omer Katz
>Assignee: Patrick Lucas
>  Labels: Deployment, Docker
>
> There's a dockerfile that can be used to build a docker container already in 
> the repository. It'd be awesome to just be able to pull it instead of 
> building it ourselves.
> The dockerfile can be found at 
> https://github.com/apache/flink/tree/master/flink-contrib/docker-flink
> It also doesn't point to the latest version of Flink which I fixed in 
> https://github.com/apache/flink/pull/1366



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5653) Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902132#comment-15902132
 ] 

ASF GitHub Bot commented on FLINK-5653:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r105046076
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala
 ---
@@ -46,6 +52,20 @@ class DataStreamCalcRule
   calc.getProgram,
   description)
   }
+
+  override def matches(call: RelOptRuleCall): Boolean = {
--- End diff --

This can be removed because the `RexOver` was removed from `Project` (and 
`Calc`) during normalization.


> Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5653
> URL: https://issues.apache.org/jira/browse/FLINK-5653
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Stefano Bortoli
>
> The goal of this issue is to add support for OVER ROWS aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING 
> AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING 
> AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5656)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5653) Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902133#comment-15902133
 ] 

ASF GitHub Bot commented on FLINK-5653:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r105045766
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
 ---
@@ -165,6 +165,11 @@ object FlinkRuleSets {
 
   // merge and push unions rules
   UnionEliminatorRule.INSTANCE,
+  
+  // aggregations over intervals should be enabled to be translated 
also in
+  //queries with LogicalWindows, not only queries with LogicalCalc
+  ProjectWindowTransposeRule.INSTANCE,
+  ProjectToWindowRule.INSTANCE,
--- End diff --

This rule has been added to the normalization phase. It can be removed here.


> Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5653
> URL: https://issues.apache.org/jira/browse/FLINK-5653
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Stefano Bortoli
>
> The goal of this issue is to add support for OVER ROWS aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING 
> AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING 
> AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5656)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r105045766
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
 ---
@@ -165,6 +165,11 @@ object FlinkRuleSets {
 
   // merge and push unions rules
   UnionEliminatorRule.INSTANCE,
+  
+  // aggregations over intervals should be enabled to be translated 
also in
+  //queries with LogicalWindows, not only queries with LogicalCalc
+  ProjectWindowTransposeRule.INSTANCE,
+  ProjectToWindowRule.INSTANCE,
--- End diff --

This rule has been added to the normalization phase. It can be removed here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r105046076
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala
 ---
@@ -46,6 +52,20 @@ class DataStreamCalcRule
   calc.getProgram,
   description)
   }
+
+  override def matches(call: RelOptRuleCall): Boolean = {
--- End diff --

This can be removed because the `RexOver` was removed from `Project` (and 
`Calc`) during normalization.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-6003) Change docker image to be based on the openjdk one (debian+alpine)

2017-03-08 Thread JIRA
Ismaël Mejía created FLINK-6003:
---

 Summary: Change docker image to be based on the openjdk one 
(debian+alpine)
 Key: FLINK-6003
 URL: https://issues.apache.org/jira/browse/FLINK-6003
 Project: Flink
  Issue Type: Improvement
Reporter: Ismaël Mejía
Assignee: Ismaël Mejía
Priority: Minor


The base docker image 'java' has been deprecated since the end of last year.
This issue will also re-introduce a Dockerfile for the debian-based openjdk image 
in addition to the current one based on alpine.
Additional refactorings include:
- Move the default version to 1.2.0
- Refactor the Dockerfile for consistency between both versions



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902113#comment-15902113
 ] 

ASF GitHub Bot commented on FLINK-5654:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3459
  
Hi @huawei-flink, sorry for the long delay. I focused on adding support for 
retraction to the new UDAGG interface (PR #3470), support for time mode 
functions (PR #3370, #3425), and the most simple OVER window case (PR #3397) 
first. These additions should serve as a good basis for the more advanced OVER 
window cases. 

I had a look at this PR and noticed a few things:
- The Table API is implemented in Scala. We have a few Java classes (code 
copied & fixed from Calcite, Java examples, and tests) but do not intend to add 
more. The core parts like operators, translation, and runtime code have to be 
implemented in Scala. Mixing languages makes the maintenance more difficult.
- Please squash your commits. Usually, PRs are opened with a single commit 
and new commits are added when feedback is addressed. Before merging a 
committer squashes these commits. However, it is too much effort to squash more 
than 50 commits including merge commits which can cause trouble. If multiple 
contributors worked on a PR, figure out how you can separate the work into one 
commit per author.
- We have recently reworked our aggregation interface which will also serve 
as the interface for user-defined aggregations which should also be usable in 
OVER windows. Please use these aggregation functions. When designing the 
interface we had their use in OVER windows in mind. If you find that the 
interface is lacking a method, please start a discussion. However, we cannot 
have several incompatible aggregation interfaces in the Table API / SQL. Please 
rebase to the current master and use the new aggregation functions.
- A couple of days ago, we added `PROCTIME()` and `ROWTIME()` methods which 
should be used to identify the time mode.
- The first OVER window aggregation should serve as a blueprint for future 
OVER window implementations. We should try to keep the implementations as close 
as possible to share common code and make the overall maintenance of the code 
easier.

Minor comments:
- Why did you check in modifications to the `.gitignore` files?
- Please do not remove tests without replacing them with equivalent tests.
- Please do not reformat classes (see FunctionCatalog). These changes cause 
additional effort when reviewing a PR.
- Do not change ScalaDoc comments to JavaDoc comments in Scala code. 
ScalaDoc:
```
/**
  * Comment
  */
```
JavaDoc:
```
/**
 * Comment
 */
```

Thanks, Fabian



> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> -
>
> Key: FLINK-5654
> URL: https://issues.apache.org/jira/browse/FLINK-5654
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: radu
>
> The goal of this issue is to add support for OVER RANGE aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5657)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3459: [FLINK-5654] Add processing time OVER RANGE BETWEEN x PRE...

2017-03-08 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3459
  
Hi @huawei-flink, sorry for the long delay. I focused on adding support for 
retraction to the new UDAGG interface (PR #3470), support for time mode 
functions (PR #3370, #3425), and the most simple OVER window case (PR #3397) 
first. These additions should serve as a good basis for the more advanced OVER 
window cases. 

I had a look at this PR and noticed a few things:
- The Table API is implemented in Scala. We have a few Java classes (code 
copied & fixed from Calcite, Java examples, and tests) but do not intend to add 
more. The core parts like operators, translation, and runtime code have to be 
implemented in Scala. Mixing languages makes the maintenance more difficult.
- Please squash your commits. Usually, PRs are opened with a single commit 
and new commits are added when feedback is addressed. Before merging a 
committer squashes these commits. However, it is too much effort to squash more 
than 50 commits including merge commits which can cause trouble. If multiple 
contributors worked on a PR, figure out how you can separate the work into one 
commit per author.
- We have recently reworked our aggregation interface which will also serve 
as the interface for user-defined aggregations which should also be usable in 
OVER windows. Please use these aggregation functions. When designing the 
interface we had their use in OVER windows in mind. If you find that the 
interface is lacking a method, please start a discussion. However, we cannot 
have several incompatible aggregation interfaces in the Table API / SQL. Please 
rebase to the current master and use the new aggregation functions.
- A couple of days ago, we added `PROCTIME()` and `ROWTIME()` methods which 
should be used to identify the time mode.
- The first OVER window aggregation should serve as a blueprint for future 
OVER window implementations. We should try to keep the implementations as close 
as possible to share common code and make the overall maintenance of the code 
easier.

Minor comments:
- Why did you check in modifications to the `.gitignore` files?
- Please do not remove tests without replacing them with equivalent tests.
- Please do not reformat classes (see FunctionCatalog). These changes cause 
additional effort when reviewing a PR.
- Do not change ScalaDoc comments to JavaDoc comments in Scala code. 
ScalaDoc:
```
/**
  * Comment
  */
```
JavaDoc:
```
/**
 * Comment
 */
```

Thanks, Fabian



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3493: [FLINK-3026] Publish the flink docker container to...

2017-03-08 Thread iemejia
Github user iemejia closed the pull request at:

https://github.com/apache/flink/pull/3493


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3026) Publish the flink docker container to the docker registry

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902108#comment-15902108
 ] 

ASF GitHub Bot commented on FLINK-3026:
---

Github user iemejia closed the pull request at:

https://github.com/apache/flink/pull/3493


> Publish the flink docker container to the docker registry
> -
>
> Key: FLINK-3026
> URL: https://issues.apache.org/jira/browse/FLINK-3026
> Project: Flink
>  Issue Type: Task
>  Components: Build System, Docker
>Reporter: Omer Katz
>Assignee: Ismaël Mejía
>  Labels: Deployment, Docker
>
> There's a dockerfile that can be used to build a docker container already in 
> the repository. It'd be awesome to just be able to pull it instead of 
> building it ourselves.
> The dockerfile can be found at 
> https://github.com/apache/flink/tree/master/flink-contrib/docker-flink
> It also doesn't point to the latest version of Flink which I fixed in 
> https://github.com/apache/flink/pull/1366



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3026) Publish the flink docker container to the docker registry

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902107#comment-15902107
 ] 

ASF GitHub Bot commented on FLINK-3026:
---

Github user iemejia commented on the issue:

https://github.com/apache/flink/pull/3493
  
I am closing this to create a new one based on the new JIRA


> Publish the flink docker container to the docker registry
> -
>
> Key: FLINK-3026
> URL: https://issues.apache.org/jira/browse/FLINK-3026
> Project: Flink
>  Issue Type: Task
>  Components: Build System, Docker
>Reporter: Omer Katz
>Assignee: Ismaël Mejía
>  Labels: Deployment, Docker
>
> There's a dockerfile that can be used to build a docker container already in 
> the repository. It'd be awesome to just be able to pull it instead of 
> building it ourselves.
> The dockerfile can be found at 
> https://github.com/apache/flink/tree/master/flink-contrib/docker-flink
> It also doesn't point to the latest version of Flink which I fixed in 
> https://github.com/apache/flink/pull/1366



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3493: [FLINK-3026] Publish the flink docker container to the do...

2017-03-08 Thread iemejia
Github user iemejia commented on the issue:

https://github.com/apache/flink/pull/3493
  
I am closing this to create a new one based on the new JIRA


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104730718
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
 ---
@@ -124,8 +127,26 @@ class DataSetCalc(
   body,
   returnType)
 
+def getForwardIndices = {
--- End diff --

Move all the forward-field related code into a function `getForwardFields()` 
to keep the main translation function more concise.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3850) Add forward field annotations to DataSet operators generated by the Table API

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902047#comment-15902047
 ] 

ASF GitHub Bot commented on FLINK-3850:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104730046
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset.forwarding
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, TypeInformation => TypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+
+object FieldForwardingUtils {
+
+  def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields
+
+  private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = {
+throw new TableException(s"Implementation for $customWrapper wrapper 
is missing.")
+  }
+
+  /**
+* Wrapper for {@link getForwardedFields}
+*/
+  def getForwardedInput(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[Int],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String 
= {
+
+getForwardedFields(inputType,
+  outputType,
+  forwardIndices.zip(forwardIndices),
+  customWrapper)
+  }
+
+  /**
+* Wraps provided indices with proper names.
+* e.g. _1 for Tuple, f0 for Row, fieldName for POJO and named Row
+*
+* @param inputType  information of input data
+* @param outputType information of output data
+* @param forwardIndices tuple of (input, output) indices of a 
forwarded field
+* @param customWrapper  used for figuring out proper type in specific 
cases,
+*   e.g. {@see DataSetSingleRowJoin}
+* @return string with forwarded fields mapped from input to output
+*/
+  def getForwardedFields(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[(Int, Int)],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String 
= {
+
+def chooseWrapper(
+typeInformation: TypeInfo[_]): Seq[(String, TypeInfo[_])] = {
+
+  typeInformation match {
+case composite: CompositeType[_] =>
+  val fields = extractFields(composite)
+  compositeTypeField(fields)
+case basic: BasicTypeInfo[_] =>
+  Seq((s"*", basic))
+case array: BasicArrayTypeInfo[_, _] =>
+  Seq((s"*", array))
+case _ =>
+  customWrapper(typeInformation)
+  }
+}
+
+val wrapInput = chooseWrapper(inputType)
+val wrapOutput = chooseWrapper(outputType)
+
+forwardFields(forwardIndices, wrapInput, wrapOutput)
+  }
+
+  private def extractFields(
+  composite: CompositeType[_]): Seq[(String, TypeInfo[_])] = {
+
+val types = for {
+  i <- 0 until composite.getArity
+} yield { composite.getTypeAt(i) }
+
+composite.getFieldNames.zip(types)
+  }
+
+  private def forwardFields(
+  forwardIndices: Seq[(Int, Int)],
+  wrappedInput: Int => (String, TypeInfo[_]),
+  wrappedOutput: Int => (String, TypeInfo[_])): String = {
+
+implicit class Field2ForwardField(left: (String, TypeInfo[_])) {
+  def ->(right: (String, TypeInfo[_])): String = if 
(left.equals(right)) {
+s"${left._1}"
+  } else {
+if (left._2.equals(right._2)) {
  

[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r105014912
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
 ---
@@ -125,11 +126,25 @@ class DataSetAggregate(
 .name(aggOpName)
 }
 else {
+  //Forward all fields at conversion
+  val inputInfo = mappedInput.getType
--- End diff --

PR #3472 will remove the preparing mapper but add a `GroupCombineFunction`. 
For both the combiner and the reducer, we need to forward the fields on which 
the data is grouped, because these are not modified but just passed through.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104708548
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
 ---
@@ -125,11 +126,25 @@ class DataSetAggregate(
 .name(aggOpName)
 }
 else {
+  //Forward all fields at conversion
+  val inputInfo = mappedInput.getType
--- End diff --

An aggregation operator returns the grouping keys and the aggregated values. 
Only the grouping keys are not modified and can be forwarded. Forwarded fields 
can therefore only be assigned for grouped aggregates.
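
A small, hypothetical illustration at the DataSet API level (the data set and 
function names are made up):

```java
// grouping keys f0 and f1 pass through the aggregation unchanged and can be
// declared as forwarded; the aggregated value is recomputed and cannot
input.groupBy(0, 1)
     .reduceGroup(new MyAggregationFunction())
     .withForwardedFields("f0; f1");
```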


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3850) Add forward field annotations to DataSet operators generated by the Table API

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902056#comment-15902056
 ] 

ASF GitHub Bot commented on FLINK-3850:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104708548
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
 ---
@@ -125,11 +126,25 @@ class DataSetAggregate(
 .name(aggOpName)
 }
 else {
+  //Forward all fields at conversion
+  val inputInfo = mappedInput.getType
--- End diff --

An aggregation operator returns the grouping keys and the aggregated values. 
Only the grouping keys are not modified and can be forwarded. Forwarded fields 
can therefore only be assigned for grouped aggregates.


> Add forward field annotations to DataSet operators generated by the Table API
> -
>
> Key: FLINK-3850
> URL: https://issues.apache.org/jira/browse/FLINK-3850
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Nikolay Vasilishin
>
> The DataSet API features semantic annotations [1] to hint the optimizer which 
> input fields an operator copies. This information is valuable for the 
> optimizer because it can infer that certain physical properties such as 
> partitioning or sorting are not destroyed by user functions and thus generate 
> more efficient execution plans.
> The Table API is built on top of the DataSet API and generates DataSet 
> programs and code for user-defined functions. Hence, it knows exactly which 
> fields are modified and which not. We should use this information to 
> automatically generate forward field annotations and attach them to the 
> operators. This can help to significantly improve the performance of certain 
> jobs.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/index.html#semantic-annotations
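
As a hand-written illustration of such an annotation (not code the Table API 
generates), a function that copies its first tuple field unchanged can declare 
this so the optimizer knows that partitioning or sorting on that field survives:

{code}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.tuple.Tuple2;

@ForwardedFields("f0")
public class ScaleSecondField implements MapFunction<Tuple2<Long, Integer>, Tuple2<Long, Integer>> {
  @Override
  public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) {
    // f0 is forwarded as-is, only f1 is modified
    return new Tuple2<>(value.f0, value.f1 * 2);
  }
}
{code}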



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3850) Add forward field annotations to DataSet operators generated by the Table API

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902054#comment-15902054
 ] 

ASF GitHub Bot commented on FLINK-3850:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r105029771
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset.forwarding
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, TypeInformation => TypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+
+object FieldForwardingUtils {
+
+  def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields
+
+  private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = {
+throw new TableException(s"Implementation for $customWrapper wrapper 
is missing.")
+  }
+
+  /**
+* Wrapper for {@link getForwardedFields}
+*/
+  def getForwardedInput(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[Int],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String 
= {
+
+getForwardedFields(inputType,
+  outputType,
+  forwardIndices.zip(forwardIndices),
+  customWrapper)
+  }
+
+  /**
+* Wraps provided indices with proper names.
+* e.g. _1 for Tuple, f0 for Row, fieldName for POJO and named Row
+*
+* @param inputType  information of input data
+* @param outputType information of output data
+* @param forwardIndices tuple of (input, output) indices of a 
forwarded field
+* @param customWrapper  used for figuring out proper type in specific 
cases,
+*   e.g. {@see DataSetSingleRowJoin}
+* @return string with forwarded fields mapped from input to output
+*/
+  def getForwardedFields(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[(Int, Int)],
+  customWrapper: TypeInfo[_] =>
--- End diff --

I didn't notice where the `customWrapper` is actually used. Can it be removed?


> Add forward field annotations to DataSet operators generated by the Table API
> -
>
> Key: FLINK-3850
> URL: https://issues.apache.org/jira/browse/FLINK-3850
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Nikolay Vasilishin
>
> The DataSet API features semantic annotations [1] to hint the optimizer which 
> input fields an operator copies. This information is valuable for the 
> optimizer because it can infer that certain physical properties such as 
> partitioning or sorting are not destroyed by user functions and thus generate 
> more efficient execution plans.
> The Table API is built on top of the DataSet API and generates DataSet 
> programs and code for user-defined functions. Hence, it knows exactly which 
> fields are modified and which not. We should use this information to 
> automatically generate forward field annotations and attach them to the 
> operators. This can help to significantly improve the performance of certain 
> jobs.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/index.html#semantic-annotations



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r105029771
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset.forwarding
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, TypeInformation => TypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+
+object FieldForwardingUtils {
+
+  def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields
+
+  private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = {
+throw new TableException(s"Implementation for $customWrapper wrapper 
is missing.")
+  }
+
+  /**
+* Wrapper for {@link getForwardedFields}
+*/
+  def getForwardedInput(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[Int],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String 
= {
+
+getForwardedFields(inputType,
+  outputType,
+  forwardIndices.zip(forwardIndices),
+  customWrapper)
+  }
+
+  /**
+* Wraps provided indices with proper names.
+* e.g. _1 for Tuple, f0 for Row, fieldName for POJO and named Row
+*
+* @param inputType  information of input data
+* @param outputType information of output data
+* @param forwardIndices tuple of (input, output) indices of a 
forwarded field
+* @param customWrapper  used for figuring out proper type in specific 
cases,
+*   e.g. {@see DataSetSingleRowJoin}
+* @return string with forwarded fields mapped from input to output
+*/
+  def getForwardedFields(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[(Int, Int)],
+  customWrapper: TypeInfo[_] =>
--- End diff --

I didn't notice where the `customWrapper` is actually used. Can it be removed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3850) Add forward field annotations to DataSet operators generated by the Table API

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902058#comment-15902058
 ] 

ASF GitHub Bot commented on FLINK-3850:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104723484
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
 ---
@@ -97,18 +103,41 @@ class DataSetCorrelate(
 val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
 val pojoFieldMapping = sqlFunction.getPojoFieldMapping
 val udtfTypeInfo = 
sqlFunction.getRowTypeInfo.asInstanceOf[TypeInformation[Any]]
+val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
 
 val mapFunc = correlateMapFunction(
   config,
   inputDS.getType,
   udtfTypeInfo,
+  returnType,
   getRowType,
   joinType,
   rexCall,
   condition,
   Some(pojoFieldMapping),
   ruleDescription)
 
-inputDS.flatMap(mapFunc).name(correlateOpName(rexCall, sqlFunction, 
relRowType))
+def getIndices = {
--- End diff --

A correlate forwards all fields from the input and the table function, e.g. 
`[in1, in2, in3, tf1, tf2]` for an input `[in1, in2, in3]` and a table 
function `[tf1, tf2]`. So we can do a simple position-based mapping of the 
fields of the input type against the output type (field names might change). 
This is basically similar to what you are doing for the single row join.

We do not need to look at the table function or the condition.
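
A hedged sketch of that position-based mapping applied to the translation shown in 
the diff (the forwarded-fields expression and the use of `withForwardedFields` are 
assumptions for illustration, written in Java style with the names from the diff):

```java
// the three input fields keep their positions in the correlate output row
DataSet<Row> result = inputDS
    .flatMap(mapFunc)
    .withForwardedFields("f0; f1; f2")
    .name(correlateOpName(rexCall, sqlFunction, relRowType));
```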


> Add forward field annotations to DataSet operators generated by the Table API
> -
>
> Key: FLINK-3850
> URL: https://issues.apache.org/jira/browse/FLINK-3850
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Nikolay Vasilishin
>
> The DataSet API features semantic annotations [1] to hint the optimizer which 
> input fields an operator copies. This information is valuable for the 
> optimizer because it can infer that certain physical properties such as 
> partitioning or sorting are not destroyed by user functions and thus generate 
> more efficient execution plans.
> The Table API is built on top of the DataSet API and generates DataSet 
> programs and code for user-defined functions. Hence, it knows exactly which 
> fields are modified and which not. We should use this information to 
> automatically generate forward field annotations and attach them to the 
> operators. This can help to significantly improve the performance of certain 
> jobs.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/index.html#semantic-annotations



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3850) Add forward field annotations to DataSet operators generated by the Table API

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902049#comment-15902049
 ] 

ASF GitHub Bot commented on FLINK-3850:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104730718
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
 ---
@@ -124,8 +127,26 @@ class DataSetCalc(
   body,
   returnType)
 
+def getForwardIndices = {
--- End diff --

Move all forward-fields-related code into a function `getForwardFields()` to keep the main translation function more concise.


> Add forward field annotations to DataSet operators generated by the Table API
> -
>
> Key: FLINK-3850
> URL: https://issues.apache.org/jira/browse/FLINK-3850
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Nikolay Vasilishin
>
> The DataSet API features semantic annotations [1] to hint the optimizer which 
> input fields an operator copies. This information is valuable for the 
> optimizer because it can infer that certain physical properties such as 
> partitioning or sorting are not destroyed by user functions and thus generate 
> more efficient execution plans.
> The Table API is built on top of the DataSet API and generates DataSet 
> programs and code for user-defined functions. Hence, it knows exactly which 
> fields are modified and which not. We should use this information to 
> automatically generate forward field annotations and attach them to the 
> operators. This can help to significantly improve the performance of certain 
> jobs.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/index.html#semantic-annotations





[jira] [Commented] (FLINK-3850) Add forward field annotations to DataSet operators generated by the Table API

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902055#comment-15902055
 ] 

ASF GitHub Bot commented on FLINK-3850:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104727744
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
 ---
@@ -199,10 +200,21 @@ class DataSetJoin(
 
 val joinOpName = s"where: ($joinConditionToString), join: 
($joinSelectionToString)"
 
+// consider that all fields which are not keys are forwarded
+val leftIndices = (0 until 
left.getRowType.getFieldCount).diff(leftKeys)
+val fieldsLeft = getForwardedInput(leftDataSet.getType, returnType, 
leftIndices)
+
+val rightIndices = (0 until right.getRowType.getFieldCount)
+  .diff(rightKeys)
--- End diff --

Keys can be forwarded


> Add forward field annotations to DataSet operators generated by the Table API
> -
>
> Key: FLINK-3850
> URL: https://issues.apache.org/jira/browse/FLINK-3850
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Nikolay Vasilishin
>
> The DataSet API features semantic annotations [1] to hint the optimizer which 
> input fields an operator copies. This information is valuable for the 
> optimizer because it can infer that certain physical properties such as 
> partitioning or sorting are not destroyed by user functions and thus generate 
> more efficient execution plans.
> The Table API is built on top of the DataSet API and generates DataSet 
> programs and code for user-defined functions. Hence, it knows exactly which 
> fields are modified and which not. We should use this information to 
> automatically generate forward field annotations and attach them to the 
> operators. This can help to significantly improve the performance of certain 
> jobs.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/index.html#semantic-annotations





[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r105030757
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset.forwarding
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, TypeInformation => TypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+
+object FieldForwardingUtils {
+
+  def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields
+
+  private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = {
+throw new TableException(s"Implementation for $customWrapper wrapper 
is missing.")
+  }
+
+  /**
+* Wrapper for {@link getForwardedFields}
+*/
+  def getForwardedInput(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[Int],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String 
= {
+
+getForwardedFields(inputType,
+  outputType,
+  forwardIndices.zip(forwardIndices),
+  customWrapper)
+  }
+
+  /**
+* Wraps provided indices with proper names.
+* e.g. _1 for Tuple, f0 for Row, fieldName for POJO and named Row
+*
+* @param inputType  information of input data
+* @param outputType information of output data
+* @param forwardIndices tuple of (input, output) indices of a 
forwarded field
+* @param customWrapper  used for figuring out proper type in specific 
cases,
+*   e.g. {@see DataSetSingleRowJoin}
+* @return string with forwarded fields mapped from input to output
+*/
+  def getForwardedFields(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[(Int, Int)],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String 
= {
+
+def chooseWrapper(
+typeInformation: TypeInfo[_]): Seq[(String, TypeInfo[_])] = {
+
+  typeInformation match {
+case composite: CompositeType[_] =>
+  val fields = extractFields(composite)
+  compositeTypeField(fields)
+case basic: BasicTypeInfo[_] =>
+  Seq((s"*", basic))
+case array: BasicArrayTypeInfo[_, _] =>
+  Seq((s"*", array))
+case _ =>
--- End diff --

`AtomicType` can be `"*"` as well.
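
As a sketch (assuming `org.apache.flink.api.common.typeinfo.AtomicType` is imported), the match could fall back to any atomic type before resorting to the custom wrapper:

```
typeInformation match {
  case composite: CompositeType[_] =>
    compositeTypeField(extractFields(composite))
  // BasicTypeInfo, BasicArrayTypeInfo and any other atomic type can be
  // forwarded as a whole with "*"
  case _: AtomicType[_] | _: BasicArrayTypeInfo[_, _] =>
    Seq(("*", typeInformation))
  case _ =>
    customWrapper(typeInformation)
}
```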




[jira] [Commented] (FLINK-3850) Add forward field annotations to DataSet operators generated by the Table API

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902060#comment-15902060
 ] 

ASF GitHub Bot commented on FLINK-3850:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r105030435
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset.forwarding
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, TypeInformation => TypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+
+object FieldForwardingUtils {
+
+  def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields
+
+  private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = {
+throw new TableException(s"Implementation for $customWrapper wrapper 
is missing.")
+  }
+
+  /**
+* Wrapper for {@link getForwardedFields}
+*/
+  def getForwardedInput(
+  inputType: TypeInfo[_],
--- End diff --

Please document the parameters. 
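
For instance, mirroring the Scaladoc of `getForwardedFields` above (a sketch only):

```
/**
  * Wrapper for {@link getForwardedFields} that forwards each field to the same position.
  *
  * @param inputType      type information of the input data
  * @param outputType     type information of the output data
  * @param forwardIndices indices of fields that are forwarded without being modified
  * @param customWrapper  used for figuring out proper field names in specific cases,
  *                       e.g. {@see DataSetSingleRowJoin}
  * @return string with forwarded fields mapped from input to output
  */
```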


> Add forward field annotations to DataSet operators generated by the Table API
> -
>
> Key: FLINK-3850
> URL: https://issues.apache.org/jira/browse/FLINK-3850
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Nikolay Vasilishin
>
> The DataSet API features semantic annotations [1] to hint the optimizer which 
> input fields an operator copies. This information is valuable for the 
> optimizer because it can infer that certain physical properties such as 
> partitioning or sorting are not destroyed by user functions and thus generate 
> more efficient execution plans.
> The Table API is built on top of the DataSet API and generates DataSet 
> programs and code for user-defined functions. Hence, it knows exactly which 
> fields are modified and which not. We should use this information to 
> automatically generate forward field annotations and attach them to the 
> operators. This can help to significantly improve the performance of certain 
> jobs.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/index.html#semantic-annotations





[jira] [Commented] (FLINK-3850) Add forward field annotations to DataSet operators generated by the Table API

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902064#comment-15902064
 ] 

ASF GitHub Bot commented on FLINK-3850:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104730153
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset.forwarding
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, TypeInformation => TypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+
+object FieldForwardingUtils {
+
+  def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields
+
+  private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = {
+throw new TableException(s"Implementation for $customWrapper wrapper 
is missing.")
+  }
+
+  /**
+* Wrapper for {@link getForwardedFields}
+*/
+  def getForwardedInput(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[Int],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String 
= {
+
+getForwardedFields(inputType,
+  outputType,
+  forwardIndices.zip(forwardIndices),
+  customWrapper)
+  }
+
+  /**
+* Wraps provided indices with proper names.
+* e.g. _1 for Tuple, f0 for Row, fieldName for POJO and named Row
+*
+* @param inputType  information of input data
+* @param outputType information of output data
+* @param forwardIndices tuple of (input, output) indices of a 
forwarded field
+* @param customWrapper  used for figuring out proper type in specific 
cases,
+*   e.g. {@see DataSetSingleRowJoin}
+* @return string with forwarded fields mapped from input to output
+*/
+  def getForwardedFields(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[(Int, Int)],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String 
= {
+
+def chooseWrapper(
+typeInformation: TypeInfo[_]): Seq[(String, TypeInfo[_])] = {
+
+  typeInformation match {
+case composite: CompositeType[_] =>
+  val fields = extractFields(composite)
+  compositeTypeField(fields)
+case basic: BasicTypeInfo[_] =>
+  Seq((s"*", basic))
+case array: BasicArrayTypeInfo[_, _] =>
+  Seq((s"*", array))
+case _ =>
+  customWrapper(typeInformation)
+  }
+}
+
+val wrapInput = chooseWrapper(inputType)
+val wrapOutput = chooseWrapper(outputType)
+
+forwardFields(forwardIndices, wrapInput, wrapOutput)
+  }
+
+  private def extractFields(
+  composite: CompositeType[_]): Seq[(String, TypeInfo[_])] = {
+
+val types = for {
+  i <- 0 until composite.getArity
+} yield { composite.getTypeAt(i) }
+
+composite.getFieldNames.zip(types)
+  }
+
+  private def forwardFields(
+  forwardIndices: Seq[(Int, Int)],
+  wrappedInput: Int => (String, TypeInfo[_]),
+  wrappedOutput: Int => (String, TypeInfo[_])): String = {
+
+implicit class Field2ForwardField(left: (String, TypeInfo[_])) {
+  def ->(right: (String, TypeInfo[_])): String = if 
(left.equals(right)) {
+s"${left._1}"
+  } else {
+if (left._2.equals(right._2)) {
  

[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104685724
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
 ---
@@ -46,6 +46,7 @@ trait CommonCorrelate {
   config: TableConfig,
   inputTypeInfo: TypeInformation[Row],
   udtfTypeInfo: TypeInformation[Any],
+  returnType: TypeInformation[Row],
--- End diff --

Why is this change necessary? `returnType` can be computed from `rowType`, which is a parameter as well.
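
A sketch of what the comment suggests, assuming the `RelDataType` parameter of `correlateMapFunction` is called `rowType` (the conversion is the same one the diff applies in `DataSetCorrelate`):

```
// Sketch: derive the return type inside correlateMapFunction instead of
// passing it in as an additional parameter.
val returnType = FlinkTypeFactory.toInternalRowTypeInfo(rowType)
```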




[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r105029829
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset.forwarding
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, TypeInformation => TypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+
+object FieldForwardingUtils {
+
+  def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields
+
+  private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = {
+throw new TableException(s"Implementation for $customWrapper wrapper 
is missing.")
+  }
+
+  /**
+* Wrapper for {@link getForwardedFields}
+*/
+  def getForwardedInput(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[Int],
+  customWrapper: TypeInfo[_] =>
--- End diff --

Can the `customWrapper` be removed?




[jira] [Commented] (FLINK-3850) Add forward field annotations to DataSet operators generated by the Table API

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902057#comment-15902057
 ] 

ASF GitHub Bot commented on FLINK-3850:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r105023568
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
 ---
@@ -124,8 +127,26 @@ class DataSetCalc(
   body,
   returnType)
 
+def getForwardIndices = {
+  // get (input, output) indices of operands,
+  // filter modified operands and specify forwarding
+  val inputFields = extractRefInputFields(calcProgram)
+  calcProgram.getProjectList
+.map(_.getIndex)
+.zipWithIndex
+.filter(tup => inputFields.contains(tup._1))
--- End diff --

This position-based mapping makes some assumptions about the internal organization of a `RexProgram`, i.e., that the first `n` entries of the expression list are filled in order by the `n` fields of the input.
Can we change this to iterate over the projection list, look up each expression, and check whether it is a `RexInputRef`? Basically something like:
```
calcProgram.getProjectList.zipWithIndex.map { case (p, out) =>
  val expr = calcProgram.getExprList.get(p.getIndex)
  expr match {
case i: RexInputRef => Some((i.getIndex, out))
case _ => None
  }
}.flatten
```
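
Putting this together with the earlier suggestion to keep the translation method concise, the whole thing could live in a small helper (a sketch, names are illustrative):

```
import org.apache.calcite.rex.RexInputRef
import scala.collection.JavaConverters._

// Sketch: derive the forwarded-fields annotation for DataSetCalc.
// A projected field is forwarded iff its expression is a plain input reference.
private def getForwardFields(
    calcProgram: RexProgram,
    inputType: TypeInformation[Row],
    returnType: TypeInformation[Row]): String = {

  val forwardIndices = calcProgram.getProjectList.asScala.zipWithIndex.flatMap {
    case (p, out) =>
      calcProgram.getExprList.get(p.getIndex) match {
        case i: RexInputRef => Some((i.getIndex, out))
        case _ => None
      }
  }
  getForwardedFields(inputType, returnType, forwardIndices)
}
```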


> Add forward field annotations to DataSet operators generated by the Table API
> -
>
> Key: FLINK-3850
> URL: https://issues.apache.org/jira/browse/FLINK-3850
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Nikolay Vasilishin
>
> The DataSet API features semantic annotations [1] to hint the optimizer which 
> input fields an operator copies. This information is valuable for the 
> optimizer because it can infer that certain physical properties such as 
> partitioning or sorting are not destroyed by user functions and thus generate 
> more efficient execution plans.
> The Table API is built on top of the DataSet API and generates DataSet 
> programs and code for user-defined functions. Hence, it knows exactly which 
> fields are modified and which not. We should use this information to 
> automatically generate forward field annotations and attach them to the 
> operators. This can help to significantly improve the performance of certain 
> jobs.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/index.html#semantic-annotations





[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104723484
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
 ---
@@ -97,18 +103,41 @@ class DataSetCorrelate(
 val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
 val pojoFieldMapping = sqlFunction.getPojoFieldMapping
 val udtfTypeInfo = 
sqlFunction.getRowTypeInfo.asInstanceOf[TypeInformation[Any]]
+val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
 
 val mapFunc = correlateMapFunction(
   config,
   inputDS.getType,
   udtfTypeInfo,
+  returnType,
   getRowType,
   joinType,
   rexCall,
   condition,
   Some(pojoFieldMapping),
   ruleDescription)
 
-inputDS.flatMap(mapFunc).name(correlateOpName(rexCall, sqlFunction, 
relRowType))
+def getIndices = {
--- End diff --

A correlate forwards all fields from the input and the table function, e.g. `[in1, in2, in3, tf1, tf2]` for an input `[in1, in2, in3]` and a table function `[tf1, tf2]`. So we can do a simple position-based mapping of the fields of the input type against the output type (field names might change). Basically similar to what you are doing with the single-row join.

We do not need to look at the table function or the condition.




[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104706099
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset.forwarding
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, TypeInformation => TypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+
+object FieldForwardingUtils {
+
+  def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields
+
+  private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = {
+throw new TableException(s"Implementation for $customWrapper wrapper 
is missing.")
+  }
+
+  /**
+* Wrapper for {@link getForwardedFields}
+*/
+  def getForwardedInput(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[Int],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String 
= {
+
+getForwardedFields(inputType,
+  outputType,
+  forwardIndices.zip(forwardIndices),
+  customWrapper)
+  }
+
+  /**
+* Wraps provided indices with proper names.
+* e.g. _1 for Tuple, f0 for Row, fieldName for POJO and named Row
+*
+* @param inputType  information of input data
+* @param outputType information of output data
+* @param forwardIndices tuple of (input, output) indices of a 
forwarded field
+* @param customWrapper  used for figuring out proper type in specific 
cases,
+*   e.g. {@see DataSetSingleRowJoin}
+* @return string with forwarded fields mapped from input to output
+*/
+  def getForwardedFields(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[(Int, Int)],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String 
= {
+
+def chooseWrapper(
+typeInformation: TypeInfo[_]): Seq[(String, TypeInfo[_])] = {
+
+  typeInformation match {
+case composite: CompositeType[_] =>
+  val fields = extractFields(composite)
+  compositeTypeField(fields)
+case basic: BasicTypeInfo[_] =>
+  Seq((s"*", basic))
+case array: BasicArrayTypeInfo[_, _] =>
+  Seq((s"*", array))
+case _ =>
+  customWrapper(typeInformation)
+  }
+}
+
+val wrapInput = chooseWrapper(inputType)
+val wrapOutput = chooseWrapper(outputType)
+
+forwardFields(forwardIndices, wrapInput, wrapOutput)
+  }
+
+  private def extractFields(
+  composite: CompositeType[_]): Seq[(String, TypeInfo[_])] = {
+
+val types = for {
+  i <- 0 until composite.getArity
+} yield { composite.getTypeAt(i) }
+
+composite.getFieldNames.zip(types)
+  }
+
+  private def forwardFields(
+  forwardIndices: Seq[(Int, Int)],
+  wrappedInput: Int => (String, TypeInfo[_]),
+  wrappedOutput: Int => (String, TypeInfo[_])): String = {
+
+implicit class Field2ForwardField(left: (String, TypeInfo[_])) {
+  def ->(right: (String, TypeInfo[_])): String = if 
(left.equals(right)) {
+s"${left._1}"
+  } else {
+if (left._2.equals(right._2)) {
+  s"${left._1}->${right._1}"
+} else {
+  null
--- End diff --

We should throw an exception here. If the types do not match, the logic to 
identify the mapping of forwarded fields is broken. 
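
For illustration, the branch could look like this (a sketch; `TableException` is already imported in the file):

```
if (left._2.equals(right._2)) {
  s"${left._1}->${right._1}"
} else {
  // fail fast: a type mismatch means the forwarded-field mapping is wrong
  throw new TableException(
    s"Unable to forward field ${left._1} to ${right._1}: " +
      s"types ${left._2} and ${right._2} do not match.")
}
```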

[jira] [Commented] (FLINK-3850) Add forward field annotations to DataSet operators generated by the Table API

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902062#comment-15902062
 ] 

ASF GitHub Bot commented on FLINK-3850:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r105032956
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset.forwarding
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, TypeInformation => TypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+
+object FieldForwardingUtils {
+
+  def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields
--- End diff --

This function can be removed, right?


> Add forward field annotations to DataSet operators generated by the Table API
> -
>
> Key: FLINK-3850
> URL: https://issues.apache.org/jira/browse/FLINK-3850
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Nikolay Vasilishin
>
> The DataSet API features semantic annotations [1] to hint the optimizer which 
> input fields an operator copies. This information is valuable for the 
> optimizer because it can infer that certain physical properties such as 
> partitioning or sorting are not destroyed by user functions and thus generate 
> more efficient execution plans.
> The Table API is built on top of the DataSet API and generates DataSet 
> programs and code for user-defined functions. Hence, it knows exactly which 
> fields are modified and which not. We should use this information to 
> automatically generate forward field annotations and attach them to the 
> operators. This can help to significantly improve the performance of certain 
> jobs.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/index.html#semantic-annotations





[jira] [Commented] (FLINK-3850) Add forward field annotations to DataSet operators generated by the Table API

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902051#comment-15902051
 ] 

ASF GitHub Bot commented on FLINK-3850:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r105014912
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
 ---
@@ -125,11 +126,25 @@ class DataSetAggregate(
 .name(aggOpName)
 }
 else {
+  //Forward all fields at conversion
+  val inputInfo = mappedInput.getType
--- End diff --

PR #3472 will remove the preparing mapper but add a `GroupCombineFunction`. For both the combiner and the reducer, we need to forward the fields on which the data is grouped, because these are not modified but just forwarded.
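
As a rough sketch of forwarding the grouping keys (this assumes the keys occupy the first `grouping.length` positions of both the input and the output of the combiner/reducer; variable names are illustrative, not the actual code):

```
// Sketch: declare the grouping keys as forwarded on the group-reduce operator.
val inputInfo = mappedInput.getType
val forwardedKeys = getForwardedInput(inputInfo, rowTypeInfo, 0 until grouping.length)

mappedInput
  .groupBy(grouping: _*)
  .reduceGroup(groupReduceFunction)          // illustrative name
  .withForwardedFields(forwardedKeys)
  .name(aggOpName)
```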


> Add forward field annotations to DataSet operators generated by the Table API
> -
>
> Key: FLINK-3850
> URL: https://issues.apache.org/jira/browse/FLINK-3850
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Nikolay Vasilishin
>
> The DataSet API features semantic annotations [1] to hint the optimizer which 
> input fields an operator copies. This information is valuable for the 
> optimizer because it can infer that certain physical properties such as 
> partitioning or sorting are not destroyed by user functions and thus generate 
> more efficient execution plans.
> The Table API is built on top of the DataSet API and generates DataSet 
> programs and code for user-defined functions. Hence, it knows exactly which 
> fields are modified and which not. We should use this information to 
> automatically generate forward field annotations and attach them to the 
> operators. This can help to significantly improve the performance of certain 
> jobs.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/index.html#semantic-annotations





[jira] [Commented] (FLINK-3850) Add forward field annotations to DataSet operators generated by the Table API

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902059#comment-15902059
 ] 

ASF GitHub Bot commented on FLINK-3850:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104727584
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
 ---
@@ -199,10 +200,21 @@ class DataSetJoin(
 
 val joinOpName = s"where: ($joinConditionToString), join: 
($joinSelectionToString)"
 
+// consider that all fields which are not keys are forwarded
+val leftIndices = (0 until 
left.getRowType.getFieldCount).diff(leftKeys)
--- End diff --

The keys can also be forwarded. 
Actually, these are the most interesting fields because the DataSet will be 
partitioned and maybe also sorted on these keys.
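
A sketch of the adjusted index computation, forwarding the keys as well. Names follow the diff (`rightDataSet` is assumed by analogy with `leftDataSet`), and the right-hand mapping additionally assumes that the right fields follow the left fields in the join output:

```
// Sketch: all left fields, including the keys, keep their positions in the output.
val leftIndices = 0 until left.getRowType.getFieldCount
val fieldsLeft = getForwardedInput(leftDataSet.getType, returnType, leftIndices)

// Right fields (keys included) are appended after the left fields,
// so right index i maps to output position leftArity + i.
val leftArity = left.getRowType.getFieldCount
val rightIndices = (0 until right.getRowType.getFieldCount).map(i => (i, leftArity + i))
val fieldsRight = getForwardedFields(rightDataSet.getType, returnType, rightIndices)
```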


> Add forward field annotations to DataSet operators generated by the Table API
> -
>
> Key: FLINK-3850
> URL: https://issues.apache.org/jira/browse/FLINK-3850
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Nikolay Vasilishin
>
> The DataSet API features semantic annotations [1] to hint the optimizer which 
> input fields an operator copies. This information is valuable for the 
> optimizer because it can infer that certain physical properties such as 
> partitioning or sorting are not destroyed by user functions and thus generate 
> more efficient execution plans.
> The Table API is built on top of the DataSet API and generates DataSet 
> programs and code for user-defined functions. Hence, it knows exactly which 
> fields are modified and which not. We should use this information to 
> automatically generate forward field annotations and attach them to the 
> operators. This can help to significantly improve the performance of certain 
> jobs.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/index.html#semantic-annotations





[jira] [Commented] (FLINK-3850) Add forward field annotations to DataSet operators generated by the Table API

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902053#comment-15902053
 ] 

ASF GitHub Bot commented on FLINK-3850:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r105030757
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset.forwarding
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, TypeInformation => TypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+
+object FieldForwardingUtils {
+
+  def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields
+
+  private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = {
+throw new TableException(s"Implementation for $customWrapper wrapper 
is missing.")
+  }
+
+  /**
+* Wrapper for {@link getForwardedFields}
+*/
+  def getForwardedInput(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[Int],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String 
= {
+
+getForwardedFields(inputType,
+  outputType,
+  forwardIndices.zip(forwardIndices),
+  customWrapper)
+  }
+
+  /**
+* Wraps provided indices with proper names.
+* e.g. _1 for Tuple, f0 for Row, fieldName for POJO and named Row
+*
+* @param inputType  information of input data
+* @param outputType information of output data
+* @param forwardIndices tuple of (input, output) indices of a 
forwarded field
+* @param customWrapper  used for figuring out proper type in specific 
cases,
+*   e.g. {@see DataSetSingleRowJoin}
+* @return string with forwarded fields mapped from input to output
+*/
+  def getForwardedFields(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[(Int, Int)],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String 
= {
+
+def chooseWrapper(
+typeInformation: TypeInfo[_]): Seq[(String, TypeInfo[_])] = {
+
+  typeInformation match {
+case composite: CompositeType[_] =>
+  val fields = extractFields(composite)
+  compositeTypeField(fields)
+case basic: BasicTypeInfo[_] =>
+  Seq((s"*", basic))
+case array: BasicArrayTypeInfo[_, _] =>
+  Seq((s"*", array))
+case _ =>
--- End diff --

`AtomicType` can be `"*"` as well.


> Add forward field annotations to DataSet operators generated by the Table API
> -
>
> Key: FLINK-3850
> URL: https://issues.apache.org/jira/browse/FLINK-3850
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Nikolay Vasilishin
>
> The DataSet API features semantic annotations [1] to hint the optimizer which 
> input fields an operator copies. This information is valuable for the 
> optimizer because it can infer that certain physical properties such as 
> partitioning or sorting are not destroyed by user functions and thus generate 
> more efficient execution plans.
> The Table API is built on top of the DataSet API and generates DataSet 
> programs and code for user-defined functions. Hence, it knows exactly 

[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r105030435
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset.forwarding
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, TypeInformation => TypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+
+object FieldForwardingUtils {
+
+  def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields
+
+  private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = {
+throw new TableException(s"Implementation for $customWrapper wrapper 
is missing.")
+  }
+
+  /**
+* Wrapper for {@link getForwardedFields}
+*/
+  def getForwardedInput(
+  inputType: TypeInfo[_],
--- End diff --

Please document the parameters. 




[jira] [Commented] (FLINK-3850) Add forward field annotations to DataSet operators generated by the Table API

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902052#comment-15902052
 ] 

ASF GitHub Bot commented on FLINK-3850:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104730024
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset.forwarding
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, TypeInformation => TypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+
+object FieldForwardingUtils {
+
+  def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields
+
+  private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = {
+throw new TableException(s"Implementation for $customWrapper wrapper 
is missing.")
+  }
+
+  /**
+* Wrapper for {@link getForwardedFields}
+*/
+  def getForwardedInput(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[Int],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String 
= {
+
+getForwardedFields(inputType,
+  outputType,
+  forwardIndices.zip(forwardIndices),
+  customWrapper)
+  }
+
+  /**
+* Wraps provided indices with proper names.
+* e.g. _1 for Tuple, f0 for Row, fieldName for POJO and named Row
+*
+* @param inputType  information of input data
+* @param outputType information of output data
+* @param forwardIndices tuple of (input, output) indices of a 
forwarded field
+* @param customWrapper  used for figuring out proper type in specific 
cases,
+*   e.g. {@see DataSetSingleRowJoin}
+* @return string with forwarded fields mapped from input to output
+*/
+  def getForwardedFields(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[(Int, Int)],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String 
= {
+
+def chooseWrapper(
+typeInformation: TypeInfo[_]): Seq[(String, TypeInfo[_])] = {
+
+  typeInformation match {
+case composite: CompositeType[_] =>
+  val fields = extractFields(composite)
+  compositeTypeField(fields)
+case basic: BasicTypeInfo[_] =>
+  Seq((s"*", basic))
+case array: BasicArrayTypeInfo[_, _] =>
+  Seq((s"*", array))
+case _ =>
+  customWrapper(typeInformation)
+  }
+}
+
+val wrapInput = chooseWrapper(inputType)
+val wrapOutput = chooseWrapper(outputType)
+
+forwardFields(forwardIndices, wrapInput, wrapOutput)
+  }
+
+  private def extractFields(
+  composite: CompositeType[_]): Seq[(String, TypeInfo[_])] = {
+
+val types = for {
+  i <- 0 until composite.getArity
+} yield { composite.getTypeAt(i) }
+
+composite.getFieldNames.zip(types)
+  }
+
+  private def forwardFields(
+  forwardIndices: Seq[(Int, Int)],
+  wrappedInput: Int => (String, TypeInfo[_]),
+  wrappedOutput: Int => (String, TypeInfo[_])): String = {
+
+implicit class Field2ForwardField(left: (String, TypeInfo[_])) {
+  def ->(right: (String, TypeInfo[_])): String = if 
(left.equals(right)) {
+s"${left._1}"
+  } else {
+if (left._2.equals(right._2)) {
  

[jira] [Commented] (FLINK-3850) Add forward field annotations to DataSet operators generated by the Table API

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902063#comment-15902063
 ] 

ASF GitHub Bot commented on FLINK-3850:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104685724
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
 ---
@@ -46,6 +46,7 @@ trait CommonCorrelate {
   config: TableConfig,
   inputTypeInfo: TypeInformation[Row],
   udtfTypeInfo: TypeInformation[Any],
+  returnType: TypeInformation[Row],
--- End diff --

Why is this change necessary? `returnType` can be computed from `rowType`, which is a parameter as well.


> Add forward field annotations to DataSet operators generated by the Table API
> -
>
> Key: FLINK-3850
> URL: https://issues.apache.org/jira/browse/FLINK-3850
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Nikolay Vasilishin
>
> The DataSet API features semantic annotations [1] to hint the optimizer which 
> input fields an operator copies. This information is valuable for the 
> optimizer because it can infer that certain physical properties such as 
> partitioning or sorting are not destroyed by user functions and thus generate 
> more efficient execution plans.
> The Table API is built on top of the DataSet API and generates DataSet 
> programs and code for user-defined functions. Hence, it knows exactly which 
> fields are modified and which not. We should use this information to 
> automatically generate forward field annotations and attach them to the 
> operators. This can help to significantly improve the performance of certain 
> jobs.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/index.html#semantic-annotations





[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r105023568
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
 ---
@@ -124,8 +127,26 @@ class DataSetCalc(
   body,
   returnType)
 
+def getForwardIndices = {
+  // get (input, output) indices of operands,
+  // filter modified operands and specify forwarding
+  val inputFields = extractRefInputFields(calcProgram)
+  calcProgram.getProjectList
+.map(_.getIndex)
+.zipWithIndex
+.filter(tup => inputFields.contains(tup._1))
--- End diff --

This position-based mapping makes some assumptions about the internal organization of a `RexProgram`, i.e., that the first `n` entries of the expression list are filled in order by the `n` fields of the input.
Can we change this to iterate over the projection list, look up each expression, and check whether it is a `RexInputRef`? Basically something like:
```
calcProgram.getProjectList.zipWithIndex.map { case (p, out) =>
  val expr = calcProgram.getExprList.get(p.getIndex)
  expr match {
case i: RexInputRef => Some((i.getIndex, out))
case _ => None
  }
}.flatten
```




[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104729449
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset.forwarding
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, TypeInformation => TypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+
+object FieldForwardingUtils {
+
+  def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields
+
+  private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = {
+throw new TableException(s"Implementation for $customWrapper wrapper 
is missing.")
+  }
+
+  /**
+* Wrapper for {@link getForwardedFields}
+*/
+  def getForwardedInput(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[Int],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String 
= {
+
+getForwardedFields(inputType,
+  outputType,
+  forwardIndices.zip(forwardIndices),
+  customWrapper)
+  }
+
+  /**
+* Wraps provided indices with proper names.
+* e.g. _1 for Tuple, f0 for Row, fieldName for POJO and named Row
+*
+* @param inputType  information of input data
+* @param outputType information of output data
+* @param forwardIndices tuple of (input, output) indices of a 
forwarded field
+* @param customWrapper  used for figuring out proper type in specific 
cases,
+*   e.g. {@see DataSetSingleRowJoin}
+* @return string with forwarded fields mapped from input to output
+*/
+  def getForwardedFields(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[(Int, Int)],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String 
= {
+
+def chooseWrapper(
+typeInformation: TypeInfo[_]): Seq[(String, TypeInfo[_])] = {
+
+  typeInformation match {
+case composite: CompositeType[_] =>
+  val fields = extractFields(composite)
+  compositeTypeField(fields)
+case basic: BasicTypeInfo[_] =>
+  Seq((s"*", basic))
+case array: BasicArrayTypeInfo[_, _] =>
+  Seq((s"*", array))
+case _ =>
+  customWrapper(typeInformation)
+  }
+}
+
+val wrapInput = chooseWrapper(inputType)
+val wrapOutput = chooseWrapper(outputType)
+
+forwardFields(forwardIndices, wrapInput, wrapOutput)
+  }
+
+  private def extractFields(
+  composite: CompositeType[_]): Seq[(String, TypeInfo[_])] = {
+
+val types = for {
+  i <- 0 until composite.getArity
+} yield { composite.getTypeAt(i) }
+
+composite.getFieldNames.zip(types)
+  }
+
+  private def forwardFields(
+  forwardIndices: Seq[(Int, Int)],
+  wrappedInput: Int => (String, TypeInfo[_]),
+  wrappedOutput: Int => (String, TypeInfo[_])): String = {
+
+implicit class Field2ForwardField(left: (String, TypeInfo[_])) {
+  def ->(right: (String, TypeInfo[_])): String = if 
(left.equals(right)) {
+s"${left._1}"
+  } else {
+if (left._2.equals(right._2)) {
+  s"${left._1}->${right._1}"
+} else {
+  null
+}
+  }
+}
+
+forwardIndices map {
+  case (in, out) =>
+wrappedInput(in) -> wrappedOutput(out)
 

[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r105032956
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset.forwarding
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, TypeInformation => TypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+
+object FieldForwardingUtils {
+
+  def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields
--- End diff --

This function can be removed, right?




[jira] [Commented] (FLINK-3850) Add forward field annotations to DataSet operators generated by the Table API

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902050#comment-15902050
 ] 

ASF GitHub Bot commented on FLINK-3850:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104706099
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset.forwarding
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, TypeInformation => TypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+
+object FieldForwardingUtils {
+
+  def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields
+
+  private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = {
+throw new TableException(s"Implementation for $customWrapper wrapper 
is missing.")
+  }
+
+  /**
+* Wrapper for {@link getForwardedFields}
+*/
+  def getForwardedInput(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[Int],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String 
= {
+
+getForwardedFields(inputType,
+  outputType,
+  forwardIndices.zip(forwardIndices),
+  customWrapper)
+  }
+
+  /**
+* Wraps provided indices with proper names.
+* e.g. _1 for Tuple, f0 for Row, fieldName for POJO and named Row
+*
+* @param inputType  information of input data
+* @param outputType information of output data
+* @param forwardIndices tuple of (input, output) indices of a 
forwarded field
+* @param customWrapper  used for figuring out proper type in specific 
cases,
+*   e.g. {@see DataSetSingleRowJoin}
+* @return string with forwarded fields mapped from input to output
+*/
+  def getForwardedFields(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[(Int, Int)],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String 
= {
+
+def chooseWrapper(
+typeInformation: TypeInfo[_]): Seq[(String, TypeInfo[_])] = {
+
+  typeInformation match {
+case composite: CompositeType[_] =>
+  val fields = extractFields(composite)
+  compositeTypeField(fields)
+case basic: BasicTypeInfo[_] =>
+  Seq((s"*", basic))
+case array: BasicArrayTypeInfo[_, _] =>
+  Seq((s"*", array))
+case _ =>
+  customWrapper(typeInformation)
+  }
+}
+
+val wrapInput = chooseWrapper(inputType)
+val wrapOutput = chooseWrapper(outputType)
+
+forwardFields(forwardIndices, wrapInput, wrapOutput)
+  }
+
+  private def extractFields(
+  composite: CompositeType[_]): Seq[(String, TypeInfo[_])] = {
+
+val types = for {
+  i <- 0 until composite.getArity
+} yield { composite.getTypeAt(i) }
+
+composite.getFieldNames.zip(types)
+  }
+
+  private def forwardFields(
+  forwardIndices: Seq[(Int, Int)],
+  wrappedInput: Int => (String, TypeInfo[_]),
+  wrappedOutput: Int => (String, TypeInfo[_])): String = {
+
+implicit class Field2ForwardField(left: (String, TypeInfo[_])) {
+  def ->(right: (String, TypeInfo[_])): String = if 
(left.equals(right)) {
+s"${left._1}"
+  } else {
+if (left._2.equals(right._2)) {
  

[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104730046
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset.forwarding
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, TypeInformation => TypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+
+object FieldForwardingUtils {
+
+  def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields
+
+  private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = {
+throw new TableException(s"Implementation for $customWrapper wrapper 
is missing.")
+  }
+
+  /**
+* Wrapper for {@link getForwardedFields}
+*/
+  def getForwardedInput(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[Int],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String 
= {
+
+getForwardedFields(inputType,
+  outputType,
+  forwardIndices.zip(forwardIndices),
+  customWrapper)
+  }
+
+  /**
+* Wraps provided indices with proper names.
+* e.g. _1 for Tuple, f0 for Row, fieldName for POJO and named Row
+*
+* @param inputType  information of input data
+* @param outputType information of output data
+* @param forwardIndices tuple of (input, output) indices of a 
forwarded field
+* @param customWrapper  used for figuring out proper type in specific 
cases,
+*   e.g. {@see DataSetSingleRowJoin}
+* @return string with forwarded fields mapped from input to output
+*/
+  def getForwardedFields(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[(Int, Int)],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String 
= {
+
+def chooseWrapper(
+typeInformation: TypeInfo[_]): Seq[(String, TypeInfo[_])] = {
+
+  typeInformation match {
+case composite: CompositeType[_] =>
+  val fields = extractFields(composite)
+  compositeTypeField(fields)
+case basic: BasicTypeInfo[_] =>
+  Seq((s"*", basic))
+case array: BasicArrayTypeInfo[_, _] =>
+  Seq((s"*", array))
+case _ =>
+  customWrapper(typeInformation)
+  }
+}
+
+val wrapInput = chooseWrapper(inputType)
+val wrapOutput = chooseWrapper(outputType)
+
+forwardFields(forwardIndices, wrapInput, wrapOutput)
+  }
+
+  private def extractFields(
+  composite: CompositeType[_]): Seq[(String, TypeInfo[_])] = {
+
+val types = for {
+  i <- 0 until composite.getArity
+} yield { composite.getTypeAt(i) }
+
+composite.getFieldNames.zip(types)
+  }
+
+  private def forwardFields(
+  forwardIndices: Seq[(Int, Int)],
+  wrappedInput: Int => (String, TypeInfo[_]),
+  wrappedOutput: Int => (String, TypeInfo[_])): String = {
+
+implicit class Field2ForwardField(left: (String, TypeInfo[_])) {
+  def ->(right: (String, TypeInfo[_])): String = if 
(left.equals(right)) {
+s"${left._1}"
+  } else {
+if (left._2.equals(right._2)) {
+  s"${left._1}->${right._1}"
+} else {
+  null
+}
+  }
+}
+
+forwardIndices map {
+  case (in, out) =>
+wrappedInput(in) -> wrappedOutput(out)
 

[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104727584
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
 ---
@@ -199,10 +200,21 @@ class DataSetJoin(
 
 val joinOpName = s"where: ($joinConditionToString), join: 
($joinSelectionToString)"
 
+// consider that all fields which are not keys are forwarded
+val leftIndices = (0 until 
left.getRowType.getFieldCount).diff(leftKeys)
--- End diff --

The keys can also be forwarded. 
Actually, these are the most interesting fields because the DataSet will be 
partitioned and maybe also sorted on these keys.
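
A hedged sketch of the suggested change (reusing the names from the diff above, not the actual PR code): keep the key fields in the forwarded indices instead of removing them, since an equi-join emits them unchanged.

    // Forward all left-side fields, including the join keys; the keys carry the
    // partitioning (and possibly sorting) properties the optimizer cares about.
    val leftIndices = 0 until left.getRowType.getFieldCount   // no .diff(leftKeys)
    val fieldsLeft = getForwardedInput(leftDataSet.getType, returnType, leftIndices)
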




[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104727744
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
 ---
@@ -199,10 +200,21 @@ class DataSetJoin(
 
 val joinOpName = s"where: ($joinConditionToString), join: 
($joinSelectionToString)"
 
+// consider that all fields which are not keys are forwarded
+val leftIndices = (0 until 
left.getRowType.getFieldCount).diff(leftKeys)
+val fieldsLeft = getForwardedInput(leftDataSet.getType, returnType, 
leftIndices)
+
+val rightIndices = (0 until right.getRowType.getFieldCount)
+  .diff(rightKeys)
--- End diff --

Keys can be forwarded




[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104730153
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset.forwarding
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, TypeInformation => TypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+
+object FieldForwardingUtils {
+
+  def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields
+
+  private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = {
+throw new TableException(s"Implementation for $customWrapper wrapper 
is missing.")
+  }
+
+  /**
+* Wrapper for {@link getForwardedFields}
+*/
+  def getForwardedInput(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[Int],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String 
= {
+
+getForwardedFields(inputType,
+  outputType,
+  forwardIndices.zip(forwardIndices),
+  customWrapper)
+  }
+
+  /**
+* Wraps provided indices with proper names.
+* e.g. _1 for Tuple, f0 for Row, fieldName for POJO and named Row
+*
+* @param inputType  information of input data
+* @param outputType information of output data
+* @param forwardIndices tuple of (input, output) indices of a 
forwarded field
+* @param customWrapper  used for figuring out proper type in specific 
cases,
+*   e.g. {@see DataSetSingleRowJoin}
+* @return string with forwarded fields mapped from input to output
+*/
+  def getForwardedFields(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[(Int, Int)],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String 
= {
+
+def chooseWrapper(
+typeInformation: TypeInfo[_]): Seq[(String, TypeInfo[_])] = {
+
+  typeInformation match {
+case composite: CompositeType[_] =>
+  val fields = extractFields(composite)
+  compositeTypeField(fields)
+case basic: BasicTypeInfo[_] =>
+  Seq((s"*", basic))
+case array: BasicArrayTypeInfo[_, _] =>
+  Seq((s"*", array))
+case _ =>
+  customWrapper(typeInformation)
+  }
+}
+
+val wrapInput = chooseWrapper(inputType)
+val wrapOutput = chooseWrapper(outputType)
+
+forwardFields(forwardIndices, wrapInput, wrapOutput)
+  }
+
+  private def extractFields(
+  composite: CompositeType[_]): Seq[(String, TypeInfo[_])] = {
+
+val types = for {
+  i <- 0 until composite.getArity
+} yield { composite.getTypeAt(i) }
+
+composite.getFieldNames.zip(types)
+  }
+
+  private def forwardFields(
+  forwardIndices: Seq[(Int, Int)],
+  wrappedInput: Int => (String, TypeInfo[_]),
+  wrappedOutput: Int => (String, TypeInfo[_])): String = {
+
+implicit class Field2ForwardField(left: (String, TypeInfo[_])) {
+  def ->(right: (String, TypeInfo[_])): String = if 
(left.equals(right)) {
+s"${left._1}"
+  } else {
+if (left._2.equals(right._2)) {
+  s"${left._1}->${right._1}"
+} else {
+  null
+}
+  }
+}
+
+forwardIndices map {
+  case (in, out) =>
+wrappedInput(in) -> wrappedOutput(out)
 

[jira] [Commented] (FLINK-3850) Add forward field annotations to DataSet operators generated by the Table API

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902048#comment-15902048
 ] 

ASF GitHub Bot commented on FLINK-3850:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r105029829
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset.forwarding
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, TypeInformation => TypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+
+object FieldForwardingUtils {
+
+  def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields
+
+  private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = {
+throw new TableException(s"Implementation for $customWrapper wrapper 
is missing.")
+  }
+
+  /**
+* Wrapper for {@link getForwardedFields}
+*/
+  def getForwardedInput(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[Int],
+  customWrapper: TypeInfo[_] =>
--- End diff --

Can the `customWrapper` be removed?


> Add forward field annotations to DataSet operators generated by the Table API
> -
>
> Key: FLINK-3850
> URL: https://issues.apache.org/jira/browse/FLINK-3850
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Nikolay Vasilishin
>
> The DataSet API features semantic annotations [1] to hint the optimizer which 
> input fields an operator copies. This information is valuable for the 
> optimizer because it can infer that certain physical properties such as 
> partitioning or sorting are not destroyed by user functions and thus generate 
> more efficient execution plans.
> The Table API is built on top of the DataSet API and generates DataSet 
> programs and code for user-defined functions. Hence, it knows exactly which 
> fields are modified and which not. We should use this information to 
> automatically generate forward field annotations and attach them to the 
> operators. This can help to significantly improve the performance of certain 
> jobs.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/index.html#semantic-annotations
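
To illustrate what the Table API should eventually emit automatically, here is a minimal sketch of a manually annotated DataSet program, assuming the Scala DataSet API's withForwardedFields (the data set and field expressions are illustrative):

    import org.apache.flink.api.scala._

    val env = ExecutionEnvironment.getExecutionEnvironment
    val input: DataSet[(Int, String, Double)] = env.fromElements((1, "a", 1.0))

    // The function copies fields _1 and _2 unchanged; declaring that lets the optimizer
    // preserve partitioning/sorting on those fields across the operator.
    val result = input
      .map(t => (t._1, t._2, t._3 * 2))
      .withForwardedFields("_1", "_2")
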





[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104730024
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset.forwarding
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, TypeInformation => TypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+
+object FieldForwardingUtils {
+
+  def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields
+
+  private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = {
+throw new TableException(s"Implementation for $customWrapper wrapper 
is missing.")
+  }
+
+  /**
+* Wrapper for {@link getForwardedFields}
+*/
+  def getForwardedInput(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[Int],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String 
= {
+
+getForwardedFields(inputType,
+  outputType,
+  forwardIndices.zip(forwardIndices),
+  customWrapper)
+  }
+
+  /**
+* Wraps provided indices with proper names.
+* e.g. _1 for Tuple, f0 for Row, fieldName for POJO and named Row
+*
+* @param inputType  information of input data
+* @param outputType information of output data
+* @param forwardIndices tuple of (input, output) indices of a 
forwarded field
+* @param customWrapper  used for figuring out proper type in specific 
cases,
+*   e.g. {@see DataSetSingleRowJoin}
+* @return string with forwarded fields mapped from input to output
+*/
+  def getForwardedFields(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[(Int, Int)],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String 
= {
+
+def chooseWrapper(
+typeInformation: TypeInfo[_]): Seq[(String, TypeInfo[_])] = {
+
+  typeInformation match {
+case composite: CompositeType[_] =>
+  val fields = extractFields(composite)
+  compositeTypeField(fields)
+case basic: BasicTypeInfo[_] =>
+  Seq((s"*", basic))
+case array: BasicArrayTypeInfo[_, _] =>
+  Seq((s"*", array))
+case _ =>
+  customWrapper(typeInformation)
+  }
+}
+
+val wrapInput = chooseWrapper(inputType)
+val wrapOutput = chooseWrapper(outputType)
+
+forwardFields(forwardIndices, wrapInput, wrapOutput)
+  }
+
+  private def extractFields(
+  composite: CompositeType[_]): Seq[(String, TypeInfo[_])] = {
+
+val types = for {
+  i <- 0 until composite.getArity
+} yield { composite.getTypeAt(i) }
+
+composite.getFieldNames.zip(types)
+  }
+
+  private def forwardFields(
+  forwardIndices: Seq[(Int, Int)],
+  wrappedInput: Int => (String, TypeInfo[_]),
+  wrappedOutput: Int => (String, TypeInfo[_])): String = {
+
+implicit class Field2ForwardField(left: (String, TypeInfo[_])) {
+  def ->(right: (String, TypeInfo[_])): String = if 
(left.equals(right)) {
+s"${left._1}"
+  } else {
+if (left._2.equals(right._2)) {
+  s"${left._1}->${right._1}"
+} else {
+  null
+}
+  }
+}
+
+forwardIndices map {
+  case (in, out) =>
+wrappedInput(in) -> wrappedOutput(out)
 

[jira] [Commented] (FLINK-3850) Add forward field annotations to DataSet operators generated by the Table API

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902061#comment-15902061
 ] 

ASF GitHub Bot commented on FLINK-3850:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104729449
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset.forwarding
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, TypeInformation => TypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+
+object FieldForwardingUtils {
+
+  def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields
+
+  private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = {
+throw new TableException(s"Implementation for $customWrapper wrapper 
is missing.")
+  }
+
+  /**
+* Wrapper for {@link getForwardedFields}
+*/
+  def getForwardedInput(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[Int],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String 
= {
+
+getForwardedFields(inputType,
+  outputType,
+  forwardIndices.zip(forwardIndices),
+  customWrapper)
+  }
+
+  /**
+* Wraps provided indices with proper names.
+* e.g. _1 for Tuple, f0 for Row, fieldName for POJO and named Row
+*
+* @param inputType  information of input data
+* @param outputType information of output data
+* @param forwardIndices tuple of (input, output) indices of a 
forwarded field
+* @param customWrapper  used for figuring out proper type in specific 
cases,
+*   e.g. {@see DataSetSingleRowJoin}
+* @return string with forwarded fields mapped from input to output
+*/
+  def getForwardedFields(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[(Int, Int)],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String 
= {
+
+def chooseWrapper(
+typeInformation: TypeInfo[_]): Seq[(String, TypeInfo[_])] = {
+
+  typeInformation match {
+case composite: CompositeType[_] =>
+  val fields = extractFields(composite)
+  compositeTypeField(fields)
+case basic: BasicTypeInfo[_] =>
+  Seq((s"*", basic))
+case array: BasicArrayTypeInfo[_, _] =>
+  Seq((s"*", array))
+case _ =>
+  customWrapper(typeInformation)
+  }
+}
+
+val wrapInput = chooseWrapper(inputType)
+val wrapOutput = chooseWrapper(outputType)
+
+forwardFields(forwardIndices, wrapInput, wrapOutput)
+  }
+
+  private def extractFields(
+  composite: CompositeType[_]): Seq[(String, TypeInfo[_])] = {
+
+val types = for {
+  i <- 0 until composite.getArity
+} yield { composite.getTypeAt(i) }
+
+composite.getFieldNames.zip(types)
+  }
+
+  private def forwardFields(
+  forwardIndices: Seq[(Int, Int)],
+  wrappedInput: Int => (String, TypeInfo[_]),
+  wrappedOutput: Int => (String, TypeInfo[_])): String = {
+
+implicit class Field2ForwardField(left: (String, TypeInfo[_])) {
+  def ->(right: (String, TypeInfo[_])): String = if 
(left.equals(right)) {
+s"${left._1}"
+  } else {
+if (left._2.equals(right._2)) {
  

[jira] [Commented] (FLINK-5942) Harden ZooKeeperStateHandleStore to deal with corrupted data

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901985#comment-15901985
 ] 

ASF GitHub Bot commented on FLINK-5942:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3447#discussion_r105024493
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
 ---
@@ -240,7 +241,7 @@ public int exists(String pathInZooKeeper) throws 
Exception {
try {
return InstantiationUtil.deserializeObject(data, 
Thread.currentThread().getContextClassLoader());
} catch (IOException | ClassNotFoundException e) {
-   throw new Exception("Failed to deserialize state handle 
from ZooKeeper data from " +
+   throw new FlinkIOException("Failed to deserialize state 
handle from ZooKeeper data from " +
--- End diff --

Why not throw a regular I/O exception here?


> Harden ZooKeeperStateHandleStore to deal with corrupted data
> 
>
> Key: FLINK-5942
> URL: https://issues.apache.org/jira/browse/FLINK-5942
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0, 1.1.4, 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{ZooKeeperStateHandleStore}} cannot handle corrupted Znode data. When 
> calling {{ZooKeeperStateHandleStore.getAll}} or {{getAllSortedByName}} and 
> reading a node with corrupted data, the whole operation will fail. In such a 
> situation, Flink won't be able to recover because it will read over and over 
> again the same corrupted Znodes (in the recovery case). Therefore, I propose 
> to ignore Znodes whose data cannot be read.
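
A hedged sketch of the proposed behaviour; the readBytes and deserialize helpers below are stand-ins for the Curator read and InstantiationUtil.deserializeObject calls, not the actual ZooKeeperStateHandleStore API:

    import java.io.IOException

    // Read all znodes, skipping those whose payload cannot be deserialized instead of
    // failing the whole getAll()/getAllSortedByName() call.
    def getAllLenient[T](paths: Seq[String],
                         readBytes: String => Array[Byte],
                         deserialize: Array[Byte] => T): Seq[(String, T)] =
      paths.flatMap { path =>
        try {
          Some(path -> deserialize(readBytes(path)))
        } catch {
          case _: IOException | _: ClassNotFoundException =>
            None   // corrupted znode: ignore it so recovery can proceed with the rest
        }
      }
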





[jira] [Commented] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating job graph

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901990#comment-15901990
 ] 

ASF GitHub Bot commented on FLINK-5134:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3455
  
Sounds very good!


> Aggregate ResourceSpec for chained operators when generating job graph
> --
>
> Key: FLINK-5134
> URL: https://issues.apache.org/jira/browse/FLINK-5134
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API
>Reporter: zhijiang
>Assignee: zhijiang
>
> This is a part of fine-grained resource configuration in flip-6.
> In *JobGraph* generation, each *JobVertex* may contain a series of chained 
> operators, and the resource of *JobVertex* should be aggregation of 
> individual resource in chained operators.
> For memory resource in *JobVertex*, the aggregation is the sum formula for 
> chained operators, and for cpu cores resource, the aggregation is the maximum 
> formula for chained operators.
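
A hedged sketch of the aggregation rule described above, using a stand-in case class rather than Flink's actual ResourceSpec API:

    case class Resources(cpuCores: Double, heapMemoryMb: Int)

    // Chaining: CPU cores use the maximum across chained operators, memory uses the sum.
    def aggregate(chained: Seq[Resources]): Resources =
      chained.reduce { (a, b) =>
        Resources(
          cpuCores = math.max(a.cpuCores, b.cpuCores),
          heapMemoryMb = a.heapMemoryMb + b.heapMemoryMb)
      }

    // aggregate(Seq(Resources(1.0, 512), Resources(2.0, 256))) == Resources(2.0, 768)
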





[GitHub] flink issue #3455: [FLINK-5134] [runtime] [FLIP-6] Aggregate ResourceSpec fo...

2017-03-08 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3455
  
Sounds very good!




[jira] [Commented] (FLINK-5940) ZooKeeperCompletedCheckpointStore cannot handle broken state handles

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901987#comment-15901987
 ] 

ASF GitHub Bot commented on FLINK-5940:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3451
  
Fix looks good, but I have a similar comment about the test as for #3446


> ZooKeeperCompletedCheckpointStore cannot handle broken state handles
> 
>
> Key: FLINK-5940
> URL: https://issues.apache.org/jira/browse/FLINK-5940
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0, 1.1.4, 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{ZooKeeperCompletedCheckpointStore}} reads a set of 
> {{RetrievableStateHandles}} from ZooKeeper upon recovery. It then tries to 
> retrieve the {{CompletedCheckpoint}} from the latest state handle. If the 
> retrieve operation fails, then the whole recovery of completed checkpoints 
> fails even though the store might have read older state handles from 
> ZooKeeper. 
> I propose to harden the behaviour by removing broken state handles and 
> returning the first successfully retrieved {{CompletedCheckpoint}}.





[GitHub] flink issue #3451: [backport-1.1] [FLINK-5940] [checkpoint] Harden ZooKeeper...

2017-03-08 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3451
  
Fix looks good, but I have a similar comment about the test as for #3446




[GitHub] flink pull request #3447: [FLINK-5942] [checkpoint] Harden ZooKeeperStateHan...

2017-03-08 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3447#discussion_r105024493
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
 ---
@@ -240,7 +241,7 @@ public int exists(String pathInZooKeeper) throws 
Exception {
try {
return InstantiationUtil.deserializeObject(data, 
Thread.currentThread().getContextClassLoader());
} catch (IOException | ClassNotFoundException e) {
-   throw new Exception("Failed to deserialize state handle 
from ZooKeeper data from " +
+   throw new FlinkIOException("Failed to deserialize state 
handle from ZooKeeper data from " +
--- End diff --

Why not throw a regular I/O exception here?




[GitHub] flink pull request #3446: [FLINK-5940] [checkpoint] Harden ZooKeeperComplete...

2017-03-08 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3446#discussion_r105021165
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 ---
@@ -226,16 +200,43 @@ public CompletedCheckpoint getLatestCheckpoint() 
throws Exception {
return null;
}
else {
-   return 
checkpointStateHandles.getLast().f0.retrieveState();
+   while(!checkpointStateHandles.isEmpty()) {
+   Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle = checkpointStateHandles.peekLast();
+
+   try {
+   return 
retrieveCompletedCheckpoint(checkpointStateHandle);
+   } catch (FlinkException e) {
--- End diff --

I would catch more than `FlinkException` here - after all, we want to fall 
back to earlier checkpoints in any error case, no?
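
A hedged sketch of the widened error handling being suggested (illustrative code, not the actual ZooKeeperCompletedCheckpointStore): any failure while retrieving a checkpoint, not only a FlinkException, causes the store to fall back to the next older handle.

    // Walk from the newest handle backwards; skip anything that fails to load.
    def latestUsable[T](retrieveNewestLast: Seq[() => T]): Option[T] =
      retrieveNewestLast.reverse.iterator
        .map(retrieve => try Some(retrieve()) catch { case _: Exception => None })
        .collectFirst { case Some(checkpoint) => checkpoint }
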




[jira] [Commented] (FLINK-5940) ZooKeeperCompletedCheckpointStore cannot handle broken state handles

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901975#comment-15901975
 ] 

ASF GitHub Bot commented on FLINK-5940:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3446#discussion_r105021165
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 ---
@@ -226,16 +200,43 @@ public CompletedCheckpoint getLatestCheckpoint() 
throws Exception {
return null;
}
else {
-   return 
checkpointStateHandles.getLast().f0.retrieveState();
+   while(!checkpointStateHandles.isEmpty()) {
+   Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle = checkpointStateHandles.peekLast();
+
+   try {
+   return 
retrieveCompletedCheckpoint(checkpointStateHandle);
+   } catch (FlinkException e) {
--- End diff --

I would catch more than `FlinkException` here - after all, we want to fall 
back to earlier checkpoints in any error case, no?


> ZooKeeperCompletedCheckpointStore cannot handle broken state handles
> 
>
> Key: FLINK-5940
> URL: https://issues.apache.org/jira/browse/FLINK-5940
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0, 1.1.4, 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{ZooKeeperCompletedCheckpointStore}} reads a set of 
> {{RetrievableStateHandles}} from ZooKeeper upon recovery. It then tries to 
> retrieve the {{CompletedCheckpoint}} from the latest state handle. If the 
> retrieve operation fails, then the whole recovery of completed checkpoints 
> fails even though the store might have read older state handles from 
> ZooKeeper. 
> I propose to harden the behaviour by removing broken state handles and 
> returning the first successfully retrieved {{CompletedCheckpoint}}.





[jira] [Commented] (FLINK-6002) Documentation: 'MacOS X' under 'Download and Start Flink' in Quickstart page is not rendered correctly

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901935#comment-15901935
 ] 

ASF GitHub Bot commented on FLINK-6002:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3497
  
+1 to merge.


> Documentation: 'MacOS X' under 'Download and Start Flink' in Quickstart page 
> is not rendered correctly
> --
>
> Key: FLINK-6002
> URL: https://issues.apache.org/jira/browse/FLINK-6002
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.2.0
>Reporter: Bowen Li
>Priority: Trivial
> Fix For: 1.2.1
>
>
> On 
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/setup_quickstart.html#setup-download-and-start-flink
>  , command lines in "MacOS X" part are not rendered correctly.
> This is because the markdown is misformatted - it doesn't leave a blank line 
> between text and code block.
> So the fix is simple - add a blank line between text and code block.




