[GitHub] flink pull request #3919: [FLINK-6583][talbe]Enable QueryConfig in count bas...

2017-05-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3919#discussion_r117183489
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
 ---
@@ -131,6 +135,19 @@ class DataStreamGroupWindowAggregate(
   "non-windowed GroupBy aggregation.")
 }
 
+val isCountWindow = window match {
+  case TumblingGroupWindow(_, _, size) if isRowCountLiteral(size) => true
+  case SlidingGroupWindow(_, _, size, _) if isRowCountLiteral(size) => true
+  case _ => false
+}
+
+if (isCountWindow && grouping.length > 0 && queryConfig.getMinIdleStateRetentionTime < 0) {
+  LOG.warn(
--- End diff --

I think this can be argued both ways. If we log an error message, we could 
also throw an exception and prevent the program from being executed. However, 
there are also other strategies to deal with state cleanup, e.g., by 
configuring RocksDB.
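
The check under discussion can be sketched in plain Scala without any Flink 
dependency. This is only an illustration: `RetentionCheckSketch`, `Action`, 
`Proceed`, `Warn`, and the parameter names are hypothetical, and it assumes 
that a negative minimum retention time means no retention was configured. It 
mirrors the condition in the diff and keeps the "warn, do not fail" behavior, 
so that users who clean up state by other means are not blocked.

```scala
object RetentionCheckSketch {
  // Hypothetical result type: either proceed silently or proceed with a warning.
  sealed trait Action
  case object Proceed extends Action
  final case class Warn(message: String) extends Action

  // Mirrors the condition in the diff: a count window on a grouped stream
  // without a configured (non-negative) minimum idle state retention time.
  def check(isCountWindow: Boolean, groupingKeys: Int, minRetentionMs: Long): Action =
    if (isCountWindow && groupingKeys > 0 && minRetentionMs < 0) {
      Warn("No state retention interval configured for a count window on a grouped stream.")
    } else {
      Proceed
    }
}
```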


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3919: [FLINK-6583][talbe]Enable QueryConfig in count bas...

2017-05-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3919#discussion_r117183206
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
 ---
@@ -245,6 +264,7 @@ object DataStreamGroupWindowAggregate {
 case SlidingGroupWindow(_, timeField, size, slide)
 if isProctimeAttribute(timeField) && isRowCountLiteral(size) =>
   stream.countWindow(toLong(size), toLong(slide))
+  .trigger(StateCleaningCountTrigger.of(queryConfig, toLong(slide)));
--- End diff --

`slide` is correct here (check `KeyedStream.countWindow(long, long)`).

The default trigger is replaced by a trigger that additionally registers a 
cleanup timer. The count-based trigger policy is still the same.
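
To illustrate why the trigger takes `slide` and not `size`: as far as I know, 
`KeyedStream.countWindow(size, slide)` is built on a global window with 
`CountEvictor.of(size)` and `CountTrigger.of(slide)`, so the window fires 
every `slide` elements and is evaluated over at most the last `size` 
elements. The following is a plain-Scala simulation of that behavior, not 
Flink code; `SlidingCountWindowSketch` and `firings` are hypothetical names.

```scala
object SlidingCountWindowSketch {
  // Returns the contents seen by each firing of a sliding count window.
  // Assumption: the window fires after every `slide` elements and each
  // firing sees at most the last `size` elements received so far.
  def firings[T](elements: Seq[T], size: Int, slide: Int): Seq[Seq[T]] = {
    require(size > 0 && slide > 0, "size and slide must be positive")
    (slide to elements.length by slide).map { end =>
      elements.slice(math.max(0, end - size), end)
    }
  }
}
```

With `size = 4` and `slide = 2`, six elements produce three firings, the 
first of which contains only two elements because the window is not yet full.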




[GitHub] flink pull request #3919: [FLINK-6583][talbe]Enable QueryConfig in count bas...

2017-05-17 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/3919#discussion_r117122323
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
 ---
@@ -131,6 +135,19 @@ class DataStreamGroupWindowAggregate(
   "non-windowed GroupBy aggregation.")
 }
 
+val isCountWindow = window match {
+  case TumblingGroupWindow(_, _, size) if isRowCountLiteral(size) => true
+  case SlidingGroupWindow(_, _, size, _) if isRowCountLiteral(size) => true
+  case _ => false
+}
+
+if (isCountWindow && grouping.length > 0 && queryConfig.getMinIdleStateRetentionTime < 0) {
+  LOG.warn(
--- End diff --

Should this be error() ?




[GitHub] flink pull request #3919: [FLINK-6583][talbe]Enable QueryConfig in count bas...

2017-05-17 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/3919#discussion_r117121382
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
 ---
@@ -245,6 +264,7 @@ object DataStreamGroupWindowAggregate {
 case SlidingGroupWindow(_, timeField, size, slide)
 if isProctimeAttribute(timeField) && isRowCountLiteral(size) =>
   stream.countWindow(toLong(size), toLong(slide))
+  .trigger(StateCleaningCountTrigger.of(queryConfig, toLong(slide)));
--- End diff --

Should this be toLong(size) ?




[GitHub] flink pull request #3919: [FLINK-6583][talbe]Enable QueryConfig in count bas...

2017-05-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3919




[GitHub] flink pull request #3919: [FLINK-6583][talbe]Enable QueryConfig in count bas...

2017-05-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3919#discussion_r116932844
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala
 ---
@@ -19,44 +19,46 @@ package org.apache.flink.table.runtime.triggers
 
 import java.lang.{Long => JLong}
 
+import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.functions.ReduceFunction
 import org.apache.flink.api.common.state._
 import 
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
 import org.apache.flink.streaming.api.windowing.triggers.{Trigger, 
TriggerResult}
-import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
 import org.apache.flink.table.api.{StreamQueryConfig, Types}
-import 
org.apache.flink.table.runtime.triggers.CountTriggerWithCleanupState.Sum
+import 
org.apache.flink.table.runtime.triggers.StateCleaningCountTrigger.Sum
 
-class CountTriggerWithCleanupState[W <: Window](queryConfig: 
StreamQueryConfig, maxCount: Long)
-  extends Trigger[Any, W] {
+/**
+  * A {@link Trigger} that fires once the count of elements in a pane 
reaches the given count
+  * or the cleanup timer is triggered.
+  */
+@PublicEvolving
--- End diff --

We only annotate classes of the core, DataSet, and DataStream APIs with 
stability annotations.
`@PublicEvolving` should be removed.




[GitHub] flink pull request #3919: [FLINK-6583][talbe]Enable QueryConfig in count bas...

2017-05-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3919#discussion_r116727685
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
 ---
@@ -245,6 +266,8 @@ object DataStreamGroupWindowAggregate {
 case SlidingGroupWindow(_, timeField, size, slide)
 if isProctimeAttribute(timeField) && isRowCountLiteral(size) =>
   stream.countWindow(toLong(size), toLong(slide))
+  .evictor(CountEvictor.of(toLong(size)))
--- End diff --

Do we need to override the evictor as well? 
I think it should not be changed if we add a custom trigger.




[GitHub] flink pull request #3919: [FLINK-6583][talbe]Enable QueryConfig in count bas...

2017-05-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3919#discussion_r116740788
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/CountTriggerWithCleanupState.scala
 ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.triggers
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.state._
+import 
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
+import org.apache.flink.streaming.api.windowing.triggers.{Trigger, 
TriggerResult}
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import 
org.apache.flink.table.runtime.triggers.CountTriggerWithCleanupState.Sum
+
+class CountTriggerWithCleanupState[W <: Window](queryConfig: 
StreamQueryConfig, maxCount: Long)
+  extends Trigger[Any, W] {
+
+  private val serialVersionUID: Long = 1L
+
+  protected val minRetentionTime: Long = 
queryConfig.getMinIdleStateRetentionTime
+  protected val maxRetentionTime: Long = 
queryConfig.getMaxIdleStateRetentionTime
+  protected val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  private val stateDesc: ReducingStateDescriptor[JLong] =
+new ReducingStateDescriptor[JLong]("count", new Sum, Types.LONG)
+
+  private val cleanupStateDesc: ValueStateDescriptor[JLong] =
+new ValueStateDescriptor[JLong]("countCleanup", Types.LONG)
+
+  override def canMerge: Boolean = true
+
+  override def onMerge(window: W, ctx: Trigger.OnMergeContext) {
+ctx.mergePartitionedState(stateDesc)
+  }
+
+  override def toString: String = "CountTriggerWithCleanupState(" +
+"minIdleStateRetentionTime=" + 
queryConfig.getMinIdleStateRetentionTime + ", " +
+"maxIdleStateRetentionTime=" + 
queryConfig.getMaxIdleStateRetentionTime + ", " +
+"maxCount=" + maxCount + ")"
+
+  override def onElement(
+  element: Any,
+  timestamp: Long,
+  window: W,
+  ctx: TriggerContext): TriggerResult = {
+
+val currentTime = ctx.getCurrentProcessingTime
+
+// register cleanup timer
+if (stateCleaningEnabled) {
+  // last registered timer
+  val curCleanupTime = 
ctx.getPartitionedState(cleanupStateDesc).value()
+
+  // check if a cleanup timer is registered and
+  // that the current cleanup timer won't delete state we need to keep
+  if (curCleanupTime == null || (currentTime + minRetentionTime) > 
curCleanupTime) {
+// we need to register a new (later) timer
+val cleanupTime = currentTime + maxRetentionTime
+// register timer and remember clean-up time
+ctx.registerProcessingTimeTimer(cleanupTime)
+
+if (null != curCleanupTime) {
+  ctx.deleteProcessingTimeTimer(curCleanupTime)
+}
+
+ctx.getPartitionedState(cleanupStateDesc).update(cleanupTime)
+  }
+}
+
+val count: ReducingState[JLong] = ctx.getPartitionedState(stateDesc)
+count.add(1L)
+
+if (count.get >= maxCount) {
+  count.clear()
+  return TriggerResult.FIRE
+}
+
+return TriggerResult.CONTINUE
--- End diff --

remove `return`
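
In Scala the last expression of a method is its result, so an `if`/`else` 
can replace the two `return` statements. A minimal sketch of the shape, using 
a hypothetical `Result` type in place of Flink's `TriggerResult` and leaving 
out the state handling (`count.clear()`) of the original method:

```scala
object TriggerDecisionSketch {
  // Hypothetical stand-in for TriggerResult, so the sketch has no Flink dependency.
  sealed trait Result
  case object Fire extends Result
  case object Continue extends Result

  // The if/else is the last expression of the method and therefore its
  // result; no `return` keyword is needed.
  def decide(count: Long, maxCount: Long): Result =
    if (count >= maxCount) Fire else Continue
}
```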




[GitHub] flink pull request #3919: [FLINK-6583][talbe]Enable QueryConfig in count bas...

2017-05-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3919#discussion_r116738258
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/CountTriggerWithCleanupStateHarnessTest.scala
 ---
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.harness
+
+import com.google.common.collect.Lists
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import 
org.apache.flink.streaming.runtime.operators.windowing.TriggerTestHarness
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.triggers.CountTriggerWithCleanupState
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class CountTriggerWithCleanupStateHarnessTest {
--- End diff --

rename to `StateCleaningCountTrigger`?




[GitHub] flink pull request #3919: [FLINK-6583][talbe]Enable QueryConfig in count bas...

2017-05-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3919#discussion_r116739074
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/CountTriggerWithCleanupStateHarnessTest.scala
 ---
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.harness
+
+import com.google.common.collect.Lists
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import 
org.apache.flink.streaming.runtime.operators.windowing.TriggerTestHarness
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.triggers.CountTriggerWithCleanupState
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class CountTriggerWithCleanupStateHarnessTest {
+  protected var queryConfig =
+new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(2), 
Time.seconds(3))
+
+  @Test
+  def testFiringAndFireingWithPurging(): Unit = {
+val testHarness = new TriggerTestHarness[Any, TimeWindow](
+  CountTriggerWithCleanupState.of[TimeWindow](queryConfig, 10), new 
TimeWindow.Serializer)
--- End diff --

Count windows are based on `GlobalWindow`. Use a `GlobalWindow` instead of 
a `TimeWindow`. 




[GitHub] flink pull request #3919: [FLINK-6583][talbe]Enable QueryConfig in count bas...

2017-05-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3919#discussion_r116739961
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/CountTriggerWithCleanupStateHarnessTest.scala
 ---
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.harness
+
+import com.google.common.collect.Lists
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import 
org.apache.flink.streaming.runtime.operators.windowing.TriggerTestHarness
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.triggers.CountTriggerWithCleanupState
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class CountTriggerWithCleanupStateHarnessTest {
+  protected var queryConfig =
+new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(2), 
Time.seconds(3))
+
+  @Test
+  def testFiringAndFireingWithPurging(): Unit = {
+val testHarness = new TriggerTestHarness[Any, TimeWindow](
+  CountTriggerWithCleanupState.of[TimeWindow](queryConfig, 10), new 
TimeWindow.Serializer)
+
+// try to trigger onProcessingTime method via 1, but there is non 
timer is triggered
+assertEquals(0, testHarness.advanceProcessingTime(1).size())
+
+// register cleanup timer with 3001
+assertEquals(
+  TriggerResult.CONTINUE,
+  testHarness.processElement(new StreamRecord[Any](1), new 
TimeWindow(0, 9)))
--- End diff --

Use the same `StreamRecord` and `GlobalWindow` objects to keep the code more 
concise?




[GitHub] flink pull request #3919: [FLINK-6583][talbe]Enable QueryConfig in count bas...

2017-05-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3919#discussion_r116733901
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/CountTriggerWithCleanupState.scala
 ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.triggers
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.state._
+import 
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
+import org.apache.flink.streaming.api.windowing.triggers.{Trigger, 
TriggerResult}
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import 
org.apache.flink.table.runtime.triggers.CountTriggerWithCleanupState.Sum
+
+class CountTriggerWithCleanupState[W <: Window](queryConfig: 
StreamQueryConfig, maxCount: Long)
+  extends Trigger[Any, W] {
+
+  private val serialVersionUID: Long = 1L
+
+  protected val minRetentionTime: Long = 
queryConfig.getMinIdleStateRetentionTime
+  protected val maxRetentionTime: Long = 
queryConfig.getMaxIdleStateRetentionTime
+  protected val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  private val stateDesc: ReducingStateDescriptor[JLong] =
+new ReducingStateDescriptor[JLong]("count", new Sum, Types.LONG)
+
+  private val cleanupStateDesc: ValueStateDescriptor[JLong] =
+new ValueStateDescriptor[JLong]("countCleanup", Types.LONG)
+
+  override def canMerge: Boolean = true
+
+  override def onMerge(window: W, ctx: Trigger.OnMergeContext) {
+ctx.mergePartitionedState(stateDesc)
--- End diff --

This will fail because `ValueState` is not a `MergingState`. We could 
implement the `cleanupStateDesc` as a `ReducingState` with a `max` 
`ReduceFunction`. That would allow Flink to merge the state.

On the other hand, we won't use the trigger in merging windows anyway, so 
it would also be fine to not support merging at all (`canMerge = false`). I'd 
actually go for this option because it simplifies the trigger.
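
For the first option, the point of a `max` reduce is that merging several 
windows keeps the latest registered cleanup time, so no state is cleaned up 
earlier than any of the merged windows expected. A plain-Scala sketch of that 
merge rule, with no Flink dependency: `CleanupTimeMergeSketch`, 
`maxCleanupTime`, and `merge` are hypothetical names, and the function value 
only stands in for what a `ReduceFunction[JLong]` would compute.

```scala
object CleanupTimeMergeSketch {
  // Hypothetical stand-in for a max ReduceFunction: keep the later timestamp.
  val maxCleanupTime: (Long, Long) => Long = (a, b) => math.max(a, b)

  // Merges the cleanup times of several windows into a single one.
  // Returns None when no window has registered a cleanup time yet.
  def merge(cleanupTimes: Seq[Long]): Option[Long] =
    cleanupTimes.reduceOption(maxCleanupTime)
}
```

Because `max` is associative and commutative, the result does not depend on 
the order in which the window states are merged.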




[GitHub] flink pull request #3919: [FLINK-6583][talbe]Enable QueryConfig in count bas...

2017-05-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3919#discussion_r116729795
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/CountTriggerWithCleanupState.scala
 ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.triggers
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.state._
+import 
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
+import org.apache.flink.streaming.api.windowing.triggers.{Trigger, 
TriggerResult}
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import 
org.apache.flink.table.runtime.triggers.CountTriggerWithCleanupState.Sum
+
+class CountTriggerWithCleanupState[W <: Window](queryConfig: 
StreamQueryConfig, maxCount: Long)
--- End diff --

Rename to `StateCleaningCountTrigger`?




[GitHub] flink pull request #3919: [FLINK-6583][talbe]Enable QueryConfig in count bas...

2017-05-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3919#discussion_r116740271
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/CountTriggerWithCleanupState.scala
 ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.triggers
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.state._
+import 
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
+import org.apache.flink.streaming.api.windowing.triggers.{Trigger, 
TriggerResult}
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import 
org.apache.flink.table.runtime.triggers.CountTriggerWithCleanupState.Sum
+
+class CountTriggerWithCleanupState[W <: Window](queryConfig: 
StreamQueryConfig, maxCount: Long)
--- End diff --

We can type the trigger to `GlobalWindow`




[GitHub] flink pull request #3919: [FLINK-6583][talbe]Enable QueryConfig in count bas...

2017-05-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3919#discussion_r116740828
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/CountTriggerWithCleanupState.scala
 ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.triggers
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.state._
+import 
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
+import org.apache.flink.streaming.api.windowing.triggers.{Trigger, 
TriggerResult}
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import 
org.apache.flink.table.runtime.triggers.CountTriggerWithCleanupState.Sum
+
+class CountTriggerWithCleanupState[W <: Window](queryConfig: 
StreamQueryConfig, maxCount: Long)
+  extends Trigger[Any, W] {
+
+  private val serialVersionUID: Long = 1L
+
+  protected val minRetentionTime: Long = 
queryConfig.getMinIdleStateRetentionTime
+  protected val maxRetentionTime: Long = 
queryConfig.getMaxIdleStateRetentionTime
+  protected val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  private val stateDesc: ReducingStateDescriptor[JLong] =
+new ReducingStateDescriptor[JLong]("count", new Sum, Types.LONG)
+
+  private val cleanupStateDesc: ValueStateDescriptor[JLong] =
+new ValueStateDescriptor[JLong]("countCleanup", Types.LONG)
+
+  override def canMerge: Boolean = true
+
+  override def onMerge(window: W, ctx: Trigger.OnMergeContext) {
+ctx.mergePartitionedState(stateDesc)
+  }
+
+  override def toString: String = "CountTriggerWithCleanupState(" +
+"minIdleStateRetentionTime=" + 
queryConfig.getMinIdleStateRetentionTime + ", " +
+"maxIdleStateRetentionTime=" + 
queryConfig.getMaxIdleStateRetentionTime + ", " +
+"maxCount=" + maxCount + ")"
+
+  override def onElement(
+  element: Any,
+  timestamp: Long,
+  window: W,
+  ctx: TriggerContext): TriggerResult = {
+
+val currentTime = ctx.getCurrentProcessingTime
+
+// register cleanup timer
+if (stateCleaningEnabled) {
+  // last registered timer
+  val curCleanupTime = 
ctx.getPartitionedState(cleanupStateDesc).value()
+
+  // check if a cleanup timer is registered and
+  // that the current cleanup timer won't delete state we need to keep
+  if (curCleanupTime == null || (currentTime + minRetentionTime) > 
curCleanupTime) {
+// we need to register a new (later) timer
+val cleanupTime = currentTime + maxRetentionTime
+// register timer and remember clean-up time
+ctx.registerProcessingTimeTimer(cleanupTime)
+
+if (null != curCleanupTime) {
+  ctx.deleteProcessingTimeTimer(curCleanupTime)
+}
+
+ctx.getPartitionedState(cleanupStateDesc).update(cleanupTime)
+  }
+}
+
+val count: ReducingState[JLong] = ctx.getPartitionedState(stateDesc)
+count.add(1L)
+
+if (count.get >= maxCount) {
+  count.clear()
+  return TriggerResult.FIRE
+}
+
+return TriggerResult.CONTINUE
+  }
+
+  override def onProcessingTime(
+  time: Long,
+  window: W,
+  ctx: TriggerContext): TriggerResult = {
+
+if (stateCleaningEnabled) {
+  val cleanupTime = ctx.getPartitionedState(cleanupStateDesc).value()
+  // check that the triggered timer is the last registered processing time timer.
+  if (null != cleanupTime && time == cleanupTime) {
+clear(window, ctx)
+return TriggerResult.FIRE_AND_PURGE
+  }
+}
+return TriggerResult.CONTINUE
--- End diff --

   

[GitHub] flink pull request #3919: [FLINK-6583][talbe]Enable QueryConfig in count bas...

2017-05-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3919#discussion_r116741416
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/CountTriggerWithCleanupStateHarnessTest.scala
 ---
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.harness
+
+import com.google.common.collect.Lists
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.streaming.runtime.operators.windowing.TriggerTestHarness
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.triggers.CountTriggerWithCleanupState
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class CountTriggerWithCleanupStateHarnessTest {
+  protected var queryConfig =
+new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3))
+
+  @Test
+  def testFiringAndFireingWithPurging(): Unit = {
+val testHarness = new TriggerTestHarness[Any, TimeWindow](
+  CountTriggerWithCleanupState.of[TimeWindow](queryConfig, 10), new TimeWindow.Serializer)
+
+// advance processing time to 1; no timer is registered yet, so none is triggered
+assertEquals(0, testHarness.advanceProcessingTime(1).size())
+
+// register cleanup timer with 3001
+assertEquals(
+  TriggerResult.CONTINUE,
+  testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9)))
+
+// advance processing time to 1000; no timer is triggered
+assertEquals(0, testHarness.advanceProcessingTime(1000).size())
+
+// 1000 + 2000 <= 3001 reuse timer 3001
+assertEquals(
+  TriggerResult.CONTINUE,
+  testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9)))
+
+// there are two state entries: the cleanup time (3001) and the counter (2)
+assertEquals(2, testHarness.numStateEntries)
+
+// advance processing time to 3001; timer 3001 is triggered
+assertEquals(
+  TriggerResult.FIRE_AND_PURGE,
+  testHarness.advanceProcessingTime(3001).iterator().next().f1)
+
+assertEquals(0, testHarness.numStateEntries)
+
+// 3001 + 2000 >= 3001 register cleanup timer with 6001, and remove timer 3001
+assertEquals(
+  TriggerResult.CONTINUE,
+  testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9)))
+
+// advance processing time to 4002; no timer is triggered
+assertEquals(0, testHarness.advanceProcessingTime(4002).size())
+
+// 4002 + 2000 >= 6001 register cleanup timer with 7002, and remove timer 6001
+assertEquals(
+  TriggerResult.CONTINUE,
+  testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9)))
+
+// 4002 + 2000 <= 7002 reuse timer 7002
+assertEquals(
+  TriggerResult.CONTINUE,
+  testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9)))
+
+// have one timer 7002
+assertEquals(1, testHarness.numProcessingTimeTimers)
+assertEquals(0, testHarness.numEventTimeTimers)
+assertEquals(2, testHarness.numStateEntries)
+assertEquals(2, testHarness.numStateEntries(new TimeWindow(0, 9)))
+assertEquals(0, testHarness.numStateEntries(new TimeWindow(9, 18)))
+
+// 4002 + 2000 <= 7002 reuse timer 7002
+assertEquals(
+  TriggerResult.CONTINUE,
+  testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9)))
+
+// register cleanup timer via 7002 for window (9, 18)
+assertEquals(
+  

[GitHub] flink pull request #3919: [FLINK-6583][talbe]Enable QueryConfig in count bas...

2017-05-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3919#discussion_r116727812
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
 ---
@@ -296,6 +322,8 @@ object DataStreamGroupWindowAggregate {
 case SlidingGroupWindow(_, timeField, size, slide)
 if isProctimeAttribute(timeField) && isRowCountLiteral(size)=>
   stream.countWindowAll(toLong(size), toLong(slide))
+  .evictor(CountEvictor.of(toLong(size)))
--- End diff --

same as above




[GitHub] flink pull request #3919: [FLINK-6583][talbe]Enable QueryConfig in count bas...

2017-05-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3919#discussion_r116740173
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/CountTriggerWithCleanupStateHarnessTest.scala
 ---
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.harness
+
+import com.google.common.collect.Lists
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.streaming.runtime.operators.windowing.TriggerTestHarness
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.triggers.CountTriggerWithCleanupState
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class CountTriggerWithCleanupStateHarnessTest {
+  protected var queryConfig =
+new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3))
+
+  @Test
+  def testFiringAndFireingWithPurging(): Unit = {
+val testHarness = new TriggerTestHarness[Any, TimeWindow](
+  CountTriggerWithCleanupState.of[TimeWindow](queryConfig, 10), new TimeWindow.Serializer)
+
+// advance processing time to 1; no timer is registered yet, so none is triggered
+assertEquals(0, testHarness.advanceProcessingTime(1).size())
+
+// register cleanup timer with 3001
+assertEquals(
+  TriggerResult.CONTINUE,
+  testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9)))
+
+// advance processing time to 1000; no timer is triggered
+assertEquals(0, testHarness.advanceProcessingTime(1000).size())
+
+// 1000 + 2000 <= 3001 reuse timer 3001
+assertEquals(
+  TriggerResult.CONTINUE,
+  testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9)))
+
+// there are two state entries: the cleanup time (3001) and the counter (2)
+assertEquals(2, testHarness.numStateEntries)
+
+// advance processing time to 3001; timer 3001 is triggered
+assertEquals(
+  TriggerResult.FIRE_AND_PURGE,
+  testHarness.advanceProcessingTime(3001).iterator().next().f1)
+
+assertEquals(0, testHarness.numStateEntries)
+
+// 3001 + 2000 >= 3001 register cleanup timer with 6001, and remove timer 3001
+assertEquals(
+  TriggerResult.CONTINUE,
+  testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9)))
+
+// advance processing time to 4002; no timer is triggered
+assertEquals(0, testHarness.advanceProcessingTime(4002).size())
+
+// 4002 + 2000 >= 6001 register cleanup timer with 7002, and remove timer 6001
+assertEquals(
+  TriggerResult.CONTINUE,
+  testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9)))
+
+// 4002 + 2000 <= 7002 reuse timer 7002
+assertEquals(
+  TriggerResult.CONTINUE,
+  testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9)))
+
+// have one timer 7002
+assertEquals(1, testHarness.numProcessingTimeTimers)
+assertEquals(0, testHarness.numEventTimeTimers)
+assertEquals(2, testHarness.numStateEntries)
+assertEquals(2, testHarness.numStateEntries(new TimeWindow(0, 9)))
+assertEquals(0, testHarness.numStateEntries(new TimeWindow(9, 18)))
--- End diff --

There is only a single global window. So, we do not need to test for different windows, IMO.



[GitHub] flink pull request #3919: [FLINK-6583][talbe]Enable QueryConfig in count bas...

2017-05-16 Thread sunjincheng121
GitHub user sunjincheng121 opened a pull request:

https://github.com/apache/flink/pull/3919

[FLINK-6583][talbe]Enable QueryConfig in count base GroupWindow

This PR enables the `QueryConfig` for count-based `GroupWindow`s.
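
The timer handling that the quoted `CountTriggerWithCleanupState.onElement` diff applies can be modelled without Flink. The following is only a minimal sketch of that rule, not the PR's implementation: the names `CleanupTimerSketch` and `nextCleanupTime` are hypothetical, and the retention values (2000 ms and 3000 ms) are taken from the harness test quoted in this thread.

```scala
// Hypothetical, Flink-free model of the cleanup-timer rule in onElement.
object CleanupTimerSketch {

  /** Returns the cleanup time that is registered after an element arrives at `now`. */
  def nextCleanupTime(
      now: Long,
      minRetention: Long,
      maxRetention: Long,
      current: Option[Long]): Long =
    current match {
      // the registered timer still covers now + minRetention: reuse it
      case Some(t) if now + minRetention <= t => t
      // no timer yet, or it would fire too early: register a later one
      case _ => now + maxRetention
    }

  def main(args: Array[String]): Unit = {
    val (minR, maxR) = (2000L, 3000L)
    val t1 = nextCleanupTime(1L, minR, maxR, None)           // first element
    val t2 = nextCleanupTime(1000L, minR, maxR, Some(t1))    // timer reused
    val t3 = nextCleanupTime(4002L, minR, maxR, Some(6001L)) // timer replaced
    println(s"$t1 $t2 $t3")
  }
}
```

In the real trigger the replaced timer is also deleted and the new cleanup time is written to state; the sketch leaves both out.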

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-6583][talbe]Enable QueryConfig in count base GroupWindow")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sunjincheng121/flink FLINK-6583-PR

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3919.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3919


commit 8f54635006b8caa2770d552209e7ad7fe2475f96
Author: sunjincheng121 
Date:   2017-05-16T03:58:37Z

[FLINK-6583][talbe]Enable QueryConfig in count base GroupWindow



