[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971958#comment-15971958
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user hequn8128 closed the pull request at:

https://github.com/apache/flink/pull/3696


> Add RetractionRule at the stage of decoration
> -
>
> Key: FLINK-6090
> URL: https://issues.apache.org/jira/browse/FLINK-6090
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Hequn Cheng
>
> Implement optimizer for retraction:
>   1. Add RetractionRule at the decoration stage, which can derive the
> replace table/append table and NeedRetraction properties.
>   2. Match NeedRetraction against the replace table property and mark the
> accumulating mode.
>
> When this task is finished, we can turn on retraction for different operators
> according to their accumulating mode.
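
The two steps in the description can be sketched on a linear operator chain. This is only an illustration under stated assumptions: `Op`, `replaceTable`, `needsRetraction` and the object name are hypothetical and do not come from the pull request, and the real rules work on Calcite `RelNode` trees rather than lists.

```scala
// Hypothetical model of the two-step derivation; not the Flink implementation.
object RetractionPlanSketch {
  sealed trait Mode
  case object Acc extends Mode
  case object AccRetract extends Mode

  // replaceTable: the operator may update results it emitted earlier.
  // needsRetraction: the operator needs its input to send retractions.
  final case class Op(name: String, replaceTable: Boolean, needsRetraction: Boolean)

  // Step 1: walk from sink to source. An operator must retract if any
  // operator downstream of it needs retraction.
  // `ops` is ordered from source to sink.
  def needToRetract(ops: List[Op]): List[Boolean] =
    ops.foldRight((false, List.empty[Boolean])) {
      case (op, (downstreamNeeds, flags)) =>
        (op.needsRetraction || downstreamNeeds, downstreamNeeds :: flags)
    }._2

  // Step 2: an operator runs in AccRetract mode only if it both updates
  // earlier results and has to retract them for a downstream consumer.
  def accModes(ops: List[Op]): List[Mode] =
    ops.zip(needToRetract(ops)).map {
      case (op, mustRetract) => if (op.replaceTable && mustRetract) AccRetract else Acc
    }
}
```

For a chain of a source followed by two aggregations, only the first aggregation would be marked `AccRetract` in this model, because nothing consumes the output of the last one.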



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971577#comment-15971577
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3696
  
Hi @hequn8128 and @shaoxuan-wang, I merged this PR to the 
`table-retraction` branch.
Could you close it? Thanks!




[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971543#comment-15971543
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3696#discussion_r111804596
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
 ---
@@ -36,14 +36,14 @@ import org.apache.flink.types.Row
 class DataStreamCorrelate(
 cluster: RelOptCluster,
 traitSet: RelTraitSet,
-inputNode: RelNode,
--- End diff --

Ah, OK. Thanks for the explanation!
That makes sense :-)




[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967770#comment-15967770
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user shaoxuan-wang commented on the issue:

https://github.com/apache/flink/pull/3696
  
@fhueske, thanks for the review and valuable comments.  
Yes, we'd better add the attributes (which provide information for deriving 
ACCMode) inside the DataStreamRel interface, so that the code is much 
cleaner and easier to understand.

Thanks,
Shaoxuan




[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967501#comment-15967501
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/3696
  
Hi @fhueske, thanks for your review and refactorings. I think it's pretty 
good and I have learned a lot from it. I left a few comments in your PR.

As for the hard-coding problem, I think you are right: it's better to extend 
DataStreamRel with a couple of methods. I have added some methods to 
DataStreamRel, so you can use them directly when you squash your PR into my 
commit, thx~






[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967495#comment-15967495
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3696#discussion_r111372529
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala
 ---
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.hep.HepRelVertex
+import org.apache.calcite.rel.RelNode
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
+  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
+  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
+  * NeedToRetractProcessRule -> AccModeProcessRule).
+  */
+object DataStreamRetractionRule {
+
+  /**
+* Singleton rule that init retraction trait inside a [[DataStreamRel]]
+*/
+  val INIT_INSTANCE = new InitProcessRule()
+
+  /**
+* Singleton rule that decide needToRetract property inside a [[DataStreamRel]]
+*/
+  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
+
+  /**
+* Singleton rule that decide accMode inside a [[DataStreamRel]]
+*/
+  val ACCMODE_INSTANCE = new AccModeProcessRule()
+
+  /**
+* Get all child RelNodes of a RelNode
+* @param topRel The input RelNode
+* @return All child nodes
+*/
+  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
+val topRelInputs = new ListBuffer[RelNode]()
+topRelInputs.++=(topRel.getInputs.asScala)
+topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
+  }
+
+  def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
+val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
+if (null == retractionTrait) {
+  false
+} else {
+  retractionTrait.getNeedToRetract
+}
+  }
+
+
+  /**
+* Find all needToRetract nodes. A node needs to retract means that there are downstream
+* nodes need retraction from it. Currently, [[DataStreamOverAggregate]] and
+* [[DataStreamGroupWindowAggregate]] need retraction from upstream nodes, besides, a
+* needToRetract node also need retraction from it's upstream nodes.
+*/
+  class NeedToRetractProcessRule extends RelOptRule(
+operand(
+  classOf[DataStreamRel], none()),
+"NeedToRetractProcessRule") {
+
+/**
+  * Return true if bottom RelNode does not contain needToRetract and top RelNode need
+  * retraction from bottom RelNode. Currently, operators which contain aggregations need
+  * retraction from upstream nodes, besides, a needToRetract node also needs retraction from
+  * it's upstream nodes.
+  */
+def bottomNeedToRetract(topRel: RelNode, bottomRel: RelNode): Boolean = {
+  val bottomTraits = bottomRel.getTraitSet
+  if(!traitSetContainNeedToRetract(bottomTraits)){
+topRel match {
+  case _: DataStreamGroupAggregate => true
--- End diff --

Hi, you are right. I have extended `DataStreamRel` with three methods: 
`needsUpdatesAsRetraction()`, `producesUpdates()` and `consumesRetractions()`.
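
A minimal sketch of how such methods could replace the hard-coded `case _: DataStreamGroupAggregate => true` match. The trait and classes below are hypothetical stand-ins, not the actual `DataStreamRel` hierarchy; only the three method names are taken from the comment above, and their default values are an assumption.

```scala
// Hypothetical stand-in for DataStreamRel; the defaults are assumed.
trait StreamNodeSketch {
  // True if this node needs updates from its input encoded as retractions.
  def needsUpdatesAsRetraction: Boolean = false
  // True if this node may update results it emitted earlier.
  def producesUpdates: Boolean = false
  // True if this node can process retraction messages.
  def consumesRetractions: Boolean = false
}

// A stateless operator keeps all defaults.
final class CalcSketch extends StreamNodeSketch

// An aggregation overrides the properties instead of being listed in a match.
final class GroupAggregateSketch extends StreamNodeSketch {
  override def needsUpdatesAsRetraction: Boolean = true
  override def producesUpdates: Boolean = true
  override def consumesRetractions: Boolean = true
}

object RetractionCheckSketch {
  // Same decision as bottomNeedToRetract in the diff, but driven by the
  // node's own property rather than by its concrete class.
  def bottomNeedToRetract(
      top: StreamNodeSketch,
      topNeedsToRetract: Boolean,
      bottomAlreadyMarked: Boolean): Boolean =
    !bottomAlreadyMarked && (top.needsUpdatesAsRetraction || topNeedsToRetract)
}
```

With this shape, adding a new operator type would not require touching the rule, only overriding the relevant methods in the operator.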



[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967200#comment-15967200
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3696#discussion_r111320841
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
 ---
@@ -36,14 +36,14 @@ import org.apache.flink.types.Row
 class DataStreamCorrelate(
 cluster: RelOptCluster,
 traitSet: RelTraitSet,
-inputNode: RelNode,
--- End diff --

Hi, sorry for not leaving any comments about this fix. After the decoration 
phase, the class type of inputNode is `HepRelVertex`, so it throws a 
ClassCastException at `val inputDS = 
inputNode.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)`. You can 
reproduce this exception by running the tests in 
`DataStreamUserDefinedFunctionITCase`. The reason why there is no problem 
after runVolcanoPlanner is that `DataStreamCorrelateRule` does the 
transformation from `RelSubset` to `DataStreamRel`. I think it's better to 
override the `input` parameter and use `getInput` when translating to the 
plan. What do you think? Thx~
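
The problem can be illustrated with a small self-contained model. All classes here are hypothetical and are not the Calcite or Flink classes; the sketch assumes that the planner replaces an operator's input after construction, so a separately stored constructor argument can go stale while the accessor stays current.

```scala
// Hypothetical model of a stale constructor argument; not Calcite code.
trait PlanNodeSketch
final case class LeafSketch(name: String) extends PlanNodeSketch
// Plays the role of a planner wrapper such as HepRelVertex.
final case class WrapperSketch(current: PlanNodeSketch) extends PlanNodeSketch

// Plays the role of a single-input operator whose input the planner may swap.
class SingleInputSketch(initialInput: PlanNodeSketch) extends PlanNodeSketch {
  private var input: PlanNodeSketch = initialInput
  def getInput: PlanNodeSketch = input
  def replaceInput(newInput: PlanNodeSketch): Unit = { input = newInput }
}

// Keeps its own copy of the constructor argument, which still points at the
// wrapper after replaceInput has been called.
class StaleCorrelateSketch(val inputNode: PlanNodeSketch)
    extends SingleInputSketch(inputNode) {
  def translate(): String = inputNode.asInstanceOf[LeafSketch].name
}

// Reads the input through the accessor, so it sees the replaced node.
class FreshCorrelateSketch(in: PlanNodeSketch) extends SingleInputSketch(in) {
  def translate(): String = getInput.asInstanceOf[LeafSketch].name
}
```

In this model `StaleCorrelateSketch.translate()` fails with a `ClassCastException` once the wrapper has been swapped out, while `FreshCorrelateSketch.translate()` works.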




[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966531#comment-15966531
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3696#discussion_r110401510
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala
 ---
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.hep.HepRelVertex
+import org.apache.calcite.rel.RelNode
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
+  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
+  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
+  * NeedToRetractProcessRule -> AccModeProcessRule).
+  */
+object DataStreamRetractionRule {
+
+  /**
+* Singleton rule that init retraction trait inside a [[DataStreamRel]]
+*/
+  val INIT_INSTANCE = new InitProcessRule()
+
+  /**
+* Singleton rule that decide needToRetract property inside a [[DataStreamRel]]
+*/
+  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
+
+  /**
+* Singleton rule that decide accMode inside a [[DataStreamRel]]
+*/
+  val ACCMODE_INSTANCE = new AccModeProcessRule()
+
+  /**
+* Get all child RelNodes of a RelNode
+* @param topRel The input RelNode
+* @return All child nodes
+*/
+  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
+val topRelInputs = new ListBuffer[RelNode]()
+topRelInputs.++=(topRel.getInputs.asScala)
+topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
+  }
+
+  def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
+val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
+if (null == retractionTrait) {
+  false
+} else {
+  retractionTrait.getNeedToRetract
+}
+  }
+
+
+  /**
+* Find all needToRetract nodes. A node needs to retract means that there are downstream
+* nodes need retraction from it. Currently, [[DataStreamOverAggregate]] and
+* [[DataStreamGroupWindowAggregate]] need retraction from upstream nodes, besides, a
+* needToRetract node also need retraction from it's upstream nodes.
+*/
+  class NeedToRetractProcessRule extends RelOptRule(
+operand(
+  classOf[DataStreamRel], none()),
+"NeedToRetractProcessRule") {
+
+/**
+  * Return true if bottom RelNode does not contain needToRetract and top RelNode need
+  * retraction from bottom RelNode. Currently, operators which contain aggregations need
+  * retraction from upstream nodes, besides, a needToRetract node also needs retraction from
+  * it's upstream nodes.
+  */
+def bottomNeedToRetract(topRel: RelNode, bottomRel: RelNode): Boolean = {
+  val bottomTraits = bottomRel.getTraitSet
+  if(!traitSetContainNeedToRetract(bottomTraits)){
+topRel match {
+  case _: DataStreamGroupAggregate => true
+  case _: DataStreamGroupWindowAggregate => true
+  case _: DataStreamOverAggregate => true
+  case _ if traitSetContainNeedToRetract(topRel.getTraitSet) => true
+  case _ => false
+}
+  } else {
+false
+  }
+}
+
+/**
+  * Add 

[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966533#comment-15966533
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3696#discussion_r111246693
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala
 ---

[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966523#comment-15966523
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3696#discussion_r110398315
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala
 ---

[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966534#comment-15966534
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3696#discussion_r111247042
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/RetractionTrait.scala
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptPlanner, RelTrait, RelTraitDef}
+import org.apache.flink.table.plan.nodes.datastream.AccMode.AccMode
+
+/**
+  * Used to store retraction related properties which is used during rule optimization.
+  */
+class RetractionTrait extends RelTrait {
+
+  /**
+* Defines whether downstream operator need retraction. Please note that needToRetract is
+* different from needRetraction. NeedToRetract is a property particular for each operator,
+* while NeedRetraction is a property for each input. Most of operators have only one input,
+* some operators may have more than one inputs (e.g., join, union), and the property of the
+* NeedRetraction could be different across different inputs of the same operator
+*/
+  private var needToRetract: Boolean = false
+
+  /**
+* Defines the accumulating mode for a operator. Basically there are two modes for each
+* operator: Accumulating Mode (Acc) and Accumulating and Retracting Mode (AccRetract).
+*/
+  private var accMode = AccMode.Acc
+
+
+  def this(needToRetract: Boolean, accMode: AccMode) {
--- End diff --

I would split this trait into two traits. Unless I'm wrong, we are 
eventually only interested in the `AccMode` and `needToRetract` is just 
temporarily needed to compute the correct `AccMode`. Hence, I would split this 
into two traits.
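
One way to picture the split, as a sketch only: the names `NeedToRetractProp`, `AccModeProp` and `AccModeSketch` are hypothetical, and the derivation rule is a simplifying assumption rather than the logic in the pull request. The intermediate property exists only to compute the final one, which is what later translation steps would read.

```scala
// Hypothetical sketch of two separate properties; not the classes in the PR.
object AccModeSketch extends Enumeration {
  val Acc, AccRetract = Value
}

// Intermediate property: only needed while deriving the accumulating mode.
final case class NeedToRetractProp(needToRetract: Boolean)

// Final property: the only one that later translation steps look at.
final case class AccModeProp(mode: AccModeSketch.Value)

object TraitSplitSketch {
  // Assumed rule: an operator retracts only if it updates earlier results
  // and a downstream operator needs those updates as retractions.
  def deriveAccMode(producesUpdates: Boolean, prop: NeedToRetractProp): AccModeProp =
    if (producesUpdates && prop.needToRetract) AccModeProp(AccModeSketch.AccRetract)
    else AccModeProp(AccModeSketch.Acc)
}
```

Keeping the two apart means the intermediate flag can be dropped once the mode has been derived, without touching the property that the rest of the planner depends on.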




[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966518#comment-15966518
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3696#discussion_r110392501
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala
 ---
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.hep.HepRelVertex
+import org.apache.calcite.rel.RelNode
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
+  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
+  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
+  * NeedToRetractProcessRule -> AccModeProcessRule).
+  */
+object DataStreamRetractionRule {
+
+  /**
+* Singleton rule that init retraction trait inside a [[DataStreamRel]]
+*/
+  val INIT_INSTANCE = new InitProcessRule()
+
+  /**
+* Singleton rule that decide needToRetract property inside a 
[[DataStreamRel]]
+*/
+  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
+
+  /**
+* Singleton rule that decide accMode inside a [[DataStreamRel]]
+*/
+  val ACCMODE_INSTANCE = new AccModeProcessRule()
+
+  /**
+* Get all child RelNodes of a RelNode
+* @param topRel The input RelNode
+* @return All child nodes
+*/
+  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
+val topRelInputs = new ListBuffer[RelNode]()
+topRelInputs.++=(topRel.getInputs.asScala)
+topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
+  }
+
+  def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
+val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
+if (null == retractionTrait) {
+  false
+} else {
+  retractionTrait.getNeedToRetract
+}
+  }
+
+
+  /**
+* Find all needToRetract nodes. A node needs to retract means that 
there are downstream
+* nodes need retraction from it. Currently, 
[[DataStreamOverAggregate]] and
+* [[DataStreamGroupWindowAggregate]] need retraction from upstream 
nodes, besides, a
+* needToRetract node also need retraction from it's upstream nodes.
+*/
+  class NeedToRetractProcessRule extends RelOptRule(
+operand(
+  classOf[DataStreamRel], none()),
+"NeedToRetractProcessRule") {
+
+/**
+  * Return true if bottom RelNode does not contain needToRetract and 
top RelNode need
+  * retraction from bottom RelNode. Currently, operators which contain 
aggregations need
+  * retraction from upstream nodes, besides, a needToRetract node also 
needs retraction from
+  * it's upstream nodes.
+  */
+def bottomNeedToRetract(topRel: RelNode, bottomRel: RelNode): Boolean 
= {
+  val bottomTraits = bottomRel.getTraitSet
+  if(!traitSetContainNeedToRetract(bottomTraits)){
+topRel match {
+  case _: DataStreamGroupAggregate => true
+  case _: DataStreamGroupWindowAggregate => true
+  case _: DataStreamOverAggregate => true
+  case _ if traitSetContainNeedToRetract(topRel.getTraitSet) => 
true
+  case _ => false
+}
+  } else {
+false
+  }
+}
+
+/**
+  * Add 

[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966525#comment-15966525
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3696#discussion_r111245186
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala
 ---

[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966535#comment-15966535
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3696#discussion_r110393126
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala
 ---

[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966519#comment-15966519
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3696#discussion_r110398565
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala
 ---

[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966526#comment-15966526
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3696#discussion_r110376285
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
 ---
@@ -36,14 +36,14 @@ import org.apache.flink.types.Row
 class DataStreamCorrelate(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
-    inputNode: RelNode,
--- End diff --

Please revert these changes. We try to avoid reformatting changes because 
they make PRs harder to review. 


> Add RetractionRule at the stage of decoration
> -
>
> Key: FLINK-6090
> URL: https://issues.apache.org/jira/browse/FLINK-6090
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Hequn Cheng
>
> Implement optimizer for retraction:
>   1.Add RetractionRule at the stage of decoration,which can derive the 
> replace table/append table, NeedRetraction property.
>   2.Match the NeedRetraction and replace table, mark the accumulating mode
>  
> When this task is finished, we can turn on retraction for different operators 
> according to accumulating mode.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966521#comment-15966521
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3696#discussion_r110392776
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala
 ---

[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966515#comment-15966515
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3696#discussion_r110391516
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala
 ---

[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966529#comment-15966529
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3696#discussion_r111246143
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala
 ---
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.hep.HepRelVertex
+import org.apache.calcite.rel.RelNode
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
+  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
+  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
+  * NeedToRetractProcessRule -> AccModeProcessRule).
+  */
+object DataStreamRetractionRule {
+
+  /**
+    * Singleton rule that initializes the retraction trait inside a [[DataStreamRel]]
+    */
+  val INIT_INSTANCE = new InitProcessRule()
+
+  /**
+    * Singleton rule that decides the needToRetract property inside a [[DataStreamRel]]
+    */
+  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
+
+  /**
+    * Singleton rule that decides the accMode inside a [[DataStreamRel]]
+    */
+  val ACCMODE_INSTANCE = new AccModeProcessRule()
+
+  /**
+    * Get all child RelNodes of a RelNode
+    * @param topRel The input RelNode
+    * @return All child nodes
+    */
+  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
+    val topRelInputs = new ListBuffer[RelNode]()
+    topRelInputs.++=(topRel.getInputs.asScala)
+    topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
+  }
+
+  def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
+    val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
+    if (null == retractionTrait) {
+      false
+    } else {
+      retractionTrait.getNeedToRetract
+    }
+  }
+
+
+  /**
+    * Find all needToRetract nodes. A node needs to retract if there are downstream
+    * nodes that need retraction from it. Currently, [[DataStreamOverAggregate]] and
+    * [[DataStreamGroupWindowAggregate]] need retraction from upstream nodes, besides, a
+    * needToRetract node also needs retraction from its upstream nodes.
+    */
+  class NeedToRetractProcessRule extends RelOptRule(
+    operand(
+      classOf[DataStreamRel], none()),
+    "NeedToRetractProcessRule") {
+
+    /**
+      * Return true if bottom RelNode does not contain needToRetract and top RelNode needs
+      * retraction from bottom RelNode. Currently, operators which contain aggregations need
+      * retraction from upstream nodes, besides, a needToRetract node also needs retraction from
+      * its upstream nodes.
+      */
+    def bottomNeedToRetract(topRel: RelNode, bottomRel: RelNode): Boolean = {
+      val bottomTraits = bottomRel.getTraitSet
+      if(!traitSetContainNeedToRetract(bottomTraits)){
+        topRel match {
+          case _: DataStreamGroupAggregate => true
--- End diff --

I wonder whether it would be better to extend `DataStreamRel` with a couple 
of methods that expose the retraction behavior of each operator (for example 
`requiresUpdatesAsRetractions()`, `forwardsRetractions()`, 
`consumesRetractions()`, `producesRetractions()`, or `producesUpdates()`).
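
A minimal sketch of what such an extension could look like. Only the method name `requiresUpdatesAsRetractions` comes from the suggestion above; the trait `RetractionAwareRel`, the stand-in node classes, and the Boolean parameters are hypothetical illustrations and are not part of Flink or of this PR. The decision keeps the logic of `bottomNeedToRetract` in the diff, but asks the operator itself instead of matching on concrete classes:

```scala
// Hypothetical stand-in for DataStreamRel; NOT the actual Flink trait.
trait RetractionAwareRel {
  /** True if this operator needs updates from its input encoded as retractions. */
  def requiresUpdatesAsRetractions: Boolean = false
}

// Illustrative operators only; the overrides are assumptions for this sketch.
class GroupAggregateNode extends RetractionAwareRel {
  override def requiresUpdatesAsRetractions: Boolean = true
}

class CalcNode extends RetractionAwareRel

object RetractionSketch {
  /**
    * Same decision as bottomNeedToRetract in the diff, without the
    * hard-coded list of aggregate classes.
    *
    * @param top                 the downstream (top) operator
    * @param topNeedsToRetract   whether the top operator is already marked needToRetract
    * @param bottomAlreadyMarked whether the bottom operator is already marked needToRetract
    */
  def bottomNeedToRetract(
      top: RetractionAwareRel,
      topNeedsToRetract: Boolean,
      bottomAlreadyMarked: Boolean): Boolean = {
    !bottomAlreadyMarked && (top.requiresUpdatesAsRetractions || topNeedsToRetract)
  }
}
```

With this shape, adding a new operator that consumes retractions would only require overriding the method on that operator, not touching the rule.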

I think hardcoding the 

[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966517#comment-15966517
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3696#discussion_r110390018
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala
 ---
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.hep.HepRelVertex
+import org.apache.calcite.rel.RelNode
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
+  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
+  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
+  * NeedToRetractProcessRule -> AccModeProcessRule).
+  */
+object DataStreamRetractionRule {
+
+  /**
+    * Singleton rule that init retraction trait inside a [[DataStreamRel]]
+    */
+  val INIT_INSTANCE = new InitProcessRule()
+
+  /**
+    * Singleton rule that decide needToRetract property inside a [[DataStreamRel]]
+    */
+  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
+
+  /**
+    * Singleton rule that decide accMode inside a [[DataStreamRel]]
+    */
+  val ACCMODE_INSTANCE = new AccModeProcessRule()
+
+  /**
+    * Get all child RelNodes of a RelNode
+    * @param topRel The input RelNode
+    * @return All child nodes
+    */
+  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
+    val topRelInputs = new ListBuffer[RelNode]()
+    topRelInputs.++=(topRel.getInputs.asScala)
+    topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
+  }
+
+  def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
+    val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
+    if (null == retractionTrait) {
+      false
+    } else {
+      retractionTrait.getNeedToRetract
+    }
+  }
+
+
+  /**
+    * Find all needToRetract nodes. A node needs to retract means that there are downstream
+    * nodes need retraction from it. Currently, [[DataStreamOverAggregate]] and
+    * [[DataStreamGroupWindowAggregate]] need retraction from upstream nodes, besides, a
+    * needToRetract node also need retraction from it's upstream nodes.
+    */
+  class NeedToRetractProcessRule extends RelOptRule(
+    operand(
+      classOf[DataStreamRel], none()),
+    "NeedToRetractProcessRule") {
+
+    /**
+      * Return true if bottom RelNode does not contain needToRetract and top RelNode need
+      * retraction from bottom RelNode. Currently, operators which contain aggregations need
+      * retraction from upstream nodes, besides, a needToRetract node also needs retraction from
+      * it's upstream nodes.
+      */
+    def bottomNeedToRetract(topRel: RelNode, bottomRel: RelNode): Boolean = {
+      val bottomTraits = bottomRel.getTraitSet
+      if(!traitSetContainNeedToRetract(bottomTraits)){
+        topRel match {
+          case _: DataStreamGroupAggregate => true
+          case _: DataStreamGroupWindowAggregate => true
+          case _: DataStreamOverAggregate => true
+          case _ if traitSetContainNeedToRetract(topRel.getTraitSet) => true
+          case _ => false
+        }
+      } else {
+        false
+      }
+    }
+
+    /**
+      * Add 

[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966514#comment-15966514
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3696#discussion_r110388706
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala
 ---
@@ -0,0 +1,341 @@
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.hep.HepRelVertex
+import org.apache.calcite.rel.RelNode
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
+  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
+  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
+  * NeedToRetractProcessRule -> AccModeProcessRule).
+  */
+object DataStreamRetractionRule {
+
+  /**
+    * Singleton rule that init retraction trait inside a [[DataStreamRel]]
+    */
+  val INIT_INSTANCE = new InitProcessRule()
+
+  /**
+    * Singleton rule that decide needToRetract property inside a [[DataStreamRel]]
+    */
+  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
+
+  /**
+    * Singleton rule that decide accMode inside a [[DataStreamRel]]
+    */
+  val ACCMODE_INSTANCE = new AccModeProcessRule()
+
+  /**
+    * Get all child RelNodes of a RelNode
+    * @param topRel The input RelNode
+    * @return All child nodes
+    */
+  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
--- End diff --

Change return type to `Seq[RelNode]`
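
For illustration, the suggested signature could look roughly like the sketch below. The `RelNode`, `HepRelVertex`, `Leaf` and `Node` types here are hypothetical stand-ins (not the Calcite classes), introduced only so the snippet compiles on its own; the `.toList` at the end is an assumption about how the immutable `Seq` would be produced.

```scala
import scala.collection.JavaConverters._

// Hypothetical stand-ins for Calcite's RelNode and HepRelVertex, so that the
// sketch compiles without Calcite on the classpath.
trait RelNode { def getInputs: java.util.List[RelNode] }

final case class Leaf(name: String) extends RelNode {
  def getInputs: java.util.List[RelNode] = java.util.Collections.emptyList[RelNode]()
}

final case class HepRelVertex(getCurrentRel: RelNode) extends RelNode {
  def getInputs: java.util.List[RelNode] = getCurrentRel.getInputs
}

final case class Node(inputs: RelNode*) extends RelNode {
  def getInputs: java.util.List[RelNode] = inputs.asJava
}

object ChildRelNodes {
  // Callers only see Seq[RelNode]; no mutable buffer leaks out of the method.
  def getChildRelNodes(topRel: RelNode): Seq[RelNode] =
    topRel.getInputs.asScala
      .map(_.asInstanceOf[HepRelVertex].getCurrentRel)
      .toList
}
```

Returning `Seq[RelNode]` keeps the choice of collection an implementation detail, so callers cannot depend on (or mutate) a `ListBuffer`.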




[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966524#comment-15966524
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3696#discussion_r110388774
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala
 ---
@@ -0,0 +1,341 @@
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.hep.HepRelVertex
+import org.apache.calcite.rel.RelNode
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
+  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
+  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
+  * NeedToRetractProcessRule -> AccModeProcessRule).
+  */
+object DataStreamRetractionRule {
+
+  /**
+    * Singleton rule that init retraction trait inside a [[DataStreamRel]]
+    */
+  val INIT_INSTANCE = new InitProcessRule()
+
+  /**
+    * Singleton rule that decide needToRetract property inside a [[DataStreamRel]]
+    */
+  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
+
+  /**
+    * Singleton rule that decide accMode inside a [[DataStreamRel]]
+    */
+  val ACCMODE_INSTANCE = new AccModeProcessRule()
+
+  /**
+    * Get all child RelNodes of a RelNode
+    * @param topRel The input RelNode
+    * @return All child nodes
+    */
+  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
+    val topRelInputs = new ListBuffer[RelNode]()
--- End diff --

method can be simplified to 
`topRel.getInputs.asScala.transform(_.asInstanceOf[HepRelVertex].getCurrentRel)`
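
One point that may be worth checking with the one-liner: `transform` updates a mutable sequence in place, and the wrapper returned by `asScala` forwards those writes to the underlying Java list, whereas `map` builds a new collection. The sketch below illustrates the difference on a plain `java.util.List[String]`; the object and method names are hypothetical, and strings stand in for `RelNode`s.

```scala
import scala.collection.JavaConverters._

object TransformVsMap {
  // `map` builds a new collection and leaves the Java list untouched.
  def viaMap(inputs: java.util.List[String]): Seq[String] =
    inputs.asScala.map(_.toUpperCase).toList

  // `transform` rewrites each element in place; the `asScala` wrapper
  // forwards every write to the underlying Java list.
  def viaTransform(inputs: java.util.List[String]): Seq[String] =
    inputs.asScala.transform(_.toUpperCase).toList
}
```

If the list returned by `getInputs` happens to be unmodifiable, the `transform` variant may fail at runtime, so a `map`-based one-liner is probably the safer reading of the suggestion.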




[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966530#comment-15966530
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3696#discussion_r110403637
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala
 ---
@@ -0,0 +1,341 @@
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.hep.HepRelVertex
+import org.apache.calcite.rel.RelNode
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
+  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
+  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
+  * NeedToRetractProcessRule -> AccModeProcessRule).
+  */
+object DataStreamRetractionRule {
+
+  /**
+    * Singleton rule that init retraction trait inside a [[DataStreamRel]]
+    */
+  val INIT_INSTANCE = new InitProcessRule()
+
+  /**
+    * Singleton rule that decide needToRetract property inside a [[DataStreamRel]]
+    */
+  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
+
+  /**
+    * Singleton rule that decide accMode inside a [[DataStreamRel]]
+    */
+  val ACCMODE_INSTANCE = new AccModeProcessRule()
+
+  /**
+    * Get all child RelNodes of a RelNode
+    * @param topRel The input RelNode
+    * @return All child nodes
+    */
+  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
+    val topRelInputs = new ListBuffer[RelNode]()
+    topRelInputs.++=(topRel.getInputs.asScala)
+    topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
+  }
+
+  def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
+    val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
+    if (null == retractionTrait) {
+      false
+    } else {
+      retractionTrait.getNeedToRetract
+    }
+  }
+
+
+  /**
+    * Find all needToRetract nodes. A node needs to retract means that there are downstream
+    * nodes need retraction from it. Currently, [[DataStreamOverAggregate]] and
+    * [[DataStreamGroupWindowAggregate]] need retraction from upstream nodes, besides, a
+    * needToRetract node also need retraction from it's upstream nodes.
+    */
+  class NeedToRetractProcessRule extends RelOptRule(
+    operand(
+      classOf[DataStreamRel], none()),
+    "NeedToRetractProcessRule") {
+
+    /**
+      * Return true if bottom RelNode does not contain needToRetract and top RelNode need
+      * retraction from bottom RelNode. Currently, operators which contain aggregations need
+      * retraction from upstream nodes, besides, a needToRetract node also needs retraction from
+      * it's upstream nodes.
+      */
+    def bottomNeedToRetract(topRel: RelNode, bottomRel: RelNode): Boolean = {
+      val bottomTraits = bottomRel.getTraitSet
+      if(!traitSetContainNeedToRetract(bottomTraits)){
--- End diff --

+space `if (`
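
As a rough illustration of the check with the requested spacing applied, the sketch below restates `bottomNeedToRetract` on simplified types. `Rel`, `GroupAggregate`, `OverAggregate` and `Calc` are hypothetical stand-ins for the DataStream operators, and the `needToRetract` field is an assumption replacing the lookup in the `RelTraitSet`.

```scala
// Hypothetical stand-ins for the DataStream operators; in the PR the flag is
// read from a retraction trait in the RelTraitSet rather than from a field.
sealed trait Rel { def needToRetract: Boolean }
final case class GroupAggregate(needToRetract: Boolean = false) extends Rel
final case class OverAggregate(needToRetract: Boolean = false) extends Rel
final case class Calc(needToRetract: Boolean = false) extends Rel

object NeedToRetractSketch {
  // True when the bottom node is not yet marked and the top node either
  // aggregates or is itself marked as needing retraction.
  def bottomNeedToRetract(topRel: Rel, bottomRel: Rel): Boolean = {
    if (!bottomRel.needToRetract) {
      topRel match {
        case _: GroupAggregate => true
        case _: OverAggregate => true
        case _ if topRel.needToRetract => true
        case _ => false
      }
    } else {
      false
    }
  }
}
```

For example, an aggregate on top of an unmarked `Calc` yields `true`, while any top node over an already marked bottom node yields `false`.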



[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966520#comment-15966520
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3696#discussion_r110389978
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala
 ---

[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966516#comment-15966516
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3696#discussion_r110399631
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala
 ---

[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966527#comment-15966527
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3696#discussion_r110390397
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala
 ---

[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966522#comment-15966522
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3696#discussion_r110401135
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala
 ---

[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966532#comment-15966532
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3696#discussion_r110401619
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala
 ---
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.hep.HepRelVertex
+import org.apache.calcite.rel.RelNode
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
+  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
+  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
+  * NeedToRetractProcessRule -> AccModeProcessRule).
+  */
+object DataStreamRetractionRule {
+
+  /**
+* Singleton rule that init retraction trait inside a [[DataStreamRel]]
+*/
+  val INIT_INSTANCE = new InitProcessRule()
+
+  /**
+* Singleton rule that decide needToRetract property inside a [[DataStreamRel]]
+*/
+  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
+
+  /**
+* Singleton rule that decide accMode inside a [[DataStreamRel]]
+*/
+  val ACCMODE_INSTANCE = new AccModeProcessRule()
+
+  /**
+* Get all child RelNodes of a RelNode
+* @param topRel The input RelNode
+* @return All child nodes
+*/
+  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
+val topRelInputs = new ListBuffer[RelNode]()
+topRelInputs.++=(topRel.getInputs.asScala)
+topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
+  }
+
+  def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
+val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
+if (null == retractionTrait) {
+  false
+} else {
+  retractionTrait.getNeedToRetract
+}
+  }
+
+
+  /**
+* Find all needToRetract nodes. A node needs to retract means that there are downstream
+* nodes need retraction from it. Currently, [[DataStreamOverAggregate]] and
+* [[DataStreamGroupWindowAggregate]] need retraction from upstream nodes, besides, a
+* needToRetract node also need retraction from it's upstream nodes.
+*/
+  class NeedToRetractProcessRule extends RelOptRule(
+operand(
+  classOf[DataStreamRel], none()),
+"NeedToRetractProcessRule") {
+
+/**
+  * Return true if bottom RelNode does not contain needToRetract and top RelNode need
+  * retraction from bottom RelNode. Currently, operators which contain aggregations need
+  * retraction from upstream nodes, besides, a needToRetract node also needs retraction from
+  * it's upstream nodes.
+  */
+def bottomNeedToRetract(topRel: RelNode, bottomRel: RelNode): Boolean = {
+  val bottomTraits = bottomRel.getTraitSet
+  if(!traitSetContainNeedToRetract(bottomTraits)){
+topRel match {
+  case _: DataStreamGroupAggregate => true
+  case _: DataStreamGroupWindowAggregate => true
+  case _: DataStreamOverAggregate => true
+  case _ if traitSetContainNeedToRetract(topRel.getTraitSet) => true
+  case _ => false
+}
+  } else {
+false
+  }
+}
+
+/**
+  * Add 

[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966513#comment-15966513
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3696#discussion_r110392466
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala
 ---
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.hep.HepRelVertex
+import org.apache.calcite.rel.RelNode
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
+  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
+  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
+  * NeedToRetractProcessRule -> AccModeProcessRule).
+  */
+object DataStreamRetractionRule {
+
+  /**
+* Singleton rule that init retraction trait inside a [[DataStreamRel]]
+*/
+  val INIT_INSTANCE = new InitProcessRule()
+
+  /**
+* Singleton rule that decide needToRetract property inside a [[DataStreamRel]]
+*/
+  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
+
+  /**
+* Singleton rule that decide accMode inside a [[DataStreamRel]]
+*/
+  val ACCMODE_INSTANCE = new AccModeProcessRule()
+
+  /**
+* Get all child RelNodes of a RelNode
+* @param topRel The input RelNode
+* @return All child nodes
+*/
+  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
+val topRelInputs = new ListBuffer[RelNode]()
+topRelInputs.++=(topRel.getInputs.asScala)
+topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
+  }
+
+  def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
+val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
+if (null == retractionTrait) {
+  false
+} else {
+  retractionTrait.getNeedToRetract
+}
+  }
+
+
+  /**
+* Find all needToRetract nodes. A node needs to retract means that there are downstream
+* nodes need retraction from it. Currently, [[DataStreamOverAggregate]] and
+* [[DataStreamGroupWindowAggregate]] need retraction from upstream nodes, besides, a
+* needToRetract node also need retraction from it's upstream nodes.
+*/
+  class NeedToRetractProcessRule extends RelOptRule(
+operand(
+  classOf[DataStreamRel], none()),
+"NeedToRetractProcessRule") {
+
+/**
+  * Return true if bottom RelNode does not contain needToRetract and top RelNode need
+  * retraction from bottom RelNode. Currently, operators which contain aggregations need
+  * retraction from upstream nodes, besides, a needToRetract node also needs retraction from
+  * it's upstream nodes.
+  */
+def bottomNeedToRetract(topRel: RelNode, bottomRel: RelNode): Boolean = {
+  val bottomTraits = bottomRel.getTraitSet
+  if(!traitSetContainNeedToRetract(bottomTraits)){
+topRel match {
+  case _: DataStreamGroupAggregate => true
+  case _: DataStreamGroupWindowAggregate => true
+  case _: DataStreamOverAggregate => true
+  case _ if traitSetContainNeedToRetract(topRel.getTraitSet) => true
+  case _ => false
+}
+  } else {
+false
+  }
+}
+
+/**
+  * Add 

[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966528#comment-15966528
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3696#discussion_r111244797
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala
 ---
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.hep.HepRelVertex
+import org.apache.calcite.rel.RelNode
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
+  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
+  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
+  * NeedToRetractProcessRule -> AccModeProcessRule).
+  */
+object DataStreamRetractionRule {
+
+  /**
+* Singleton rule that init retraction trait inside a [[DataStreamRel]]
+*/
+  val INIT_INSTANCE = new InitProcessRule()
+
+  /**
+* Singleton rule that decide needToRetract property inside a [[DataStreamRel]]
+*/
+  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
+
+  /**
+* Singleton rule that decide accMode inside a [[DataStreamRel]]
+*/
+  val ACCMODE_INSTANCE = new AccModeProcessRule()
+
+  /**
+* Get all child RelNodes of a RelNode
+* @param topRel The input RelNode
+* @return All child nodes
+*/
+  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
+val topRelInputs = new ListBuffer[RelNode]()
+topRelInputs.++=(topRel.getInputs.asScala)
+topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
+  }
+
+  def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
+val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
+if (null == retractionTrait) {
--- End diff --

can be simplified to 
`null != retractionTrait && retractionTrait.getNeedToRetract`
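
The equivalence behind this suggestion can be checked with a small sketch. It assumes a hypothetical stand-in `RetractionTrait` case class rather than the trait class from the PR; only the `getNeedToRetract` getter is modeled, and the `Option`-based variant is an extra alternative that was not proposed in the review.

```scala
object NullCheckSketch {
  // Hypothetical stand-in for the PR's retraction trait; only the getter matters here.
  final case class RetractionTrait(getNeedToRetract: Boolean)

  // Original shape from the diff: explicit if/else around the null check.
  def needToRetractVerbose(t: RetractionTrait): Boolean =
    if (null == t) false else t.getNeedToRetract

  // Suggested shape: && short-circuits, so the getter is never called on null.
  def needToRetractShort(t: RetractionTrait): Boolean =
    null != t && t.getNeedToRetract

  // Alternative (an assumption, not from the review): wrap the nullable value in Option.
  def needToRetractOption(t: RetractionTrait): Boolean =
    Option(t).exists(_.getNeedToRetract)
}
```

All three functions return `false` for a missing (null) trait and otherwise return the flag stored in the trait.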







[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962053#comment-15962053
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user godfreyhe commented on a diff in the pull request:

https://github.com/apache/flink/pull/3696#discussion_r110534557
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala
 ---
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.hep.HepRelVertex
+import org.apache.calcite.rel.RelNode
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
+  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
+  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
+  * NeedToRetractProcessRule -> AccModeProcessRule).
+  */
+object DataStreamRetractionRule {
+
+  /**
+* Singleton rule that init retraction trait inside a [[DataStreamRel]]
+*/
+  val INIT_INSTANCE = new InitProcessRule()
+
+  /**
+* Singleton rule that decide needToRetract property inside a [[DataStreamRel]]
+*/
+  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
+
+  /**
+* Singleton rule that decide accMode inside a [[DataStreamRel]]
+*/
+  val ACCMODE_INSTANCE = new AccModeProcessRule()
+
+  /**
+* Get all child RelNodes of a RelNode
+* @param topRel The input RelNode
+* @return All child nodes
+*/
+  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
+val topRelInputs = new ListBuffer[RelNode]()
+topRelInputs.++=(topRel.getInputs.asScala)
+topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
--- End diff --

topRel.getInputs.asScala.map {
  case e: HepRelVertex => e.getCurrentRel
}
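
A minimal sketch of this suggestion, assuming hypothetical stand-ins for Calcite's `RelNode` and `HepRelVertex` so that it runs without Calcite on the classpath. A pattern-matching function literal has to be written with braces. The `collect` variant is an extra option that was not part of the review and behaves differently when an input is not a `HepRelVertex`.

```scala
object ChildNodesSketch {
  // Hypothetical stand-ins for Calcite's RelNode and HepRelVertex.
  trait RelNode
  final case class Leaf(name: String) extends RelNode
  final case class HepRelVertex(getCurrentRel: RelNode) extends RelNode

  // Suggested shape: unwrap each vertex with a pattern match.
  // Throws scala.MatchError if an input is not a HepRelVertex.
  def viaMap(inputs: Seq[RelNode]): Seq[RelNode] =
    inputs.map { case v: HepRelVertex => v.getCurrentRel }

  // Alternative (an assumption): collect drops non-matching inputs instead of throwing.
  def viaCollect(inputs: Seq[RelNode]): Seq[RelNode] =
    inputs.collect { case v: HepRelVertex => v.getCurrentRel }
}
```

Note that the method in the diff is declared to return `ListBuffer[RelNode]`, so the return type or a conversion would need adjusting if the `map` form were adopted as is.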








[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962052#comment-15962052
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

Github user godfreyhe commented on a diff in the pull request:

https://github.com/apache/flink/pull/3696#discussion_r110534493
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala
 ---
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.hep.HepRelVertex
+import org.apache.calcite.rel.RelNode
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Collection of retraction rules that apply various transformations on DataStreamRel trees.
+  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and
+  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
+  * NeedToRetractProcessRule -> AccModeProcessRule).
+  */
+object DataStreamRetractionRule {
+
+  /**
+* Singleton rule that init retraction trait inside a [[DataStreamRel]]
+*/
+  val INIT_INSTANCE = new InitProcessRule()
+
+  /**
+* Singleton rule that decide needToRetract property inside a [[DataStreamRel]]
+*/
+  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
+
+  /**
+* Singleton rule that decide accMode inside a [[DataStreamRel]]
+*/
+  val ACCMODE_INSTANCE = new AccModeProcessRule()
+
+  /**
+* Get all child RelNodes of a RelNode
+* @param topRel The input RelNode
+* @return All child nodes
+*/
+  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
+val topRelInputs = new ListBuffer[RelNode]()
+topRelInputs.++=(topRel.getInputs.asScala)
+topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
+  }
+
+  def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
+val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
+if (null == retractionTrait) {
+  false
+} else {
+  retractionTrait.getNeedToRetract
+}
+  }
+
+
+  /**
+* Find all needToRetract nodes. A node needs to retract means that there are downstream
+* nodes need retraction from it. Currently, [[DataStreamOverAggregate]] and
+* [[DataStreamGroupWindowAggregate]] need retraction from upstream nodes, besides, a
+* needToRetract node also need retraction from it's upstream nodes.
+*/
+  class NeedToRetractProcessRule extends RelOptRule(
+operand(
+  classOf[DataStreamRel], none()),
+"NeedToRetractProcessRule") {
+
+/**
+  * Return true if bottom RelNode does not contain needToRetract and top RelNode need
+  * retraction from bottom RelNode. Currently, operators which contain aggregations need
+  * retraction from upstream nodes, besides, a needToRetract node also needs retraction from
+  * it's upstream nodes.
+  */
+def bottomNeedToRetract(topRel: RelNode, bottomRel: RelNode): Boolean = {
+  val bottomTraits = bottomRel.getTraitSet
+  if(!traitSetContainNeedToRetract(bottomTraits)){
+topRel match {
+  case _: DataStreamGroupAggregate => true
+  case _: DataStreamGroupWindowAggregate => true
+  case _: DataStreamOverAggregate => true
+  case _ if traitSetContainNeedToRetract(topRel.getTraitSet) => true
+  case _ => false
--- End diff --

case _ if traitSetContainNeedToRetract(topRel.getTraitSet) => true
case _ => false
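
The comment above is cut off in the archive, so the reviewer's actual proposal is not known. Purely as an illustration, the two trailing cases could be folded into one, since a guard that returns `true` followed by a default that returns `false` is the same as returning the guard's value. The node classes below are hypothetical stand-ins for the DataStream rel nodes, and `needToRetract` models the trait flag.

```scala
object BottomNeedToRetractSketch {
  // Hypothetical stand-ins; needToRetract models the flag stored in the trait set.
  sealed trait Rel { def needToRetract: Boolean }
  final case class GroupAggregate(needToRetract: Boolean) extends Rel
  final case class GroupWindowAggregate(needToRetract: Boolean) extends Rel
  final case class OverAggregate(needToRetract: Boolean) extends Rel
  final case class Other(needToRetract: Boolean) extends Rel

  // True if bottom is not yet marked and top requires retraction from it.
  def bottomNeedToRetract(top: Rel, bottom: Rel): Boolean =
    !bottom.needToRetract && (top match {
      // Aggregating operators always need retraction from their inputs.
      case _: GroupAggregate | _: GroupWindowAggregate | _: OverAggregate => true
      // Any other operator passes on its own needToRetract flag.
      case other => other.needToRetract
    })
}
```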


[jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration

2017-04-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15960585#comment-15960585
 ] 

ASF GitHub Bot commented on FLINK-6090:
---

GitHub user hequn8128 opened a pull request:

https://github.com/apache/flink/pull/3696

[FLINK-6090] [table] Add RetractionRule at the stage of decoration

Add RetractionRules at the stage of decoration. These rules derive the 
NeedRetraction property and the accumulating mode. There are three rules:
1. InitProcessRule. This rule initializes the NeedRetraction property and 
AccMode for DataStreamRels.
2. NeedToRetractProcessRule. This rule derives the NeedRetraction property.
3. AccModeProcessRule. This rule finds all AccRetract nodes and derives the 
accumulating mode.
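
The propagation performed by NeedToRetractProcessRule, as described in the patch's Scaladoc, can be sketched on a toy plan tree. This is a simplified model under stated assumptions: `Node`, `isAggregate`, and `needToRetract` are hypothetical names, not Flink or Calcite classes, and the real rule works on trait sets through the planner rather than by direct recursion.

```scala
object NeedToRetractPropagationSketch {
  // Hypothetical toy plan node; isAggregate stands in for the aggregate rel types.
  final case class Node(name: String, isAggregate: Boolean, inputs: List[Node] = Nil)

  // Walk the plan top-down and return the names of nodes that need to retract.
  // parentNeeds is true when the parent asks this node for retraction.
  def needToRetract(node: Node, parentNeeds: Boolean = false): Set[String] = {
    // An aggregate, or any node that must itself retract, asks its inputs to retract.
    val asksInputs = node.isAggregate || parentNeeds
    val self = if (parentNeeds) Set(node.name) else Set.empty[String]
    self ++ node.inputs.flatMap(needToRetract(_, asksInputs))
  }
}
```

For a chain scan -> agg1 -> calc -> agg2 with agg2 on top, this marks calc, agg1 and scan, and leaves agg2 unmarked because nothing downstream asks it to retract.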

- [x] General
  - The pull request references the related JIRA issue  [FLINK-6090] Add 
RetractionRule at the stage of decoration
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hequn8128/flink FLINK-6090

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3696.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3696


commit c4edd908e2a251934ab6992dd1826a58a4d10b65
Author: hequn.chq 
Date:   2017-04-07T05:12:04Z

[FLINK-6090] [table] Add RetractionRule at the stage of decoration






