[jira] [Commented] (FLINK-7648) Port TaskManagersHandler to new REST endpoint

2017-09-25 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16180227#comment-16180227
 ] 

Bowen Li commented on FLINK-7648:
-

[~till.rohrmann] [~tzulitai] I'd like to try this and contribute to the new 
server architecture.

Given it's a handler for TaskManager, I guess I chose a pretty hard one... :) 
Can you please give me some guidance on this ticket, e.g.

- how to start and run the new web portal?
- what's the expected way to test the endpoint?
- more

Thanks!

> Port TaskManagersHandler to new REST endpoint
> -
>
> Key: FLINK-7648
> URL: https://issues.apache.org/jira/browse/FLINK-7648
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Bowen Li
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{TaskManagersHandler}} to the new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4471: [FLINK-6094] [table] Implement stream-stream proct...

2017-09-25 Thread shaoxuan-wang
Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/4471#discussion_r140951770
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.datastream.DataStreamJoin
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.WindowJoinUtil
+import scala.collection.JavaConverters._
+
+class DataStreamJoinRule
+  extends ConverterRule(
+    classOf[FlinkLogicalJoin],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.DATASTREAM,
+    "DataStreamJoinRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin]
+    val joinInfo = join.analyzeCondition
+
+    val (windowBounds, remainingPreds) = WindowJoinUtil.extractWindowBoundsFromPredicate(
+      joinInfo.getRemaining(join.getCluster.getRexBuilder),
+      join.getLeft.getRowType.getFieldCount,
+      join.getRowType,
+      join.getCluster.getRexBuilder,
+      TableConfig.DEFAULT)
+
+    // remaining predicate must not access time attributes
+    val remainingPredsAccessTime = remainingPreds.isDefined &&
+      WindowJoinUtil.accessesTimeAttribute(remainingPreds.get, join.getRowType)
+
+    // Check that no event-time attributes are in the input.
+    val rowTimeAttrInOutput = join.getRowType.getFieldList.asScala
+      .exists(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+    if (!windowBounds.isDefined && !remainingPredsAccessTime && !rowTimeAttrInOutput) {
--- End diff --

@fhueske, we largely agree with the concern you raised about unbounded state 
size. The problem is not limited to joins; it also exists in other cases, for 
example GROUP BY, where the grouping keys and their associated state can grow 
without bound until Flink's state can no longer hold them all. 
IMO, there is no easy way to eliminate this completely through validation in 
the query planner/optimizer alone, so I think it is not a good idea to allow 
the unbounded join only after certain operators, such as a non-windowed 
aggregation (in fact, as mentioned above, the grouping keys of an aggregation 
may also be infinite, so this does not ensure finite state for the join 
operator). 
On the other hand, I think finite state can only be ensured by users 
providing some hints/controls. We need to instruct users to set those control 
knobs properly, so that their jobs do not run out of space. One hint we 
currently have is state TTL (I think @hequn8128 has already added this for 
the unbounded join). Maybe we can add a check on the state TTL here to force 
users to set a proper value. What do you think?
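
To make the proposed check concrete, here is a minimal sketch in plain Java. It assumes the retention time is available as a `java.time.Duration`; the class and method names (`StateRetentionCheckSketch`, `validateRetention`) are hypothetical and are not part of the Table API or of this PR.

```java
import java.time.Duration;

final class StateRetentionCheckSketch {

    /**
     * Rejects a query plan with unbounded state unless the user configured
     * a positive state retention time (TTL). Hypothetical helper.
     */
    static void validateRetention(Duration stateRetention) {
        if (stateRetention == null || stateRetention.isZero() || stateRetention.isNegative()) {
            throw new IllegalArgumentException(
                "Unbounded join requires a positive state retention time (TTL), got: "
                    + stateRetention);
        }
    }
}
```

Such a check would only force users to pick a value; whether the chosen value is small enough for their key space is still up to them.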
  


---


[jira] [Commented] (FLINK-6094) Implement stream-stream proctime non-window inner join

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16180188#comment-16180188
 ] 

ASF GitHub Bot commented on FLINK-6094:
---

Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/4471#discussion_r140951770
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.datastream.DataStreamJoin
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.WindowJoinUtil
+import scala.collection.JavaConverters._
+
+class DataStreamJoinRule
+  extends ConverterRule(
+    classOf[FlinkLogicalJoin],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.DATASTREAM,
+    "DataStreamJoinRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin]
+    val joinInfo = join.analyzeCondition
+
+    val (windowBounds, remainingPreds) = WindowJoinUtil.extractWindowBoundsFromPredicate(
+      joinInfo.getRemaining(join.getCluster.getRexBuilder),
+      join.getLeft.getRowType.getFieldCount,
+      join.getRowType,
+      join.getCluster.getRexBuilder,
+      TableConfig.DEFAULT)
+
+    // remaining predicate must not access time attributes
+    val remainingPredsAccessTime = remainingPreds.isDefined &&
+      WindowJoinUtil.accessesTimeAttribute(remainingPreds.get, join.getRowType)
+
+    // Check that no event-time attributes are in the input.
+    val rowTimeAttrInOutput = join.getRowType.getFieldList.asScala
+      .exists(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+    if (!windowBounds.isDefined && !remainingPredsAccessTime && !rowTimeAttrInOutput) {
--- End diff --

@fhueske, we largely agree with the concern you raised about unbounded state 
size. The problem is not limited to joins; it also exists in other cases, for 
example GROUP BY, where the grouping keys and their associated state can grow 
without bound until Flink's state can no longer hold them all. 
IMO, there is no easy way to eliminate this completely through validation in 
the query planner/optimizer alone, so I think it is not a good idea to allow 
the unbounded join only after certain operators, such as a non-windowed 
aggregation (in fact, as mentioned above, the grouping keys of an aggregation 
may also be infinite, so this does not ensure finite state for the join 
operator). 
On the other hand, I think finite state can only be ensured by users 
providing some hints/controls. We need to instruct users to set those control 
knobs properly, so that their jobs do not run out of space. One hint we 
currently have is state TTL (I think @hequn8128 has already added this for 
the unbounded join). Maybe we can add a check on the state TTL here to force 
users to set a proper value. What do you think?
  


> Implement stream-stream proctime non-window  inner join
> ---
>
> Key: FLINK-6094
> URL: https://issues.apache.org/jira/browse/FLINK-6094
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Hequn 

[jira] [Assigned] (FLINK-5176) Properly wait for component shutdown in Runner components

2017-09-25 Thread Shuyi Chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuyi Chen reassigned FLINK-5176:
-

Assignee: Shuyi Chen

> Properly wait for component shutdown in Runner components
> -
>
> Key: FLINK-5176
> URL: https://issues.apache.org/jira/browse/FLINK-5176
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Shuyi Chen
>Priority: Minor
>  Labels: flip-6
>
> Currently, the different runners for the Flip-6 components {{JobMaster}}, 
> {{ResourceManager}} and {{TaskExecutor}} don't wait until the component has 
> been shut down in their own {{shutDown}} method. The problem is that the 
> {{AkkaRpcService}} implements the {{RpcService#shutDown}} method via a 
> {{PoisonPill}} which is not executed synchronously. Instead we should obtain 
> the {{RpcEndpoint#getTerminationFutures}} and wait on the returned future 
> (with a given timeout).
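
A minimal sketch of the proposed behaviour, assuming the termination future can be treated as a `java.util.concurrent.CompletableFuture` (the future type used by the Flip-6 components may differ). `ShutdownWaitSketch` and `shutDownAndWait` are hypothetical names used only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class ShutdownWaitSketch {

    /**
     * Triggers an asynchronous shutdown and blocks until the termination
     * future completes or the timeout expires.
     *
     * @return true if the component terminated in time, false on timeout
     *         or if the waiting thread was interrupted
     */
    static boolean shutDownAndWait(
            Runnable triggerShutdown,
            CompletableFuture<Void> terminationFuture,
            long timeout,
            TimeUnit unit) {
        // Fire the asynchronous shutdown (e.g. the PoisonPill in the description).
        triggerShutdown.run();
        try {
            terminationFuture.get(timeout, unit);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            // The component terminated, but with a failure; surface the cause.
            throw new IllegalStateException("Component terminated exceptionally", e.getCause());
        }
    }
}
```

Returning a boolean lets the runner decide how to react to a timeout, for example by logging and continuing with the shutdown of the remaining components.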





[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-09-25 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16180077#comment-16180077
 ] 

Xingcan Cui commented on FLINK-7548:


Thanks for the answers, [~fhueske]. Some extra comments.

About the first question, we must admit that a rowtime field has a dual nature. 
On one hand, it represents the rowtime and should be treated as the {{Long}} 
type. On the other hand, it is a common field that has its own type 
({{Timestamp}} or {{Long}} and maybe more in the future?). We don't want to 
perform an extra type check when using this field as the rowtime field, and we 
also don't want to lose the original data type when using it as a common field 
(e.g., when it is passed to a UDF which formats a timestamp). Of course, if all 
the types are internally represented as {{Long}}, we can just give the fields 
different time indicators so that we can recover the original data type after 
the processing.

The record-number-bounded out-of-order generation strategy would work roughly 
like this: we don't emit a watermark {{w}} until a specified number of records 
whose timestamps are greater than {{w}} have arrived. Just an idea that 
occurred to me :P
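
A rough sketch of how such a strategy could look, written as plain Java without any Flink dependency. The class name and the `onRecord` method are hypothetical, and for simplicity records with a timestamp equal to {{w}} are counted as well (i.e. "greater than or equal").

```java
import java.util.PriorityQueue;

final class CountBoundedWatermarkSketch {

    private final int requiredLaterRecords;
    // Min-heap holding the largest timestamps seen so far.
    private final PriorityQueue<Long> largestTimestamps = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;

    CountBoundedWatermarkSketch(int requiredLaterRecords) {
        if (requiredLaterRecords < 1) {
            throw new IllegalArgumentException("requiredLaterRecords must be >= 1");
        }
        this.requiredLaterRecords = requiredLaterRecords;
    }

    /** Registers a record timestamp and returns the current watermark. */
    long onRecord(long timestamp) {
        largestTimestamps.add(timestamp);
        if (largestTimestamps.size() > requiredLaterRecords) {
            // The evicted timestamp is followed by `requiredLaterRecords`
            // records with timestamps >= it, so it may become the watermark.
            long candidate = largestTimestamps.poll();
            // Watermarks must never move backwards.
            currentWatermark = Math.max(currentWatermark, candidate);
        }
        return currentWatermark;
    }
}
```

With a bound of 2 and timestamps 10, 20, 30 arriving in that order, the watermark stays at `Long.MIN_VALUE` for the first two records and advances to 10 on the third.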

About my last question, I was actually referring to the 
{{TimestampsAndPeriodicWatermarksOperator}}. Here, "periodic" refers to 
proctime. Considering that the time systems for rowtime and proctime may not 
be synchronized (i.e., they advance at different speeds), could we consider 
providing a "rowtime periodic" assigner?

> Support watermark generation for TableSource
> 
>
> Key: FLINK-7548
> URL: https://issues.apache.org/jira/browse/FLINK-7548
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Jark Wu
>
> As discussed in FLINK-7446, the TableSource currently only supports defining a 
> rowtime field, but does not support extracting watermarks from that field. 
> We can provide a new interface called {{DefinedWatermark}}, which has two 
> methods {{getRowtimeAttribute}} (can only be an existing field) and 
> {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked 
> deprecated.
> How to support periodic and punctuated watermarks and support some built-in 
> strategies needs further discussion.





[jira] [Assigned] (FLINK-7648) Port TaskManagersHandler to new REST endpoint

2017-09-25 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li reassigned FLINK-7648:
---

Assignee: Bowen Li

> Port TaskManagersHandler to new REST endpoint
> -
>
> Key: FLINK-7648
> URL: https://issues.apache.org/jira/browse/FLINK-7648
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Bowen Li
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{TaskManagersHandler}} to the new REST endpoint





[jira] [Created] (FLINK-7688) ContaineredTaskManagerParameters sets off heap memory size incorrectly unlimited

2017-09-25 Thread Bill Liu (JIRA)
Bill Liu created FLINK-7688:
---

 Summary: ContaineredTaskManagerParameters sets off heap memory 
size incorrectly unlimited
 Key: FLINK-7688
 URL: https://issues.apache.org/jira/browse/FLINK-7688
 Project: Flink
  Issue Type: Bug
  Components: YARN
Reporter: Bill Liu


When taskmanager.memory.off-heap is disabled, the heap size is set to the JVM 
memory size, which makes the off-heap size -1 (unlimited). As a result, YARN 
occasionally kills the container.
{code:java}
final long offHeapSize = javaMemorySizeMB == heapSizeMB ? -1L : javaMemorySizeMB - heapSizeMB;
{code}
It would be better to set a limit for direct memory regardless of whether 
off-heap memory is enabled.
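
One possible shape of such a limit, as a sketch only: it assumes the total container memory is known and derives the direct memory limit from it rather than from the JVM memory size. `containerMemoryMB` and the class and method names are hypothetical; this is not the actual {{ContaineredTaskManagerParameters}} code.

```java
final class OffHeapSizeSketch {

    /** The computation quoted in the issue: -1 means "unlimited". */
    static long currentOffHeapSizeMB(long javaMemorySizeMB, long heapSizeMB) {
        return javaMemorySizeMB == heapSizeMB ? -1L : javaMemorySizeMB - heapSizeMB;
    }

    /**
     * Always returns a finite limit: whatever the container provides
     * beyond the JVM heap (hypothetical alternative).
     */
    static long boundedOffHeapSizeMB(long containerMemoryMB, long heapSizeMB) {
        if (containerMemoryMB <= heapSizeMB) {
            throw new IllegalArgumentException(
                "Container memory must exceed the heap size to leave room for direct memory");
        }
        return containerMemoryMB - heapSizeMB;
    }
}
```

With a 1024 MB container and a 768 MB heap, the bounded variant yields 256 MB, whereas the quoted computation yields -1 whenever the heap equals the JVM memory size.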








[jira] [Commented] (FLINK-7657) SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException

2017-09-25 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179870#comment-16179870
 ] 

Fabian Hueske commented on FLINK-7657:
--

Hi [~kmurra], thanks for looking into this.
I'm currently traveling but [~twalthr] knows the SQL / Table API type system 
very well (better than I do) and might be able to help.

> SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException
> --
>
> Key: FLINK-7657
> URL: https://issues.apache.org/jira/browse/FLINK-7657
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.1, 1.3.2
>Reporter: Kent Murra
>Assignee: Kent Murra
>Priority: Critical
>
> I have a SQL statement using the Tables API that has a timestamp in it. When 
> the execution environment tries to optimize the SQL, it causes an exception 
> (attached below).  The result is any SQL query with a timestamp, date, or 
> time literal is unexecutable if any table source is marked with 
> FilterableTableSource. 
> {code:none} 
> Exception in thread "main" java.lang.RuntimeException: Error while applying 
> rule PushFilterIntoTableSourceScanRule, args 
> [rel#30:FlinkLogicalCalc.LOGICAL(input=rel#29:Subset#0.LOGICAL,expr#0..1={inputs},expr#2=2017-05-01,expr#3=>($t1,
>  $t2),data=$t0,last_updated=$t1,$condition=$t3), Scan(table:[test_table], 
> fields:(data, last_updated))]
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
>   at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
>   at 
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:266)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:298)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:328)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.writeToSink(BatchTableEnvironment.scala:135)
>   at org.apache.flink.table.api.Table.writeToSink(table.scala:800)
>   at org.apache.flink.table.api.Table.writeToSink(table.scala:773)
>   at 
> com.remitly.flink.TestReproductionApp$.delayedEndpoint$com$remitly$flink$TestReproductionApp$1(TestReproductionApp.scala:27)
>   at 
> com.remitly.flink.TestReproductionApp$delayedInit$body.apply(TestReproductionApp.scala:22)
>   at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>   at 
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at 
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>   at scala.App$class.main(App.scala:76)
>   at 
> com.remitly.flink.TestReproductionApp$.main(TestReproductionApp.scala:22)
>   at com.remitly.flink.TestReproductionApp.main(TestReproductionApp.scala)
> Caused by: java.lang.ClassCastException: java.util.GregorianCalendar cannot 
> be cast to java.util.Date
>   at 
> org.apache.flink.table.expressions.Literal.dateToCalendar(literals.scala:107)
>   at 
> org.apache.flink.table.expressions.Literal.toRexNode(literals.scala:80)
>   at 
> org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
>   at 
> org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.immutable.List.map(List.scala:285)
>   at 
> org.apache.flink.table.expressions.BinaryComparison.toRexNode(comparison.scala:35)
>   at 
> org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
>   at 
> org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at 

[jira] [Created] (FLINK-7687) Clarify the master and slaves files are not necessary unless using the cluster start/stop scripts

2017-09-25 Thread Elias Levy (JIRA)
Elias Levy created FLINK-7687:
-

 Summary: Clarify the master and slaves files are not necessary 
unless using the cluster start/stop scripts
 Key: FLINK-7687
 URL: https://issues.apache.org/jira/browse/FLINK-7687
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.3.2
Reporter: Elias Levy
Priority: Minor


It would be helpful if the documentation was clearer on the fact that the 
master/slaves config files are not needed when configured in high-availability 
mode unless you are using the provided scripts to start and shut down the 
cluster over SSH.  If you are using some other mechanism to manage Flink 
instances (configuration management tools such as Chef or Ansible, or container 
management frameworks like Docker Compose or Kubernetes), these files are 
unnecessary.





[jira] [Commented] (FLINK-7683) Add method to iterate over all of the existing keys in a statebackend

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179842#comment-16179842
 ] 

ASF GitHub Bot commented on FLINK-7683:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4722#discussion_r140908693
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -252,6 +257,43 @@ public RocksDBKeyedStateBackend(
LOG.debug("Setting initial keyed backend uid for operator {} to 
{}.", this.operatorIdentifier, this.backendUID);
}
 
+   @Override
+   public  Stream getKeys(String field, N namespace) {
+   Tuple2 columnInfo = 
kvStateInformation.get(field);
+   if (columnInfo == null) {
+   return Stream.empty();
+   }
+
+   RocksIterator iterator = db.newIterator(columnInfo.f0);
+   iterator.seekToFirst();
+
+   Iterator sourceIterator = new Iterator() {
+   @Override
+   public boolean hasNext() {
+   return iterator.isValid();
+   }
+
+   @Override
+   public K next() {
+   try {
+   byte[] key = iterator.key();
+
+   DataInputViewStreamWrapper dataInput = 
new DataInputViewStreamWrapper(
+   new 
ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - 
keyGroupPrefixBytes));
+   K value = 
keySerializer.deserialize(dataInput);
+   iterator.next();
--- End diff --

This will throw an exception implicitly if `iterator` doesn't have any more 
elements.

I'd rather have it fail fast by checking `iterator.hasNext()` first and 
throwing an exception explicitly.
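
A minimal sketch of the suggested fail-fast behaviour. The `Cursor` interface is a hypothetical stand-in for a RocksIterator-like cursor (`isValid()`, `key()`, `next()`), keys are plain strings instead of deserialized `K` values, and `arrayCursor` exists only to make the example runnable.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

final class CursorKeyIterator implements Iterator<String> {

    /** Hypothetical stand-in for a RocksIterator-like cursor. */
    interface Cursor {
        boolean isValid();
        String key();
        void next();
    }

    private final Cursor cursor;

    CursorKeyIterator(Cursor cursor) {
        this.cursor = cursor;
    }

    @Override
    public boolean hasNext() {
        return cursor.isValid();
    }

    @Override
    public String next() {
        // Fail fast with an explicit exception instead of relying on
        // whatever the underlying cursor does when it is exhausted.
        if (!hasNext()) {
            throw new NoSuchElementException("No more keys to iterate over");
        }
        String key = cursor.key();
        cursor.next();
        return key;
    }

    /** Simple in-memory cursor used only for demonstration. */
    static Cursor arrayCursor(String... keys) {
        return new Cursor() {
            private int position = 0;

            @Override
            public boolean isValid() {
                return position < keys.length;
            }

            @Override
            public String key() {
                return keys[position];
            }

            @Override
            public void next() {
                position++;
            }
        };
    }
}
```

Pulling the iterator into its own class like this would also make it straightforward to unit test the exhausted case.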


> Add method to iterate over all of the existing keys in a statebackend
> -
>
> Key: FLINK-7683
> URL: https://issues.apache.org/jira/browse/FLINK-7683
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> This is required to preserve backward compatibility while changing the state 
> definition of a keyed state operator (to do so, the operator must iterate over 
> all of the existing keys and rewrite them into a new state variable).





[GitHub] flink pull request #4722: [FLINK-7683] Iterate over keys in KeyedStateBacken...

2017-09-25 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4722#discussion_r140908693
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -252,6 +257,43 @@ public RocksDBKeyedStateBackend(
LOG.debug("Setting initial keyed backend uid for operator {} to 
{}.", this.operatorIdentifier, this.backendUID);
}
 
+   @Override
+   public  Stream getKeys(String field, N namespace) {
+   Tuple2 columnInfo = 
kvStateInformation.get(field);
+   if (columnInfo == null) {
+   return Stream.empty();
+   }
+
+   RocksIterator iterator = db.newIterator(columnInfo.f0);
+   iterator.seekToFirst();
+
+   Iterator sourceIterator = new Iterator() {
+   @Override
+   public boolean hasNext() {
+   return iterator.isValid();
+   }
+
+   @Override
+   public K next() {
+   try {
+   byte[] key = iterator.key();
+
+   DataInputViewStreamWrapper dataInput = 
new DataInputViewStreamWrapper(
+   new 
ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - 
keyGroupPrefixBytes));
+   K value = 
keySerializer.deserialize(dataInput);
+   iterator.next();
--- End diff --

This will throw an exception implicitly if `iterator` doesn't have any more 
elements.

I'd rather have it fail fast by checking `iterator.hasNext()` first and 
throwing an exception explicitly.


---


[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-09-25 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179836#comment-16179836
 ] 

Fabian Hueske commented on FLINK-7548:
--

Thanks for your thoughts [~xccui].
I added a few comments to your suggestions / questions.

Thanks, Fabian

bq. Considering that the data type should be preserved, it may bring extra 
logic if we do that internally. To keep the consistency, I wonder if it's 
possible to encapsulate the time into a new Rowtime type. It exposes two 
methods, getTime(): Long for logical level use and getValue(): T for physical 
level use.

In fact, {{Long}} and {{Timestamp}} have the same internal representation, 
namely {{Long}}. The issue is more the type that is exposed to SQL or the Table 
API. We would need a new TimeIndicator type that exposes a timestamp as 
{{Long}}.

bq. Besides, I think the watermark generation should not be bound with rowtime 
extraction. Compared with implementing them in a single scan operator (not sure 
if I understood correctly), I prefer to generate watermarks in extra operators. 
That should be more flexible.

Timestamp extraction and watermark generation would not be tied together. 
First, we would compute timestamps (only necessary if we don't use an existing 
field). The next step would extract watermarks. However, both operations would 
happen in the logical scan operator because a single operator can be translated 
into multiple DataStream operations.

bq. I am thinking of a new record number bounded out-of-order generation 
strategy. Do you think it will be useful in real applications?

How would this strategy work? IMO, built-in strategies should have a concrete 
use case in mind which is common enough to justify a built-in primitive.

bq. I still feel that the machine time is not compatible with the rowtime 
watermark generation. Shall we consider getting rid of it?

Machine time (assuming that you refer to processing time here) does not use 
watermarks. Watermarks are only used for event-time processing. 

> Support watermark generation for TableSource
> 
>
> Key: FLINK-7548
> URL: https://issues.apache.org/jira/browse/FLINK-7548
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Jark Wu
>
> As discussed in FLINK-7446, the TableSource currently only supports defining a 
> rowtime field, but does not support extracting watermarks from that field. 
> We can provide a new interface called {{DefinedWatermark}}, which has two 
> methods {{getRowtimeAttribute}} (can only be an existing field) and 
> {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked 
> deprecated.
> How to support periodic and punctuated watermarks and support some built-in 
> strategies needs further discussion.





[GitHub] flink pull request #4688: [FLINK-7638] [flip6] Port CurrentJobsOverviewHandl...

2017-09-25 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4688


---


[jira] [Closed] (FLINK-7638) Port CurrentJobsOverviewHandler to new REST endpoint

2017-09-25 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-7638.

   Resolution: Fixed
Fix Version/s: 1.4.0

Added via e585aed8ce751d769b56054fc1ffd4be24350e91

> Port CurrentJobsOverviewHandler to new REST endpoint
> 
>
> Key: FLINK-7638
> URL: https://issues.apache.org/jira/browse/FLINK-7638
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port the existing {{CurrentJobsOverviewHandler}} to the new REST endpoint.





[jira] [Commented] (FLINK-7638) Port CurrentJobsOverviewHandler to new REST endpoint

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179840#comment-16179840
 ] 

ASF GitHub Bot commented on FLINK-7638:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4688


> Port CurrentJobsOverviewHandler to new REST endpoint
> 
>
> Key: FLINK-7638
> URL: https://issues.apache.org/jira/browse/FLINK-7638
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port the existing {{CurrentJobsOverviewHandler}} to the new REST endpoint.





[GitHub] flink pull request #4722: [FLINK-7683] Iterate over keys in KeyedStateBacken...

2017-09-25 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4722#discussion_r140907699
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -252,6 +257,43 @@ public RocksDBKeyedStateBackend(
LOG.debug("Setting initial keyed backend uid for operator {} to 
{}.", this.operatorIdentifier, this.backendUID);
}
 
+   @Override
+   public  Stream getKeys(String field, N namespace) {
+   Tuple2 columnInfo = 
kvStateInformation.get(field);
+   if (columnInfo == null) {
+   return Stream.empty();
+   }
+
+   RocksIterator iterator = db.newIterator(columnInfo.f0);
+   iterator.seekToFirst();
+
+   Iterator sourceIterator = new Iterator() {
+   @Override
+   public boolean hasNext() {
--- End diff --

Is there a lock in RocksDB that will guard the iteration against state 
change by users?


---


[jira] [Commented] (FLINK-7683) Add method to iterate over all of the existing keys in a statebackend

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179835#comment-16179835
 ] 

ASF GitHub Bot commented on FLINK-7683:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4722#discussion_r140907699
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -252,6 +257,43 @@ public RocksDBKeyedStateBackend(
LOG.debug("Setting initial keyed backend uid for operator {} to 
{}.", this.operatorIdentifier, this.backendUID);
}
 
+   @Override
+   public  Stream getKeys(String field, N namespace) {
+   Tuple2 columnInfo = 
kvStateInformation.get(field);
+   if (columnInfo == null) {
+   return Stream.empty();
+   }
+
+   RocksIterator iterator = db.newIterator(columnInfo.f0);
+   iterator.seekToFirst();
+
+   Iterator sourceIterator = new Iterator() {
+   @Override
+   public boolean hasNext() {
--- End diff --

Is there a lock in RocksDB that will guard the iteration against state 
change by users?


> Add method to iterate over all of the existing keys in a statebackend
> -
>
> Key: FLINK-7683
> URL: https://issues.apache.org/jira/browse/FLINK-7683
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> This is required to preserve backward compatibility when changing the 
> state definition of a keyed state operator (to do so, the operator must 
> iterate over all of the existing keys and rewrite them into a new state 
> variable).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7683) Add method to iterate over all of the existing keys in a statebackend

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179832#comment-16179832
 ] 

ASF GitHub Bot commented on FLINK-7683:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4722#discussion_r140907101
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -252,6 +257,43 @@ public RocksDBKeyedStateBackend(
LOG.debug("Setting initial keyed backend uid for operator {} to 
{}.", this.operatorIdentifier, this.backendUID);
}
 
+   @Override
+   public  Stream getKeys(String field, N namespace) {
+   Tuple2 columnInfo = 
kvStateInformation.get(field);
+   if (columnInfo == null) {
+   return Stream.empty();
+   }
+
+   RocksIterator iterator = db.newIterator(columnInfo.f0);
+   iterator.seekToFirst();
+
+   Iterator sourceIterator = new Iterator() {
--- End diff --

I think it would be better to move the implementation of this iterator into 
a standalone or inner class.
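
The refactoring suggested above can be sketched roughly as follows. This is a minimal, self-contained illustration rather than the code from the pull request: `KeyCursor`, `CursorKeyIterator`, and `ListCursor` are hypothetical names, and `KeyCursor` is only an assumed stand-in for the `isValid()`, `key()`, and `next()` calls that a `RocksIterator` offers, so that the sketch runs without a RocksDB dependency. Key deserialization and namespace filtering are left out.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical stand-in for the cursor calls the anonymous iterator relies on. */
interface KeyCursor {
    boolean isValid();
    byte[] key();
    void next();
}

/** Named class replacing the anonymous Iterator; it only adapts a cursor to java.util.Iterator. */
final class CursorKeyIterator implements Iterator<byte[]> {
    private final KeyCursor cursor;

    CursorKeyIterator(KeyCursor cursor) {
        this.cursor = cursor;
    }

    @Override
    public boolean hasNext() {
        // The cursor is valid as long as it points at an entry.
        return cursor.isValid();
    }

    @Override
    public byte[] next() {
        if (!cursor.isValid()) {
            throw new NoSuchElementException("cursor is exhausted");
        }
        // Read the current key first, then advance the cursor.
        byte[] current = cursor.key();
        cursor.next();
        return current;
    }
}

/** In-memory cursor, used only to exercise the iterator without a database. */
final class ListCursor implements KeyCursor {
    private final List<byte[]> keys;
    private int position;

    ListCursor(List<byte[]> keys) {
        this.keys = keys;
    }

    @Override
    public boolean isValid() {
        return position < keys.size();
    }

    @Override
    public byte[] key() {
        return keys.get(position);
    }

    @Override
    public void next() {
        position++;
    }
}
```

With a named class, the iterator can be unit-tested on its own, and `getKeys` would only need to wrap it into a `Stream`.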


> Add method to iterate over all of the existing keys in a statebackend
> -
>
> Key: FLINK-7683
> URL: https://issues.apache.org/jira/browse/FLINK-7683
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> This is required to preserve backward compatibility when changing the 
> state definition of a keyed state operator (to do so, the operator must 
> iterate over all of the existing keys and rewrite them into a new state 
> variable).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4722: [FLINK-7683] Iterate over keys in KeyedStateBacken...

2017-09-25 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4722#discussion_r140907101
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -252,6 +257,43 @@ public RocksDBKeyedStateBackend(
LOG.debug("Setting initial keyed backend uid for operator {} to 
{}.", this.operatorIdentifier, this.backendUID);
}
 
+   @Override
+   public  Stream getKeys(String field, N namespace) {
+   Tuple2 columnInfo = 
kvStateInformation.get(field);
+   if (columnInfo == null) {
+   return Stream.empty();
+   }
+
+   RocksIterator iterator = db.newIterator(columnInfo.f0);
+   iterator.seekToFirst();
+
+   Iterator sourceIterator = new Iterator() {
--- End diff --

I think it would be better to move the implementation of this iterator into 
a standalone or inner class.


---


[GitHub] flink issue #4665: [Flink-7611]add metrics to measure the num of data droppe...

2017-09-25 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4665
  
LGTM


---


[jira] [Commented] (FLINK-7683) Add method to iterate over all of the existing keys in a statebackend

2017-09-25 Thread David Anderson (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179607#comment-16179607
 ] 

David Anderson commented on FLINK-7683:
---

Something like this has also been requested by users managing state for very 
large numbers of keys, who would prefer to occasionally iterate over all their 
state and clear expired items instead of keeping a timer per key. Will that 
become possible?

> Add method to iterate over all of the existing keys in a statebackend
> -
>
> Key: FLINK-7683
> URL: https://issues.apache.org/jira/browse/FLINK-7683
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> This is required to preserve backward compatibility when changing the 
> state definition of a keyed state operator (to do so, the operator must 
> iterate over all of the existing keys and rewrite them into a new state 
> variable).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7686) Add Flink Forward Berlin 2017 conference slides to the flink website

2017-09-25 Thread Hai Zhou_UTC+8 (JIRA)
Hai Zhou_UTC+8 created FLINK-7686:
-

 Summary: Add Flink Forward Berlin 2017 conference slides to the 
flink website
 Key: FLINK-7686
 URL: https://issues.apache.org/jira/browse/FLINK-7686
 Project: Flink
  Issue Type: Wish
Reporter: Hai Zhou_UTC+8
Priority: Trivial


I recently went through the [Flink Forward Berlin 
2017|https://berlin.flink-forward.org/sessions/] conference slides, and the 
content is very good.  
I think we should add them to the [flink 
website|http://flink.apache.org/community.html] so that more people can find them.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7645) Modify system-metrics part show in the document

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179486#comment-16179486
 ] 

ASF GitHub Bot commented on FLINK-7645:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/4693
  
Hey @yew1eb, I like this 👍  I think it really makes sense to make this 
browsable from the table of contents. @zentol What do you think? I would be in 
favour of merging this if you don't have any objections.


> Modify system-metrics part show in the document
> ---
>
> Key: FLINK-7645
> URL: https://issues.apache.org/jira/browse/FLINK-7645
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Metrics
>Reporter: Hai Zhou_UTC+8
> Fix For: 1.4.0
>
>
> the system-metrics contents structure:
> Currently,
> {noformat}
> ├── System metrics
> ├── Latency tracking
> ├── Dashboard integration
> {noformat}
> I think the following is more reasonable,
> {noformat}
> ├── System metrics
>├── CPU
>├── Memory
>├── Threads
>├── GarbageCollection
>├── ClassLoader
>├── Network
>├── Cluster
>├── Availability
>├── Checkpointing
>├── IO
>└── Connectors
>   └──  Kafka Connectors
> ├── Latency tracking
> ├── Dashboard integration
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4693: [FLINK-7645][docs] Modify system-metrics part show in the...

2017-09-25 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/4693
  
Hey @yew1eb, I like this 👍  I think it really makes sense to make this 
browsable from the table of contents. @zentol What do you think? I would be in 
favour of merging this if you don't have any objections.


---


[jira] [Commented] (FLINK-7393) Move unit tests of KinesisConfigUtil from FlinkKinesisConsumerTest to KinesisConfigUtilTest

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179325#comment-16179325
 ] 

ASF GitHub Bot commented on FLINK-7393:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4708
  
@tzulitai Yes, this is only a migration of unit tests.


> Move unit tests of KinesisConfigUtil from FlinkKinesisConsumerTest to 
> KinesisConfigUtilTest
> ---
>
> Key: FLINK-7393
> URL: https://issues.apache.org/jira/browse/FLINK-7393
> Project: Flink
>  Issue Type: Test
>  Components: Kinesis Connector
>Affects Versions: 1.3.2
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.4.0
>
>
> Right now, 
> [{{FlinkKinesisConsumerTest}}|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java]
>  has lots of tests that actually should belong to {{KinesisConfigUtil}}, e.g. 
> all the {{validateXxxConfiguration()}}
> We need to move those tests out to a new file {{KinesisConfigUtilTest}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4708: [FLINK-7393][kinesis connector] Move unit tests that shou...

2017-09-25 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4708
  
@tzulitai Yes, this is only a migration of unit tests.


---


[GitHub] flink issue #4702: [FLINK-7635][DataStream API][Scala API] Support sideOutpu...

2017-09-25 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4702
  
Thank you, guys @aljoscha @chenqin !


---


[jira] [Commented] (FLINK-7635) Support sideOutput in ProcessWindowFunciton

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179317#comment-16179317
 ] 

ASF GitHub Bot commented on FLINK-7635:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4702
  
Thank you, guys @aljoscha @chenqin !


> Support sideOutput in ProcessWindowFunciton
> ---
>
> Key: FLINK-7635
> URL: https://issues.apache.org/jira/browse/FLINK-7635
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Scala API
>Reporter: Chen Qin
>Assignee: Bowen Li
> Fix For: 1.4.0
>
>
> [FLINK-4460|https://issues.apache.org/jira/browse/FLINK-4460] only 
> implemented output to ProcessFunction Context. It would be nice to add 
> support to ProcessWindow and ProcessAllWindow functions as well. [email 
> threads|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/question-on-sideoutput-from-ProcessWindow-function-td15500.html]
> [~aljoscha] I thought this would be a good warm-up task for people to learn how 
> window functions work in general. Otherwise, feel free to assign it back to me.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7635) Support sideOutput in ProcessWindowFunciton

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179318#comment-16179318
 ] 

ASF GitHub Bot commented on FLINK-7635:
---

Github user bowenli86 closed the pull request at:

https://github.com/apache/flink/pull/4702


> Support sideOutput in ProcessWindowFunciton
> ---
>
> Key: FLINK-7635
> URL: https://issues.apache.org/jira/browse/FLINK-7635
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Scala API
>Reporter: Chen Qin
>Assignee: Bowen Li
> Fix For: 1.4.0
>
>
> [FLINK-4460|https://issues.apache.org/jira/browse/FLINK-4460] only 
> implemented output to ProcessFunction Context. It would be nice to add 
> support to ProcessWindow and ProcessAllWindow functions as well. [email 
> threads|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/question-on-sideoutput-from-ProcessWindow-function-td15500.html]
> [~aljoscha] I thought this would be a good warm-up task for people to learn how 
> window functions work in general. Otherwise, feel free to assign it back to me.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4702: [FLINK-7635][DataStream API][Scala API] Support si...

2017-09-25 Thread bowenli86
Github user bowenli86 closed the pull request at:

https://github.com/apache/flink/pull/4702


---


[GitHub] flink pull request #4707: [hotfix] Add doc for window operators, fix typos a...

2017-09-25 Thread bowenli86
Github user bowenli86 closed the pull request at:

https://github.com/apache/flink/pull/4707


---


[jira] [Commented] (FLINK-5944) Flink should support reading Snappy Files

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179308#comment-16179308
 ] 

ASF GitHub Bot commented on FLINK-5944:
---

Github user mlipkovich commented on a diff in the pull request:

https://github.com/apache/flink/pull/4683#discussion_r140833786
  
--- Diff: flink-core/pom.xml ---
@@ -52,6 +52,12 @@ under the License.
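<artifactId>flink-shaded-asm</artifactId>
</dependency>
 
+   <dependency>
+       <groupId>org.apache.flink</groupId>
+       <artifactId>flink-shaded-hadoop2</artifactId>
+       <version>${project.version}</version>
+   </dependency>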
--- End diff --

Yes, making Hadoop Snappy the default codec is a good point. I think we 
could still support Xerial Snappy, since it comes for free. I will make these 
changes once we agree on the dependencies.

Regarding a separate module: what would the content of this module be?  
As I understand it, a user who wants to read HDFS files will need the 
flink-java module anyway, since it contains Hadoop wrappers such as 
HadoopInputSplit. Do you think it makes sense to put this Hadoop codec 
there?


> Flink should support reading Snappy Files
> -
>
> Key: FLINK-5944
> URL: https://issues.apache.org/jira/browse/FLINK-5944
> Project: Flink
>  Issue Type: New Feature
>  Components: Batch Connectors and Input/Output Formats
>Reporter: Ilya Ganelin
>Assignee: Mikhail Lipkovich
>  Labels: features
>
> Snappy is an extremely performant compression format that's widely used 
> offering fast decompression/compression. 
> This can be easily implemented by creating a SnappyInflaterInputStreamFactory 
> and updating the initDefaultInflateInputStreamFactories in FileInputFormat.
> Flink already includes the Snappy dependency in the project. 
> There is a minor gotcha in this. If we wish to use this with Hadoop, then we 
> must provide two separate implementations since Hadoop uses a different 
> version of the snappy format than Snappy Java (which is the xerial/snappy 
> included in Flink). 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4499: [FLINK-7394][core] Manage exclusive buffers in Rem...

2017-09-25 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4499#discussion_r140831859
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -281,6 +286,73 @@ public void testProducerFailedException() throws 
Exception {
ch.getNextBuffer();
}
 
+   /**
+* Tests {@link RemoteInputChannel#recycle(MemorySegment)}, verifying 
the exclusive segment is
+* recycled to available buffers directly and it triggers notify of 
announced credit.
+*/
+   @Test
+   public void testRecycleExclusiveBufferBeforeReleased() throws Exception 
{
+   final SingleInputGate inputGate = mock(SingleInputGate.class);
+   final RemoteInputChannel inputChannel = 
spy(createRemoteInputChannel(inputGate));
+
+   // Recycle exclusive segment
+   
inputChannel.recycle(MemorySegmentFactory.getFactory().allocateUnpooledSegment(1024,
 inputChannel));
+
+   assertEquals("There should have one available buffer after 
recycle.",
--- End diff --

`There should be one buffer available after recycle`


---


[jira] [Commented] (FLINK-7394) Manage exclusive buffers in RemoteInputChannel

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179305#comment-16179305
 ] 

ASF GitHub Bot commented on FLINK-7394:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4499#discussion_r140833417
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -281,6 +286,73 @@ public void testProducerFailedException() throws 
Exception {
ch.getNextBuffer();
}
 
+   /**
+* Tests {@link RemoteInputChannel#recycle(MemorySegment)}, verifying 
the exclusive segment is
+* recycled to available buffers directly and it triggers notify of 
announced credit.
+*/
+   @Test
+   public void testRecycleExclusiveBufferBeforeReleased() throws Exception 
{
+   final SingleInputGate inputGate = mock(SingleInputGate.class);
+   final RemoteInputChannel inputChannel = 
spy(createRemoteInputChannel(inputGate));
+
+   // Recycle exclusive segment
+   
inputChannel.recycle(MemorySegmentFactory.getFactory().allocateUnpooledSegment(1024,
 inputChannel));
+
+   assertEquals("There should have one available buffer after 
recycle.",
+   1, inputChannel.getNumberOfAvailableBuffers());
+   verify(inputChannel, times(1)).notifyCreditAvailable();
--- End diff --

can you add one more 
`inputChannel.recycle(MemorySegmentFactory.getFactory().allocateUnpooledSegment(1024,
 inputChannel))` call and verify `inputChannel.getNumberOfAvailableBuffers()` 
and that `notifyCreditAvailable()` is not called again?
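
The extra check requested here could look roughly like the sketch below. It is a self-contained illustration, not Flink code: `CreditTrackingChannel` and its counters are hypothetical stand-ins for the spied `RemoteInputChannel`, and the rule that a notification fires only when the unannounced credit goes from zero to one is an assumption inferred from the request, not something confirmed by the diff.

```java
/** Hypothetical channel that models only the counters discussed in this review. */
final class CreditTrackingChannel {
    private int availableBuffers;
    private int unannouncedCredit;
    private int notificationCount;

    /**
     * Returns one exclusive buffer to the channel. Assumption: the credit
     * notification is triggered only on the transition from 0 to 1
     * unannounced credits, so repeated recycling does not notify again.
     */
    void recycle() {
        availableBuffers++;
        if (unannouncedCredit++ == 0) {
            notificationCount++;
        }
    }

    int getNumberOfAvailableBuffers() {
        return availableBuffers;
    }

    int getNotificationCount() {
        return notificationCount;
    }
}

/** Walks through the two recycle calls and the expectations after each one. */
final class RecycleTwiceCheck {
    private RecycleTwiceCheck() {}

    static void run() {
        CreditTrackingChannel channel = new CreditTrackingChannel();

        // First recycle: one buffer available, one notification.
        channel.recycle();
        expect(channel.getNumberOfAvailableBuffers() == 1, "one buffer after first recycle");
        expect(channel.getNotificationCount() == 1, "notified once after first recycle");

        // Second recycle: two buffers available, still only one notification.
        channel.recycle();
        expect(channel.getNumberOfAvailableBuffers() == 2, "two buffers after second recycle");
        expect(channel.getNotificationCount() == 1, "no second notification");
    }

    private static void expect(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }
}
```

In the actual test, the same expectation would presumably be expressed by repeating the existing `verify(inputChannel, times(1)).notifyCreditAvailable()` after the second `recycle` call.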


> Manage exclusive buffers in RemoteInputChannel
> --
>
> Key: FLINK-7394
> URL: https://issues.apache.org/jira/browse/FLINK-7394
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is part of the work on credit-based network flow control. 
> The basic tasks are:
> * Exclusive buffers are assigned to {{RemoteInputChannel}} after it is created by 
> {{SingleInputGate}}.
> * {{RemoteInputChannel}} implements the {{BufferRecycler}} interface to manage 
> the exclusive buffers itself.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7394) Manage exclusive buffers in RemoteInputChannel

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179306#comment-16179306
 ] 

ASF GitHub Bot commented on FLINK-7394:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4499#discussion_r140831859
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -281,6 +286,73 @@ public void testProducerFailedException() throws 
Exception {
ch.getNextBuffer();
}
 
+   /**
+* Tests {@link RemoteInputChannel#recycle(MemorySegment)}, verifying 
the exclusive segment is
+* recycled to available buffers directly and it triggers notify of 
announced credit.
+*/
+   @Test
+   public void testRecycleExclusiveBufferBeforeReleased() throws Exception 
{
+   final SingleInputGate inputGate = mock(SingleInputGate.class);
+   final RemoteInputChannel inputChannel = 
spy(createRemoteInputChannel(inputGate));
+
+   // Recycle exclusive segment
+   
inputChannel.recycle(MemorySegmentFactory.getFactory().allocateUnpooledSegment(1024,
 inputChannel));
+
+   assertEquals("There should have one available buffer after 
recycle.",
--- End diff --

`There should be one buffer available after recycle`


> Manage exclusive buffers in RemoteInputChannel
> --
>
> Key: FLINK-7394
> URL: https://issues.apache.org/jira/browse/FLINK-7394
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is part of the work on credit-based network flow control. 
> The basic tasks are:
> * Exclusive buffers are assigned to {{RemoteInputChannel}} after it is created by 
> {{SingleInputGate}}.
> * {{RemoteInputChannel}} implements the {{BufferRecycler}} interface to manage 
> the exclusive buffers itself.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4683: [FLINK-5944] Support reading of Snappy files

2017-09-25 Thread mlipkovich
Github user mlipkovich commented on a diff in the pull request:

https://github.com/apache/flink/pull/4683#discussion_r140833786
  
--- Diff: flink-core/pom.xml ---
@@ -52,6 +52,12 @@ under the License.
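<artifactId>flink-shaded-asm</artifactId>
</dependency>
 
+   <dependency>
+       <groupId>org.apache.flink</groupId>
+       <artifactId>flink-shaded-hadoop2</artifactId>
+       <version>${project.version}</version>
+   </dependency>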
--- End diff --

Yes, making Hadoop Snappy the default codec is a good point. I think we 
could still support Xerial Snappy, since it comes for free. I will make these 
changes once we agree on the dependencies.

Regarding a separate module: what would the content of this module be?  
As I understand it, a user who wants to read HDFS files will need the 
flink-java module anyway, since it contains Hadoop wrappers such as 
HadoopInputSplit. Do you think it makes sense to put this Hadoop codec 
there?


---


[GitHub] flink pull request #4499: [FLINK-7394][core] Manage exclusive buffers in Rem...

2017-09-25 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4499#discussion_r140833417
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -281,6 +286,73 @@ public void testProducerFailedException() throws 
Exception {
ch.getNextBuffer();
}
 
+   /**
+* Tests {@link RemoteInputChannel#recycle(MemorySegment)}, verifying 
the exclusive segment is
+* recycled to available buffers directly and it triggers notify of 
announced credit.
+*/
+   @Test
+   public void testRecycleExclusiveBufferBeforeReleased() throws Exception 
{
+   final SingleInputGate inputGate = mock(SingleInputGate.class);
+   final RemoteInputChannel inputChannel = 
spy(createRemoteInputChannel(inputGate));
+
+   // Recycle exclusive segment
+   
inputChannel.recycle(MemorySegmentFactory.getFactory().allocateUnpooledSegment(1024,
 inputChannel));
+
+   assertEquals("There should have one available buffer after 
recycle.",
+   1, inputChannel.getNumberOfAvailableBuffers());
+   verify(inputChannel, times(1)).notifyCreditAvailable();
--- End diff --

can you add one more 
`inputChannel.recycle(MemorySegmentFactory.getFactory().allocateUnpooledSegment(1024,
 inputChannel))` call and verify `inputChannel.getNumberOfAvailableBuffers()` 
and that `notifyCreditAvailable()` is not called again?


---


[jira] [Commented] (FLINK-7465) Add build-in BloomFilterCount on TableAPI

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179304#comment-16179304
 ] 

ASF GitHub Bot commented on FLINK-7465:
---

Github user jparkie commented on a diff in the pull request:

https://github.com/apache/flink/pull/4652#discussion_r140833539
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/functions/aggfunctions/cardinality/HyperLogLog.java
 ---
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.aggfunctions.cardinality;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+
+/**
+ * Java implementation of HyperLogLog (HLL) algorithm from this paper:
+ * 
+ * http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf
+ * 
+ * HLL is an improved version of LogLog that is capable of estimating
+ * the cardinality of a set with accuracy = 1.04/sqrt(m) where
+ * m = 2^b.  So we can control accuracy vs space usage by increasing
+ * or decreasing b.
+ * 
+ * The main benefit of using HLL over LL is that it only requires 64%
+ * of the space that LL does to get the same accuracy.
+ * 
+ * 
+ * Note that this implementation does not include the long range 
correction function
+ * defined in the original paper.  Empirical evidence shows that the 
correction
+ * function causes more harm than good.
+ * 
+ */
+public class HyperLogLog implements ICardinality, Serializable {
+
+   private final RegisterSet registerSet;
+   private final int log2m;
+   private final double alphaMM;
+
+
+   /**
+* Create a new HyperLogLog instance using the specified standard 
deviation.
+*
+* @param rsd - the relative standard deviation for the counter.
+*smaller values create counters that require more space.
+*/
+   public HyperLogLog(double rsd) {
+   this(log2m(rsd));
+   }
+
+   private static int log2m(double rsd) {
+   return (int) (Math.log((1.106 / rsd) * (1.106 / rsd)) / 
Math.log(2));
+   }
+
+   private static double rsd(int log2m) {
+   return 1.106 / Math.sqrt(Math.exp(log2m * Math.log(2)));
+   }
+
+   private static void validateLog2m(int log2m) {
+   if (log2m < 0 || log2m > 30) {
+   throw new IllegalArgumentException("log2m argument is "
+   + log2m + " and is outside the range 
[0, 30]");
+   }
+   }
+
+   private static double linearCounting(int m, double v) {
+   return m * Math.log(m / v);
+   }
+
+   /**
+* Create a new HyperLogLog instance.  The log2m parameter defines the 
accuracy of
+* the counter.  The larger the log2m the better the accuracy.
+* 
+* accuracy = 1.04/sqrt(2^log2m)
+*
+* @param log2m - the number of bits to use as the basis for the HLL 
instance
+*/
+   public HyperLogLog(int log2m) {
+   this(log2m, new RegisterSet(1 << log2m));
+   }
+
+   /**
+* Creates a new HyperLogLog instance using the given registers.  Used 
for unmarshalling a serialized
+* instance and for merging multiple counters together.
+*
+* @param registerSet - the initial values for the register set
+*/
+   public HyperLogLog(int log2m, RegisterSet registerSet) {
+   validateLog2m(log2m);
+   this.registerSet = registerSet;
+   this.log2m = log2m;
+   int 

[GitHub] flink pull request #4652: [FLINK-7465][table]Add cardinality count for table...

2017-09-25 Thread jparkie
Github user jparkie commented on a diff in the pull request:

https://github.com/apache/flink/pull/4652#discussion_r140833539
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/functions/aggfunctions/cardinality/HyperLogLog.java
 ---
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.aggfunctions.cardinality;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+
+/**
+ * Java implementation of HyperLogLog (HLL) algorithm from this paper:
+ * 
+ * http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf
+ * 
+ * HLL is an improved version of LogLog that is capable of estimating
+ * the cardinality of a set with accuracy = 1.04/sqrt(m) where
+ * m = 2^b.  So we can control accuracy vs space usage by increasing
+ * or decreasing b.
+ * 
+ * The main benefit of using HLL over LL is that it only requires 64%
+ * of the space that LL does to get the same accuracy.
+ * 
+ * 
+ * Note that this implementation does not include the long range 
correction function
+ * defined in the original paper.  Empirical evidence shows that the 
correction
+ * function causes more harm than good.
+ * 
+ */
+public class HyperLogLog implements ICardinality, Serializable {
+
+   private final RegisterSet registerSet;
+   private final int log2m;
+   private final double alphaMM;
+
+
+   /**
+* Create a new HyperLogLog instance using the specified standard 
deviation.
+*
+* @param rsd - the relative standard deviation for the counter.
+*smaller values create counters that require more space.
+*/
+   public HyperLogLog(double rsd) {
+   this(log2m(rsd));
+   }
+
+   private static int log2m(double rsd) {
+   return (int) (Math.log((1.106 / rsd) * (1.106 / rsd)) / 
Math.log(2));
+   }
+
+   private static double rsd(int log2m) {
+   return 1.106 / Math.sqrt(Math.exp(log2m * Math.log(2)));
+   }
+
+   private static void validateLog2m(int log2m) {
+   if (log2m < 0 || log2m > 30) {
+   throw new IllegalArgumentException("log2m argument is "
+   + log2m + " and is outside the range 
[0, 30]");
+   }
+   }
+
+   private static double linearCounting(int m, double v) {
+   return m * Math.log(m / v);
+   }
+
+   /**
+* Create a new HyperLogLog instance.  The log2m parameter defines the 
accuracy of
+* the counter.  The larger the log2m the better the accuracy.
+* 
+* accuracy = 1.04/sqrt(2^log2m)
+*
+* @param log2m - the number of bits to use as the basis for the HLL 
instance
+*/
+   public HyperLogLog(int log2m) {
+   this(log2m, new RegisterSet(1 << log2m));
+   }
+
+   /**
+* Creates a new HyperLogLog instance using the given registers.  Used 
for unmarshalling a serialized
+* instance and for merging multiple counters together.
+*
+* @param registerSet - the initial values for the register set
+*/
+   public HyperLogLog(int log2m, RegisterSet registerSet) {
+   validateLog2m(log2m);
+   this.registerSet = registerSet;
+   this.log2m = log2m;
+   int m = 1 << this.log2m;
+
+   alphaMM = getAlphaMM(log2m, m);
+   }
+
+   @Override
+   public boolean offerHashed(long hashedValue) {
+   // j becomes the binary address determined by the first b 

[jira] [Commented] (FLINK-7465) Add build-in BloomFilterCount on TableAPI

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179299#comment-16179299
 ] 

ASF GitHub Bot commented on FLINK-7465:
---

Github user jparkie commented on a diff in the pull request:

https://github.com/apache/flink/pull/4652#discussion_r140832701
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/functions/aggfunctions/cardinality/HyperLogLog.java
 ---
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.aggfunctions.cardinality;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+
+/**
+ * Java implementation of HyperLogLog (HLL) algorithm from this paper:
+ * 
+ * http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf
+ * 
+ * HLL is an improved version of LogLog that is capable of estimating
+ * the cardinality of a set with accuracy = 1.04/sqrt(m) where
+ * m = 2^b.  So we can control accuracy vs space usage by increasing
+ * or decreasing b.
+ * 
+ * The main benefit of using HLL over LL is that it only requires 64%
+ * of the space that LL does to get the same accuracy.
+ * 
+ * 
+ * Note that this implementation does not include the long range correction function
+ * defined in the original paper.  Empirical evidence shows that the correction
+ * function causes more harm than good.
+ * 
+ */
+public class HyperLogLog implements ICardinality, Serializable {
--- End diff --

I see this class is adapted from https://github.com/addthis/stream-lib. I 
think you should comment that this class was adapted from the link, so people 
can track differences.
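
For reference, the accuracy relation quoted in the class javadoc above (accuracy = 1.04/sqrt(m), m = 2^b) can be worked out numerically. The following is only a minimal sketch under that one assumption; the class and method names (`HllAccuracySketch`, `registerCount`, `relativeStandardError`) are illustrative and are not part of the PR or of stream-lib.

```java
/** Worked example of the accuracy formula quoted in the javadoc: 1.04 / sqrt(2^log2m). */
final class HllAccuracySketch {

    /** Number of registers, m = 2^log2m. */
    static int registerCount(int log2m) {
        return 1 << log2m;
    }

    /** Relative standard error of the estimate, 1.04 / sqrt(m). */
    static double relativeStandardError(int log2m) {
        return 1.04 / Math.sqrt(registerCount(log2m));
    }

    public static void main(String[] args) {
        // Each extra bit doubles the register count and shrinks the error by a factor of sqrt(2).
        for (int log2m = 8; log2m <= 16; log2m += 2) {
            System.out.printf("log2m=%d m=%d error=%.4f%n",
                log2m, registerCount(log2m), relativeStandardError(log2m));
        }
    }
}
```

With log2m = 10 this gives 1024 registers and a relative error of 3.25%; going to log2m = 14 costs 16 times the registers for a 4 times smaller error.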


> Add build-in BloomFilterCount on TableAPI
> -
>
> Key: FLINK-7465
> URL: https://issues.apache.org/jira/browse/FLINK-7465
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
> Attachments: bloomfilter.png
>
>
> In this JIRA, use BloomFilter to implement counting functions.
> BloomFilter Algorithm description:
> An empty Bloom filter is a bit array of m bits, all set to 0. There must also 
> be k different hash functions defined, each of which maps or hashes some set 
> element to one of the m array positions, generating a uniform random 
> distribution. Typically, k is a constant, much smaller than m, which is 
> proportional to the number of elements to be added; the precise choice of k 
> and the constant of proportionality of m are determined by the intended false 
> positive rate of the filter.
> To add an element, feed it to each of the k hash functions to get k array 
> positions. Set the bits at all these positions to 1.
> To query for an element (test whether it is in the set), feed it to each of 
> the k hash functions to get k array positions. If any of the bits at these 
> positions is 0, the element is definitely not in the set – if it were, then 
> all the bits would have been set to 1 when it was inserted. If all are 1, 
> then either the element is in the set, or the bits have by chance been set to 
> 1 during the insertion of other elements, resulting in a false positive.
> An example of a Bloom filter, representing the set {x, y, z}. The colored 
> arrows show the positions in the bit array that each set element is mapped 
> to. The element w is not in the set {x, y, z}, because it hashes to one 
> bit-array position containing 0. For this figure, m = 18 and k = 3. The 
> sketch as follows:
> 

[jira] [Commented] (FLINK-7394) Manage exclusive buffers in RemoteInputChannel

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179290#comment-16179290
 ] 

ASF GitHub Bot commented on FLINK-7394:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4499#discussion_r140831302
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -183,18 +214,40 @@ void notifySubpartitionConsumed() {
}
 
/**
-* Releases all received buffers and closes the partition request client.
+* Releases all received and available buffers, closes the partition request client.
 */
@Override
void releaseAllResources() throws IOException {
if (isReleased.compareAndSet(false, true)) {
+
+   final List<MemorySegment> recyclingSegments = new ArrayList<>();
+
synchronized (receivedBuffers) {
Buffer buffer;
while ((buffer = receivedBuffers.poll()) != null) {
-   buffer.recycle();
+   if (buffer.getRecycler() == this) {
+   recyclingSegments.add(buffer.getMemorySegment());
--- End diff --

ok - regarding `RemoteInputChannel#recycle()`, if we use this here 
(indirectly via `buffer.recycle`), we would not want it to redistribute buffers 
for every single `recycle()` call, but if `RemoteInputChannel#recycle()` is 
called for a single buffer outside `RemoteInputChannel#releaseAllResources()`, 
we would want it to do that...

You're right that this would create some mess there and we should probably 
keep your current solution unless something better comes to mind.


> Manage exclusive buffers in RemoteInputChannel
> --
>
> Key: FLINK-7394
> URL: https://issues.apache.org/jira/browse/FLINK-7394
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control. 
> The basic works are:
> * Exclusive buffers are assigned to {{RemoteInputChannel}} after created by 
> {{SingleInputGate}}.
> * {{RemoteInputChannel}} implements {{BufferRecycler}} interface to manage 
> the exclusive buffers itself.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7394) Manage exclusive buffers in RemoteInputChannel

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179289#comment-16179289
 ] 

ASF GitHub Bot commented on FLINK-7394:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4499#discussion_r140829015
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -215,6 +269,48 @@ public String toString() {
}
 
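// ------------------------------------------------------------------------
+   // Credit-based
+   // ------------------------------------------------------------------------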
+
+   /**
+* Enqueue this input channel in the pipeline for sending unannounced credits to producer.
+*/
+   void notifyCreditAvailable() {
+   //TODO in next PR
+   }
+
+   /**
+* Exclusive buffer is recycled to this input channel directly and it may trigger notify
+* credit to producer.
+*
+* @param segment The exclusive segment of this channel.
+*/
+   @Override
+   public void recycle(MemorySegment segment) {
+   synchronized (availableBuffers) {
+   if (isReleased.get()) {
+   try {
+   inputGate.returnExclusiveSegments(Arrays.asList(segment));
+   return;
+   } catch (Throwable t) {
+   ExceptionUtils.rethrow(t);
+   }
+   }
--- End diff --

Can you maybe add a comment on the importance of the `isReleased` check 
being inside the synchronized block (as implemented by `onBuffer` before, but 
also without a comment)? This is related to the `AtomicBoolean` field and not 
getting into races here.
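
To make the point about the check concrete, here is a minimal sketch of the pattern, not the actual `RemoteInputChannel` code. `ReleasableChannelSketch`, `returnedToPool`, `queued()` and `returned()` are hypothetical names, and plain `Object`s stand in for memory segments. The assumption is that the release path sets the flag first and then drains the queue under the same lock that `recycle` takes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Simplified stand-in for a channel that owns a queue of recycled buffers. */
final class ReleasableChannelSketch {
    private final ArrayDeque<Object> availableBuffers = new ArrayDeque<>();
    private final List<Object> returnedToPool = new ArrayList<>();
    private final AtomicBoolean isReleased = new AtomicBoolean(false);

    void recycle(Object segment) {
        synchronized (availableBuffers) {
            // The flag is read while holding the lock. release() sets the flag before it
            // takes this lock to drain the queue, so either this segment is queued before
            // the drain (and gets drained), or the flag is already visible here.
            // Reading the flag before entering the block would allow: read false,
            // release() drains, then this thread queues a segment nobody returns.
            if (isReleased.get()) {
                returnedToPool.add(segment);
                return;
            }
            availableBuffers.add(segment);
        }
    }

    void release() {
        if (isReleased.compareAndSet(false, true)) {
            synchronized (availableBuffers) {
                Object segment;
                while ((segment = availableBuffers.poll()) != null) {
                    returnedToPool.add(segment);
                }
            }
        }
    }

    int queued() {
        synchronized (availableBuffers) {
            return availableBuffers.size();
        }
    }

    int returned() {
        synchronized (availableBuffers) {
            return returnedToPool.size();
        }
    }
}
```

After `release()` has run, every later `recycle` call goes straight back to the pool, so `queued()` stays at zero.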




[jira] [Commented] (FLINK-7465) Add build-in BloomFilterCount on TableAPI

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179284#comment-16179284
 ] 

ASF GitHub Bot commented on FLINK-7465:
---

Github user jparkie commented on a diff in the pull request:

https://github.com/apache/flink/pull/4652#discussion_r140830364
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/functions/aggfunctions/cardinality/ICardinality.java
 ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.aggfunctions.cardinality;
+
+import java.io.IOException;
+
+/**
+ * An interface definition for implementation of cardinality.
+ */
+public interface ICardinality {
--- End diff --

Why is the interface named `ICardinality`? Is this common in the Flink 
codebase? I know prefixing "I" is common in C#.


> Add build-in BloomFilterCount on TableAPI
> -
>
> Key: FLINK-7465
> URL: https://issues.apache.org/jira/browse/FLINK-7465
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
> Attachments: bloomfilter.png
>
>
> In this JIRA, use BloomFilter to implement counting functions.
> BloomFilter Algorithm description:
> An empty Bloom filter is a bit array of m bits, all set to 0. There must also 
> be k different hash functions defined, each of which maps or hashes some set 
> element to one of the m array positions, generating a uniform random 
> distribution. Typically, k is a constant, much smaller than m, which is 
> proportional to the number of elements to be added; the precise choice of k 
> and the constant of proportionality of m are determined by the intended false 
> positive rate of the filter.
> To add an element, feed it to each of the k hash functions to get k array 
> positions. Set the bits at all these positions to 1.
> To query for an element (test whether it is in the set), feed it to each of 
> the k hash functions to get k array positions. If any of the bits at these 
> positions is 0, the element is definitely not in the set – if it were, then 
> all the bits would have been set to 1 when it was inserted. If all are 1, 
> then either the element is in the set, or the bits have by chance been set to 
> 1 during the insertion of other elements, resulting in a false positive.
> An example of a Bloom filter, representing the set {x, y, z}. The colored 
> arrows show the positions in the bit array that each set element is mapped 
> to. The element w is not in the set {x, y, z}, because it hashes to one 
> bit-array position containing 0. For this figure, m = 18 and k = 3. The 
> sketch as follows:
> !bloomfilter.png!
> Reference:
> 1. https://en.wikipedia.org/wiki/Bloom_filter
> 2. 
> https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
> Hi [~fhueske] [~twalthr], I would appreciate it if you could give me some advice. :-)
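
The add and query steps in the description above can be sketched as follows. This is only an illustration under stated assumptions: `BloomFilterSketch` is a hypothetical class, not the Hive `BloomFilter` referenced in the issue and not code from the PR. The k hash functions are simulated with double hashing over a seeded FNV-1a style hash, which is one common choice; the description itself only requires k hash functions.

```java
import java.nio.charset.StandardCharsets;
import java.util.BitSet;

/** Minimal Bloom filter: m bits, k positions per element. */
final class BloomFilterSketch {
    private final BitSet bits;
    private final int m;
    private final int k;

    BloomFilterSketch(int m, int k) {
        this.bits = new BitSet(m);
        this.m = m;
        this.k = k;
    }

    /** Seeded FNV-1a style hash (an assumption, any well-mixing hash would do). */
    private static int hash(byte[] data, int seed) {
        int h = 0x811c9dc5 ^ seed;
        for (byte b : data) {
            h ^= (b & 0xff);
            h *= 0x01000193;
        }
        return h;
    }

    /** i-th array position via double hashing: (h1 + i * h2) mod m. */
    private int position(byte[] data, int i) {
        int h1 = hash(data, 0);
        int h2 = hash(data, 0x5bd1e995);
        return Math.floorMod(h1 + i * h2, m);
    }

    /** Add: set the bits at all k positions to 1. */
    void add(String element) {
        byte[] data = element.getBytes(StandardCharsets.UTF_8);
        for (int i = 0; i < k; i++) {
            bits.set(position(data, i));
        }
    }

    /** Query: any 0 bit means "definitely not in the set"; all 1 means "possibly in the set". */
    boolean mightContain(String element) {
        byte[] data = element.getBytes(StandardCharsets.UTF_8);
        for (int i = 0; i < k; i++) {
            if (!bits.get(position(data, i))) {
                return false;
            }
        }
        return true;
    }
}
```

With m = 18 and k = 3 as in the figure, adding x, y and z guarantees that all three are reported as possibly present. Whether w is rejected depends on the hash values, which is exactly the false-positive case the description mentions.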





[GitHub] flink pull request #4652: [FLINK-7465][table]Add cardinality count for table...

2017-09-25 Thread jparkie
Github user jparkie commented on a diff in the pull request:

https://github.com/apache/flink/pull/4652#discussion_r140828795
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/functions/aggfunctions/cardinality/ICardinality.java
 ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.aggfunctions.cardinality;
+
+import java.io.IOException;
+
+/**
+ * An interface definition for implementation of cardinality.
+ */
+public interface ICardinality {
+
+   /**
+* Check whether the element is impact estimate.
+*
+* @param o stream element
+* @return false if the value returned by cardinality() is unaffected by the appearance of o in the stream.
+*/
+   boolean offer(Object o);
+
+   /**
+* Offer the value as a hashed long value.
+*
+* @param hashedLong - the hash of the item to offer to the estimator
+* @return false if the value returned by cardinality() is unaffected by the appearance of hashedLong in the stream
+*/
+   boolean offerHashed(long hashedLong);
+
+   /**
+* Offer the value as a hashed long value.
+*
+* @param hashedInt - the hash of the item to offer to the estimator
+* @return false if the value returned by cardinality() is unaffected by the appearance of hashedInt in the stream
+*/
+   boolean offerHashed(int hashedInt);
+
+   /**
+* @return the number of unique elements in the stream or an estimate thereof.
+*/
+   long cardinality();
+
+   /**
+* @return size in bytes needed for serialization.
+*/
+   int sizeof();
+
+   /**
+* Get the byte array used for the calculation.
+*
+* @return The byte array used for the calculation
+* @throws IOException
+*/
+   byte[] getBytes() throws IOException;
+
+   /**
+* Merges estimators to produce a new estimator for the combined streams
+* of this estimator and those passed as arguments.
+* 
+* Nor this estimator nor the one passed as parameters are modified.
+*
+* @param estimators Zero or more compatible estimators
+* @throws Exception If at least one of the estimators is not compatible with this one
+*/
+   ICardinality merge(ICardinality... estimators) throws Exception;
--- End diff --

Wouldn't it be nicer to have a more specific Exception?
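
One way this could look, as a rough sketch only: a dedicated checked exception for incompatible estimators. All names below (`CardinalityMergeException`, `CardinalityEstimator`, `SetBackedEstimator`) are hypothetical and chosen for illustration; the set-backed estimator merely stands in for a real sketch such as HyperLogLog, with `log2m` as the configuration that must match.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical checked exception signalling that two estimators cannot be merged. */
class CardinalityMergeException extends Exception {
    CardinalityMergeException(String message) {
        super(message);
    }
}

/** Trimmed-down stand-in for the interface under review. */
interface CardinalityEstimator {
    long cardinality();

    CardinalityEstimator merge(CardinalityEstimator... estimators) throws CardinalityMergeException;
}

/** Exact, set-backed estimator used only to demonstrate the merge contract. */
final class SetBackedEstimator implements CardinalityEstimator {
    private final int log2m;
    private final Set<Long> seen = new HashSet<>();

    SetBackedEstimator(int log2m) {
        this.log2m = log2m;
    }

    boolean offerHashed(long hashedValue) {
        return seen.add(hashedValue);
    }

    @Override
    public long cardinality() {
        return seen.size();
    }

    @Override
    public CardinalityEstimator merge(CardinalityEstimator... estimators)
            throws CardinalityMergeException {
        // Build a new estimator so that neither this one nor the arguments are modified.
        SetBackedEstimator merged = new SetBackedEstimator(log2m);
        merged.seen.addAll(seen);
        for (CardinalityEstimator other : estimators) {
            if (!(other instanceof SetBackedEstimator)
                    || ((SetBackedEstimator) other).log2m != log2m) {
                throw new CardinalityMergeException(
                    "Cannot merge estimators of different type or size");
            }
            merged.seen.addAll(((SetBackedEstimator) other).seen);
        }
        return merged;
    }
}
```

Callers can then catch the merge failure specifically instead of a blanket `Exception`, which also keeps unrelated runtime errors from being swallowed by the same handler.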


---


[jira] [Commented] (FLINK-7465) Add build-in BloomFilterCount on TableAPI

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179259#comment-16179259
 ] 

ASF GitHub Bot commented on FLINK-7465:
---

Github user jparkie commented on a diff in the pull request:

https://github.com/apache/flink/pull/4652#discussion_r140826158
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/functions/aggfunctions/cardinality/MurmurHash.java
 ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.aggfunctions.cardinality;
+
+/**
+ * This is a very fast, non-cryptographic hash suitable for general hash-based
+ * lookup. See http://murmurhash.googlepages.com/ for more details.
+ * 
+ */
+public class MurmurHash {
--- End diff --

What was the decision in choosing MurmurHash2 (in 
https://github.com/tnm/murmurhash-java) versus MurmurHash3 (in 
https://github.com/google/guava/blob/master/guava/src/com/google/common/hash/Murmur3_32HashFunction.java)?

If I recall correctly, MurmurHash3 is more collision resistant and slightly 
faster than MurmurHash2.




[GitHub] flink pull request #4652: [FLINK-7465][table]Add cardinality count for table...

2017-09-25 Thread jparkie
Github user jparkie commented on a diff in the pull request:

https://github.com/apache/flink/pull/4652#discussion_r140826158
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/functions/aggfunctions/cardinality/MurmurHash.java
 ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.aggfunctions.cardinality;
+
+/**
+ * This is a very fast, non-cryptographic hash suitable for general hash-based
+ * lookup. See http://murmurhash.googlepages.com/ for more details.
+ * 
+ */
+public class MurmurHash {
--- End diff --

What was the reasoning behind choosing MurmurHash2 (in 
https://github.com/tnm/murmurhash-java) over MurmurHash3 (in 
https://github.com/google/guava/blob/master/guava/src/com/google/common/hash/Murmur3_32HashFunction.java)?

If I recall correctly, MurmurHash3 is more collision-resistant and slightly 
faster than MurmurHash2.
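
For reference, a minimal sketch of the 32-bit MurmurHash3 variant mentioned above. It is a standalone illustration of the publicly documented algorithm, not the code in this PR and not Guava's implementation; the class name `Murmur3Sketch` is made up for the example.

```java
import java.nio.charset.StandardCharsets;

final class Murmur3Sketch {
    private static final int C1 = 0xcc9e2d51;
    private static final int C2 = 0x1b873593;

    /** MurmurHash3 (x86, 32-bit) over the whole array with the given seed. */
    static int hash32(byte[] data, int seed) {
        int h = seed;
        int len = data.length;
        int i = 0;

        // Body: consume 4-byte little-endian blocks.
        while (i + 4 <= len) {
            int k = (data[i] & 0xff)
                | ((data[i + 1] & 0xff) << 8)
                | ((data[i + 2] & 0xff) << 16)
                | ((data[i + 3] & 0xff) << 24);
            k *= C1;
            k = Integer.rotateLeft(k, 15);
            k *= C2;
            h ^= k;
            h = Integer.rotateLeft(h, 13);
            h = h * 5 + 0xe6546b64;
            i += 4;
        }

        // Tail: mix in the remaining 1 to 3 bytes (cases fall through on purpose).
        int k = 0;
        switch (len - i) {
            case 3:
                k ^= (data[i + 2] & 0xff) << 16;
            case 2:
                k ^= (data[i + 1] & 0xff) << 8;
            case 1:
                k ^= (data[i] & 0xff);
                k *= C1;
                k = Integer.rotateLeft(k, 15);
                k *= C2;
                h ^= k;
        }

        // Finalization: force all bits of the state to avalanche.
        h ^= len;
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    public static void main(String[] args) {
        byte[] input = "test".getBytes(StandardCharsets.UTF_8);
        System.out.println(Integer.toHexString(hash32(input, 0)));
    }
}
```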


---


[jira] [Commented] (FLINK-6465) support FIRST_VALUE on Table API & SQL

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179245#comment-16179245
 ] 

ASF GitHub Bot commented on FLINK-6465:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4556#discussion_r140824914
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1406,6 +1406,61 @@ object AggregateUtil {
   }
 }
   }
+case sqlFirstLastValueAggFunction: SqlFirstLastValueAggFunction =>
+
+  aggregates(index) = if (sqlFirstLastValueAggFunction.getKind == SqlKind.FIRST_VALUE) {
+    if (needRetraction) {
+      sqlTypeName match {
--- End diff --

Convert it to a map


> support FIRST_VALUE on Table API & SQL
> --
>
> Key: FLINK-6465
> URL: https://issues.apache.org/jira/browse/FLINK-6465
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: sunjincheng
>
> {{FIRST_VALUE}} is an OVER WINDOW function. This JIRA will add 
> {{FIRST_VALUE}} function support to the Table API & SQL.
> *Syntax:*
> FIRST_VALUE ( [scalar_expression ] )   
> OVER ( [ partition_by_clause ] order_by_clause [ rows_range_clause ] )  
> [About OVER 
> WINDOWS|https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/tableApi.html#over-windows]
> scalar_expression
> Is the value to be returned. scalar_expression can be a column, or other 
> arbitrary expression that results in a single value. Other analytic functions 
> are not permitted.
> *NOTE:*
> * {{FIRST_VALUE}} is used with an OVER WINDOW, e.g.: 
> {code}
> SELECT A, B, FIRST_VALUE(C) OVER (ORDER BY E) AS firstValue FROM tab
> {code}
> * Retraction for OVER WINDOW is expensive (and currently not supported), and 
> this JIRA does not implement the retract logic of {{FIRST_VALUE}}.





[GitHub] flink issue #4720: Release 1.3.2 rc3

2017-09-25 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4720
  
This looks pretty much like some sort of mistake. Can you please close the 
PR?


---


[jira] [Commented] (FLINK-6465) support FIRST_VALUE on Table API & SQL

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179246#comment-16179246
 ] 

ASF GitHub Bot commented on FLINK-6465:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4556#discussion_r140824819
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/FirstValueAggFunctionWithRetract.scala
 ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions.aggfunctions
+
+import java.math.BigDecimal
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.dataview.{ListView, MapView}
+import org.apache.flink.table.functions.AggregateFunction
+
+/** The initial accumulator for first value with retraction aggregate function */
+class FirstValueWithRetractAccumulator[T] {
+  var data: ListView[T] = _
+  var retractedData: MapView[T, Byte] = _
+
+  override def equals(obj: scala.Any): Boolean = {
+    if (obj.isInstanceOf[FirstValueWithRetractAccumulator[T]]) {
+      val target = obj.asInstanceOf[FirstValueWithRetractAccumulator[T]]
+      if (target.data.equals(this.data) && target.retractedData.equals(this.retractedData)) {
+        return true
+      }
+    }
+    false
+  }
+}
+
+/**
+  * Base class for built-in first value with retraction aggregate function
+  *
+  * @tparam T the type for the aggregation result
+  */
+abstract class FirstValueWithRetractAggFunction[T]
+  extends AggregateFunction[T, FirstValueWithRetractAccumulator[T]] {
+
+  override def createAccumulator(): FirstValueWithRetractAccumulator[T] = {
+    val acc = new FirstValueWithRetractAccumulator[T]
+    acc.data = new ListView[T]
+    acc.retractedData = new MapView[T, Byte]
+    acc
+  }
+
+  def accumulate(acc: FirstValueWithRetractAccumulator[T], value: Any): Unit = {
+    if (null != value) {
+      val v = value.asInstanceOf[T]
+      acc.data.add(v)
+    }
+  }
+
+  def retract(acc: FirstValueWithRetractAccumulator[T], value: Any): Unit = {
--- End diff --

It seems that we can do better than having two copies of state here.

It might make sense to extend the ListView interface or to introduce a 
TreeMap-like view.


> support FIRST_VALUE on Table API & SQL
> --
>
> Key: FLINK-6465
> URL: https://issues.apache.org/jira/browse/FLINK-6465
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: sunjincheng
>
> {{FIRST_VALUE}} is an OVER WINDOW function. This JIRA will add 
> {{FIRST_VALUE}} function support to the Table API & SQL.
> *Syntax:*
> FIRST_VALUE ( [scalar_expression ] )   
> OVER ( [ partition_by_clause ] order_by_clause [ rows_range_clause ] )  
> [About OVER 
> WINDOWS|https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/tableApi.html#over-windows]
> scalar_expression
> Is the value to be returned. scalar_expression can be a column, or other 
> arbitrary expression that results in a single value. Other analytic functions 
> are not permitted.
> *NOTE:*
> * {{FIRST_VALUE}} is used with an OVER WINDOW, e.g.: 
> {code}
> SELECT A, B, FIRST_VALUE(C) OVER (ORDER BY E) AS firstValue FROM tab
> {code}
> * Retraction for OVER WINDOW is expensive (and currently not supported), and 
> this JIRA does not implement the retract logic of {{FIRST_VALUE}}.





[jira] [Commented] (FLINK-6465) support FIRST_VALUE on Table API & SQL

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179244#comment-16179244
 ] 

ASF GitHub Bot commented on FLINK-6465:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4556#discussion_r140823719
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/FirstValueAggFunction.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions.aggfunctions
+
+import java.math.BigDecimal
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+/**
+  * The initial accumulator for first value aggregate function
+  *
+  * @tparam T the type for the aggregation result
+  */
+class FirstValueAccumulator[T] extends JTuple1[T]
+
+abstract class FirstValueAggFunction[T] extends AggregateFunction[T, FirstValueAccumulator[T]] {
+
+  override def createAccumulator(): FirstValueAccumulator[T] = {
+    val acc = new FirstValueAccumulator[T]
+    acc.f0 = getInitValue
+    acc
+  }
+
+  def accumulate(acc: FirstValueAccumulator[T], value: Any): Unit = {
+    if (null == acc.f0) {
+      acc.f0 = value.asInstanceOf[T]
+    }
+  }
+
+  override def getValue(acc: FirstValueAccumulator[T]): T = {
+    acc.f0
+  }
+
+  def resetAccumulator(acc: FirstValueAccumulator[T]): Unit = {
+    acc.f0 = getInitValue
+  }
+
+  def getInitValue: T = {
+    null.asInstanceOf[T]
+  }
+
+  def getValueTypeInfo: TypeInformation[T]
--- End diff --

Looks like the parameter can be a generic parameter


> support FIRST_VALUE on Table API & SQL
> --
>
> Key: FLINK-6465
> URL: https://issues.apache.org/jira/browse/FLINK-6465
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: sunjincheng
>
> {{FIRST_VALUE}} is an OVER WINDOW function. This JIRA will add 
> {{FIRST_VALUE}} function support to the Table API & SQL.
> *Syntax:*
> FIRST_VALUE ( [scalar_expression ] )   
> OVER ( [ partition_by_clause ] order_by_clause [ rows_range_clause ] )  
> [About OVER 
> WINDOWS|https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/tableApi.html#over-windows]
> scalar_expression
> Is the value to be returned. scalar_expression can be a column, or other 
> arbitrary expression that results in a single value. Other analytic functions 
> are not permitted.
> *NOTE:*
> * {{FIRST_VALUE}} is used with an OVER WINDOW, e.g.: 
> {code}
> SELECT A, B, FIRST_VALUE(C) OVER (ORDER BY E) AS firstValue FROM tab
> {code}
> * Retraction for OVER WINDOW is expensive (and currently not supported), and 
> this JIRA does not implement the retract logic of {{FIRST_VALUE}}.





[GitHub] flink pull request #4556: [FLINK-6465][table]support FIRST_VALUE on Table AP...

2017-09-25 Thread haohui
Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4556#discussion_r140824914
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1406,6 +1406,61 @@ object AggregateUtil {
   }
 }
   }
+case sqlFirstLastValueAggFunction: SqlFirstLastValueAggFunction =>
+
+  aggregates(index) = if (sqlFirstLastValueAggFunction.getKind == SqlKind.FIRST_VALUE) {
+    if (needRetraction) {
+      sqlTypeName match {
--- End diff --

Convert it to a map
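
One way to read this suggestion is to replace the per-type `match` with a lookup table keyed by type name. The sketch below shows the idea in plain Java. `TypeName`, `AggFunction`, `NamedAgg` and the function names are hypothetical stand-ins for illustration only; they are not Calcite's `SqlTypeName` or the classes in this PR.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Supplier;

final class AggFunctionLookupSketch {
    /** Hypothetical stand-in for the SQL type enum; only a few types are listed. */
    enum TypeName { TINYINT, INTEGER, BIGINT, DOUBLE, VARCHAR }

    /** Hypothetical stand-in for the aggregate function base type. */
    interface AggFunction {
        String name();
    }

    /** Trivial implementation that only records which function was chosen. */
    static final class NamedAgg implements AggFunction {
        private final String name;

        NamedAgg(String name) {
            this.name = name;
        }

        @Override
        public String name() {
            return name;
        }
    }

    // One table per (function kind, retraction) combination replaces one match block.
    private static final Map<TypeName, Supplier<AggFunction>> FIRST_VALUE =
        new EnumMap<>(TypeName.class);

    static {
        FIRST_VALUE.put(TypeName.INTEGER, () -> new NamedAgg("IntFirstValueAggFunction"));
        FIRST_VALUE.put(TypeName.BIGINT, () -> new NamedAgg("LongFirstValueAggFunction"));
        FIRST_VALUE.put(TypeName.DOUBLE, () -> new NamedAgg("DoubleFirstValueAggFunction"));
        FIRST_VALUE.put(TypeName.VARCHAR, () -> new NamedAgg("StringFirstValueAggFunction"));
    }

    /** Looks up the factory for the type and fails loudly for unsupported types. */
    static AggFunction firstValueFor(TypeName type) {
        Supplier<AggFunction> factory = FIRST_VALUE.get(type);
        if (factory == null) {
            throw new IllegalArgumentException("FIRST_VALUE does not support type: " + type);
        }
        return factory.get();
    }

    public static void main(String[] args) {
        System.out.println(firstValueFor(TypeName.INTEGER).name());
    }
}
```

Storing suppliers rather than instances keeps the behaviour of creating a fresh function object per aggregate call.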


---


[GitHub] flink pull request #4556: [FLINK-6465][table]support FIRST_VALUE on Table AP...

2017-09-25 Thread haohui
Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4556#discussion_r140824819
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/FirstValueAggFunctionWithRetract.scala
 ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions.aggfunctions
+
+import java.math.BigDecimal
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.dataview.{ListView, MapView}
+import org.apache.flink.table.functions.AggregateFunction
+
+/** The initial accumulator for first value with retraction aggregate function */
+class FirstValueWithRetractAccumulator[T] {
+  var data: ListView[T] = _
+  var retractedData: MapView[T, Byte] = _
+
+  override def equals(obj: scala.Any): Boolean = {
+    if (obj.isInstanceOf[FirstValueWithRetractAccumulator[T]]) {
+      val target = obj.asInstanceOf[FirstValueWithRetractAccumulator[T]]
+      if (target.data.equals(this.data) && target.retractedData.equals(this.retractedData)) {
+        return true
+      }
+    }
+    false
+  }
+}
+
+/**
+  * Base class for built-in first value with retraction aggregate function
+  *
+  * @tparam T the type for the aggregation result
+  */
+abstract class FirstValueWithRetractAggFunction[T]
+  extends AggregateFunction[T, FirstValueWithRetractAccumulator[T]] {
+
+  override def createAccumulator(): FirstValueWithRetractAccumulator[T] = {
+    val acc = new FirstValueWithRetractAccumulator[T]
+    acc.data = new ListView[T]
+    acc.retractedData = new MapView[T, Byte]
+    acc
+  }
+
+  def accumulate(acc: FirstValueWithRetractAccumulator[T], value: Any): Unit = {
+    if (null != value) {
+      val v = value.asInstanceOf[T]
+      acc.data.add(v)
+    }
+  }
+
+  def retract(acc: FirstValueWithRetractAccumulator[T], value: Any): Unit = {
--- End diff --

It seems that we can do better than having two copies of state here.

It might make sense to extend the ListView interface or to introduce a 
TreeMap-like view.
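
A rough sketch of what a single ordered state could look like, using a plain `java.util.TreeMap` keyed by insertion sequence in place of the two views. This is only an illustration of the idea under the assumption that a retraction removes the oldest matching value; it does not use Flink's `ListView`/`MapView` API, and the class name is made up.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

final class FirstValueWithRetractSketch<T> {
    // Insertion sequence number -> value; the smallest key holds the current first value.
    private final TreeMap<Long, T> bySequence = new TreeMap<>();
    private long nextSequence = 0L;

    /** Appends a non-null value at the end of the ordering. */
    void accumulate(T value) {
        if (value != null) {
            bySequence.put(nextSequence++, value);
        }
    }

    /** Removes the oldest entry equal to the given value, if there is one. */
    void retract(T value) {
        Iterator<Map.Entry<Long, T>> it = bySequence.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue().equals(value)) {
                it.remove();
                return;
            }
        }
    }

    /** Returns the oldest value that has not been retracted, or null if none is left. */
    T getValue() {
        return bySequence.isEmpty() ? null : bySequence.firstEntry().getValue();
    }

    public static void main(String[] args) {
        FirstValueWithRetractSketch<String> acc = new FirstValueWithRetractSketch<>();
        acc.accumulate("a");
        acc.accumulate("b");
        acc.retract("a");
        System.out.println(acc.getValue());
    }
}
```

The linear scan in `retract` is the price of keeping one structure; if rows are always retracted in arrival order, the match is found at the head immediately.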


---


[GitHub] flink pull request #4556: [FLINK-6465][table]support FIRST_VALUE on Table AP...

2017-09-25 Thread haohui
Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4556#discussion_r140823719
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/FirstValueAggFunction.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions.aggfunctions
+
+import java.math.BigDecimal
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+/**
+  * The initial accumulator for first value aggregate function
+  *
+  * @tparam T the type for the aggregation result
+  */
+class FirstValueAccumulator[T] extends JTuple1[T]
+
+abstract class FirstValueAggFunction[T] extends AggregateFunction[T, FirstValueAccumulator[T]] {
+
+  override def createAccumulator(): FirstValueAccumulator[T] = {
+    val acc = new FirstValueAccumulator[T]
+    acc.f0 = getInitValue
+    acc
+  }
+
+  def accumulate(acc: FirstValueAccumulator[T], value: Any): Unit = {
+    if (null == acc.f0) {
+      acc.f0 = value.asInstanceOf[T]
+    }
+  }
+
+  override def getValue(acc: FirstValueAccumulator[T]): T = {
+    acc.f0
+  }
+
+  def resetAccumulator(acc: FirstValueAccumulator[T]): Unit = {
+    acc.f0 = getInitValue
+  }
+
+  def getInitValue: T = {
+    null.asInstanceOf[T]
+  }
+
+  def getValueTypeInfo: TypeInformation[T]
--- End diff --

Looks like the parameter can be a generic parameter


---


[jira] [Commented] (FLINK-7541) Redistribute operator state using OperatorID

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179241#comment-16179241
 ] 

ASF GitHub Bot commented on FLINK-7541:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4609
  
Merged in f1b2b83d63. Please close the PR and the JIRA.


> Redistribute operator state using OperatorID
> 
>
> Key: FLINK-7541
> URL: https://issues.apache.org/jira/browse/FLINK-7541
> Project: Flink
>  Issue Type: Improvement
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> Currently StateAssignmentOperation relies heavily on the order of new and old 
> operators in the task. This should be changed so that it relies more on 
> OperatorID.





[GitHub] flink issue #4609: [FLINK-7541] Refactor StateAssignmentOperation

2017-09-25 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4609
  
Merged in f1b2b83d63. Please close the PR and the JIRA.


---


[GitHub] flink pull request #4652: [FLINK-7465][table]Add cardinality count for table...

2017-09-25 Thread jparkie
Github user jparkie commented on a diff in the pull request:

https://github.com/apache/flink/pull/4652#discussion_r140823213
  
--- Diff: docs/dev/table/sql.md ---
@@ -2020,7 +2020,16 @@ COUNT(*)
 Returns the number of input rows.
   
 
-
+
+  
+{% highlight text %}
+CARDINALITY_COUNT(rsd, value)
--- End diff --

Would it be clearer to include the word "approximate" in the function name so 
that the user understands the count is an estimate? I see Apache Spark calls it 
`approx_count_distinct` 
(https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/sql/functions.html#approx_count_distinct-org.apache.spark.sql.Column-double-) 
and Redshift has it as `APPROXIMATE COUNT(DISTINCT column)` 
(http://docs.aws.amazon.com/redshift/latest/dg/r_COUNT.html).


---


[jira] [Commented] (FLINK-7465) Add build-in BloomFilterCount on TableAPI

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179237#comment-16179237
 ] 

ASF GitHub Bot commented on FLINK-7465:
---

Github user jparkie commented on a diff in the pull request:

https://github.com/apache/flink/pull/4652#discussion_r140823213
  
--- Diff: docs/dev/table/sql.md ---
@@ -2020,7 +2020,16 @@ COUNT(*)
 Returns the number of input rows.
   
 
-
+
+  
+{% highlight text %}
+CARDINALITY_COUNT(rsd, value)
--- End diff --

Would it be clearer to include the word "approximate" in the function name so 
that the user understands the count is an estimate? I see Apache Spark calls it 
`approx_count_distinct` 
(https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/sql/functions.html#approx_count_distinct-org.apache.spark.sql.Column-double-) 
and Redshift has it as `APPROXIMATE COUNT(DISTINCT column)` 
(http://docs.aws.amazon.com/redshift/latest/dg/r_COUNT.html).


> Add build-in BloomFilterCount on TableAPI
> -
>
> Key: FLINK-7465
> URL: https://issues.apache.org/jira/browse/FLINK-7465
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
> Attachments: bloomfilter.png
>
>
> This JIRA uses a BloomFilter to implement counting functions.
> BloomFilter Algorithm description:
> An empty Bloom filter is a bit array of m bits, all set to 0. There must also 
> be k different hash functions defined, each of which maps or hashes some set 
> element to one of the m array positions, generating a uniform random 
> distribution. Typically, k is a constant, much smaller than m, which is 
> proportional to the number of elements to be added; the precise choice of k 
> and the constant of proportionality of m are determined by the intended false 
> positive rate of the filter.
> To add an element, feed it to each of the k hash functions to get k array 
> positions. Set the bits at all these positions to 1.
> To query for an element (test whether it is in the set), feed it to each of 
> the k hash functions to get k array positions. If any of the bits at these 
> positions is 0, the element is definitely not in the set – if it were, then 
> all the bits would have been set to 1 when it was inserted. If all are 1, 
> then either the element is in the set, or the bits have by chance been set to 
> 1 during the insertion of other elements, resulting in a false positive.
> An example of a Bloom filter, representing the set {x, y, z}. The colored 
> arrows show the positions in the bit array that each set element is mapped 
> to. The element w is not in the set {x, y, z}, because it hashes to one 
> bit-array position containing 0. For this figure, m = 18 and k = 3. The 
> sketch is as follows:
> !bloomfilter.png!
> Reference:
> 1. https://en.wikipedia.org/wiki/Bloom_filter
> 2. 
> https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
> Hi [~fhueske] [~twalthr], I would appreciate it if you could give me some advice. :-)
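
The add and query steps described above can be sketched in a few lines of Java. This is a minimal illustration, not the implementation proposed in the issue or Hive's `BloomFilter`: the class name is made up, and instead of k independent hash functions it derives the k positions from two base hashes (double hashing), which is a substitution made here for brevity. Counting the calls to `add` that return `true` gives a lower-bound estimate of the number of distinct elements.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.BitSet;

final class BloomFilterSketch {
    private final BitSet bits; // the m-bit array, all zero initially
    private final int m;       // number of bits
    private final int k;       // number of positions set per element

    BloomFilterSketch(int m, int k) {
        if (m <= 0 || k <= 0) {
            throw new IllegalArgumentException("m and k must be positive");
        }
        this.m = m;
        this.k = k;
        this.bits = new BitSet(m);
    }

    /** Sets the k bits for the element; returns true if any bit was newly set. */
    boolean add(String element) {
        boolean changed = false;
        for (int i = 0; i < k; i++) {
            int p = position(element, i);
            if (!bits.get(p)) {
                bits.set(p);
                changed = true;
            }
        }
        return changed;
    }

    /** False means definitely absent; true means present or a false positive. */
    boolean mightContain(String element) {
        for (int i = 0; i < k; i++) {
            if (!bits.get(position(element, i))) {
                return false;
            }
        }
        return true;
    }

    // Assumption: the i-th position is (h1 + i * h2) mod m rather than a separate hash function.
    private int position(String element, int i) {
        byte[] data = element.getBytes(StandardCharsets.UTF_8);
        int h1 = Arrays.hashCode(data);
        int h2 = fnv1a(data);
        return Math.floorMod(h1 + i * h2, m);
    }

    // 32-bit FNV-1a, used only as a second base hash.
    private static int fnv1a(byte[] data) {
        int h = 0x811c9dc5;
        for (byte b : data) {
            h ^= (b & 0xff);
            h *= 0x01000193;
        }
        return h;
    }

    public static void main(String[] args) {
        BloomFilterSketch filter = new BloomFilterSketch(1024, 3);
        filter.add("x");
        filter.add("y");
        filter.add("z");
        System.out.println(filter.mightContain("x"));
    }
}
```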





[jira] [Commented] (FLINK-7394) Manage exclusive buffers in RemoteInputChannel

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179234#comment-16179234
 ] 

ASF GitHub Bot commented on FLINK-7394:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4499#discussion_r140822508
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -183,18 +214,40 @@ void notifySubpartitionConsumed() {
     }
 
     /**
-     * Releases all received buffers and closes the partition request client.
+     * Releases all received and available buffers, closes the partition request client.
      */
     @Override
     void releaseAllResources() throws IOException {
         if (isReleased.compareAndSet(false, true)) {
+
+            final List<MemorySegment> recyclingSegments = new ArrayList<>();
+
             synchronized (receivedBuffers) {
                 Buffer buffer;
                 while ((buffer = receivedBuffers.poll()) != null) {
-                    buffer.recycle();
+                    if (buffer.getRecycler() == this) {
+                        recyclingSegments.add(buffer.getMemorySegment());
--- End diff --

oh, now I see - but let me elaborate on the `RemoteInputChannel#recycle()` 
there


> Manage exclusive buffers in RemoteInputChannel
> --
>
> Key: FLINK-7394
> URL: https://issues.apache.org/jira/browse/FLINK-7394
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is part of the work on credit-based network flow control. 
> The basic tasks are:
> * Exclusive buffers are assigned to {{RemoteInputChannel}} after it is created by 
> {{SingleInputGate}}.
> * {{RemoteInputChannel}} implements {{BufferRecycler}} interface to manage 
> the exclusive buffers itself.





[GitHub] flink pull request #4499: [FLINK-7394][core] Manage exclusive buffers in Rem...

2017-09-25 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4499#discussion_r140822508
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -183,18 +214,40 @@ void notifySubpartitionConsumed() {
     }
 
     /**
-     * Releases all received buffers and closes the partition request client.
+     * Releases all received and available buffers, closes the partition request client.
      */
     @Override
     void releaseAllResources() throws IOException {
         if (isReleased.compareAndSet(false, true)) {
+
+            final List<MemorySegment> recyclingSegments = new ArrayList<>();
+
             synchronized (receivedBuffers) {
                 Buffer buffer;
                 while ((buffer = receivedBuffers.poll()) != null) {
-                    buffer.recycle();
+                    if (buffer.getRecycler() == this) {
+                        recyclingSegments.add(buffer.getMemorySegment());
--- End diff --

oh, now I see - but let me elaborate on the `RemoteInputChannel#recycle()` 
there


---


[jira] [Commented] (FLINK-7683) Add method to iterate over all of the existing keys in a statebackend

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179228#comment-16179228
 ] 

ASF GitHub Bot commented on FLINK-7683:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4722#discussion_r140821446
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
 ---
@@ -287,6 +289,14 @@ public S get(K key, N namespace) {
     }
 
     @Override
+    public Stream<K> getKeys(N namespace) {
+        return Arrays.stream(primaryTable)
--- End diff --

This hash table applies incremental rehashing. This method must also 
consider the data stored in ``incrementalRehashTable`` if it isn't ``null``; 
otherwise we can miss some data.


> Add method to iterate over all of the existing keys in a statebackend
> -
>
> Key: FLINK-7683
> URL: https://issues.apache.org/jira/browse/FLINK-7683
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> This is required to preserve backward compatibility when changing the 
> state definition of a keyed state operator (to do so, the operator must 
> iterate over all of the existing keys and rewrite them into a new state 
> variable).





[jira] [Commented] (FLINK-7394) Manage exclusive buffers in RemoteInputChannel

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179223#comment-16179223
 ] 

ASF GitHub Bot commented on FLINK-7394:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4499#discussion_r140810853
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -99,8 +114,24 @@ public RemoteInputChannel(
         this.connectionManager = checkNotNull(connectionManager);
     }
 
+    /**
+     * Assigns exclusive buffers to this input channel, and this method should be called only once
+     * after this input channel is created.
+     */
     void assignExclusiveSegments(List<MemorySegment> segments) {
-        // TODO in next PR
+        checkState(this.initialCredit == 0, "Bug in input channel setup logic: exclusive buffers have" +
+            "already been set for this input channel.");
--- End diff --

please add a space between `have` and `already` (between the concatenations)
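
A small standalone illustration of the issue: adjacent string literals are joined exactly as written, so the message in the diff reads "havealready". The class and constant names below are made up for the example.

```java
final class ConcatSpaceSketch {
    // As in the diff: the first literal ends with "have", the second starts with "already".
    static final String AS_WRITTEN =
        "Bug in input channel setup logic: exclusive buffers have" +
        "already been set for this input channel.";

    // With the trailing space requested in the review.
    static final String FIXED =
        "Bug in input channel setup logic: exclusive buffers have " +
        "already been set for this input channel.";

    public static void main(String[] args) {
        System.out.println(AS_WRITTEN);
        System.out.println(FIXED);
    }
}
```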


> Manage exclusive buffers in RemoteInputChannel
> --
>
> Key: FLINK-7394
> URL: https://issues.apache.org/jira/browse/FLINK-7394
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is part of the work on credit-based network flow control. 
> The basic tasks are:
> * Exclusive buffers are assigned to {{RemoteInputChannel}} after it is created by 
> {{SingleInputGate}}.
> * {{RemoteInputChannel}} implements {{BufferRecycler}} interface to manage 
> the exclusive buffers itself.





[jira] [Commented] (FLINK-7394) Manage exclusive buffers in RemoteInputChannel

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179224#comment-16179224
 ] 

ASF GitHub Bot commented on FLINK-7394:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4499#discussion_r140821090
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -183,18 +214,40 @@ void notifySubpartitionConsumed() {
     }
 
     /**
-     * Releases all received buffers and closes the partition request client.
+     * Releases all received and available buffers, closes the partition request client.
      */
     @Override
     void releaseAllResources() throws IOException {
         if (isReleased.compareAndSet(false, true)) {
+
+            final List<MemorySegment> recyclingSegments = new ArrayList<>();
+
             synchronized (receivedBuffers) {
                 Buffer buffer;
                 while ((buffer = receivedBuffers.poll()) != null) {
-                    buffer.recycle();
+                    if (buffer.getRecycler() == this) {
+                        recyclingSegments.add(buffer.getMemorySegment());
--- End diff --

1) I think performance is not much of an issue here, as this is only 
called during take-down of a connection and the overhead of `Buffer#recycle` is 
actually not that high.
2) Sorry, but I don't get your concern. Why would you need an extra check 
when using `buffer.recycle()` instead of 
`exclusiveRecyclingSegments.add(buffer.getMemorySegment())`? There shouldn't be 
anything special about the exclusive buffers in this regard compared to ordinary 
buffers (which is the beauty of the design).

Let me give the example of how `LocalBufferPool` handles this inside 
`lazyDestroy`: it returns every memory segment (one by one) with 
`networkBufferPool.recycle()` and, at the end, calls 
`networkBufferPool.destroyBufferPool()` so that the book-keeping inside the 
`NetworkBufferPool` is updated and buffers may be re-distributed to other 
`LocalBufferPool` instances.
We could do this similarly here: recycle one by one, then call some method 
to update book-keeping and balancing inside `NetworkBufferPool`.
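
The two-step pattern described above (return segments one by one, then update the book-keeping once) could look roughly like the following. All class and method names here are hypothetical and heavily simplified; this is not the `NetworkBufferPool` or `RemoteInputChannel` API, only an illustration of the ordering.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class RecycleThenRebalanceSketch {
    /** Hypothetical global pool: holds free segments and counts reserved ones. */
    static final class GlobalPool {
        private final Deque<byte[]> free = new ArrayDeque<>();
        private int reserved; // segments currently promised to owners

        GlobalPool(int segments, int segmentSize) {
            for (int i = 0; i < segments; i++) {
                free.push(new byte[segmentSize]);
            }
        }

        /** Hands out the requested number of segments and records the reservation. */
        List<byte[]> reserve(int count) {
            List<byte[]> out = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                out.add(free.pop());
            }
            reserved += count;
            return out;
        }

        /** Step 1: returns a single segment; no book-keeping happens here. */
        void recycle(byte[] segment) {
            free.push(segment);
        }

        /** Step 2: called once per owner so the freed capacity can be redistributed. */
        void releaseReservation(int count) {
            reserved -= count;
        }

        int available() {
            return free.size();
        }

        int reserved() {
            return reserved;
        }
    }

    /** Hypothetical channel that owns a fixed number of exclusive segments. */
    static final class Channel {
        private final GlobalPool pool;
        private final List<byte[]> exclusive;

        Channel(GlobalPool pool, int exclusiveSegments) {
            this.pool = pool;
            this.exclusive = pool.reserve(exclusiveSegments);
        }

        void releaseAllResources() {
            int returned = exclusive.size();
            for (byte[] segment : exclusive) {
                pool.recycle(segment); // one by one, same path as ordinary buffers
            }
            exclusive.clear();
            pool.releaseReservation(returned); // single book-keeping update at the end
        }
    }

    public static void main(String[] args) {
        GlobalPool pool = new GlobalPool(4, 16);
        Channel channel = new Channel(pool, 2);
        channel.releaseAllResources();
        System.out.println(pool.available() + " free, " + pool.reserved() + " reserved");
    }
}
```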


> Manage exclusive buffers in RemoteInputChannel
> --
>
> Key: FLINK-7394
> URL: https://issues.apache.org/jira/browse/FLINK-7394
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is part of the work on credit-based network flow control. 
> The basic tasks are:
> * Exclusive buffers are assigned to {{RemoteInputChannel}} after it is created by 
> {{SingleInputGate}}.
> * {{RemoteInputChannel}} implements {{BufferRecycler}} interface to manage 
> the exclusive buffers itself.





[jira] [Commented] (FLINK-7394) Manage exclusive buffers in RemoteInputChannel

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179225#comment-16179225
 ] 

ASF GitHub Bot commented on FLINK-7394:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4499#discussion_r140811033
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -100,7 +122,16 @@ public RemoteInputChannel(
}
 
void assignExclusiveSegments(List segments) {
--- End diff --

works for me :)


> Manage exclusive buffers in RemoteInputChannel
> --
>
> Key: FLINK-7394
> URL: https://issues.apache.org/jira/browse/FLINK-7394
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control. 
> The basic works are:
> * Exclusive buffers are assigned to {{RemoteInputChannel}} after created by 
> {{SingleInputGate}}.
> * {{RemoteInputChannel}} implements {{BufferRecycler}} interface to manage 
> the exclusive buffers itself.





[GitHub] flink pull request #4722: [FLINK-7683] Iterate over keys in KeyedStateBacken...

2017-09-25 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4722#discussion_r140821446
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
 ---
@@ -287,6 +289,14 @@ public S get(K key, N namespace) {
}
 
@Override
+   public Stream getKeys(N namespace) {
+   return Arrays.stream(primaryTable)
--- End diff --

This hash table applies incremental rehashing. This method must also 
consider the data stored in `incrementalRehashTable` if it isn't `null`, or we 
can miss some data.
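
A minimal sketch of the concern, not the actual `CopyOnWriteStateTable` code. It assumes a simplified table with two bucket arrays of chained entries, where each entry lives in exactly one of the two arrays while a rehash is in flight. The `Entry` type, the `String` keys and namespaces, and the `chain` helper are hypothetical; only the names `primaryTable` and `incrementalRehashTable` come from the review comment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Simplified stand-in for a hash table that rehashes incrementally. */
final class RehashKeysSketch {

    /** Hypothetical chained entry; the real entry type carries more fields. */
    static final class Entry {
        final String key;
        final String namespace;
        final Entry next;

        Entry(String key, String namespace, Entry next) {
            this.key = key;
            this.namespace = namespace;
            this.next = next;
        }
    }

    Entry[] primaryTable = new Entry[0];

    /** Assumed to be non-null only while a rehash is in progress. */
    Entry[] incrementalRehashTable;

    /** Streams the keys of one namespace from both bucket arrays. */
    Stream<String> getKeys(String namespace) {
        Stream<Entry> heads = Arrays.stream(primaryTable);
        if (incrementalRehashTable != null) {
            // Without this branch, entries already moved to the new table are skipped.
            heads = Stream.concat(heads, Arrays.stream(incrementalRehashTable));
        }
        return heads
            .filter(Objects::nonNull)
            .flatMap(RehashKeysSketch::chain)
            .filter(entry -> entry.namespace.equals(namespace))
            .map(entry -> entry.key);
    }

    /** Walks the collision chain of a single bucket. */
    private static Stream<Entry> chain(Entry head) {
        List<Entry> entries = new ArrayList<>();
        for (Entry e = head; e != null; e = e.next) {
            entries.add(e);
        }
        return entries.stream();
    }

    public static void main(String[] args) {
        RehashKeysSketch table = new RehashKeysSketch();
        table.primaryTable = new Entry[] {new Entry("a", "ns", null), null};
        table.incrementalRehashTable = new Entry[] {null, new Entry("b", "ns", null)};
        List<String> keys = table.getKeys("ns").sorted().collect(Collectors.toList());
        System.out.println(keys);
    }
}
```

Concatenating the two streams keeps the method lazy; whether the real table can guarantee that an entry never appears in both arrays is something the PR would have to confirm.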


---


[jira] [Commented] (FLINK-7394) Manage exclusive buffers in RemoteInputChannel

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179226#comment-16179226
 ] 

ASF GitHub Bot commented on FLINK-7394:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4499#discussion_r140809272
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -70,6 +78,15 @@
 */
private int expectedSequenceNumber = 0;
 
+   /** The initial number of exclusive buffers assigned to this channel. */
+   private int initialCredit;
+
+   /** The current available buffers including both exclusive buffers and 
requested floating buffers. */
+   private final ArrayDeque availableBuffers = new ArrayDeque<>();
+
+   /** The number of available buffers that have not been unannounced to 
producer yet. */
--- End diff --

actually, there were some more typos here - the full comment should be 
something like this `The number of available buffers that have not been 
announced to the producer yet.`


> Manage exclusive buffers in RemoteInputChannel
> --
>
> Key: FLINK-7394
> URL: https://issues.apache.org/jira/browse/FLINK-7394
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control. 
> The basic works are:
> * Exclusive buffers are assigned to {{RemoteInputChannel}} after created by 
> {{SingleInputGate}}.
> * {{RemoteInputChannel}} implements {{BufferRecycler}} interface to manage 
> the exclusive buffers itself.





[GitHub] flink pull request #4499: [FLINK-7394][core] Manage exclusive buffers in Rem...

2017-09-25 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4499#discussion_r140811033
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -100,7 +122,16 @@ public RemoteInputChannel(
}
 
void assignExclusiveSegments(List segments) {
--- End diff --

works for me :)


---


[GitHub] flink pull request #4499: [FLINK-7394][core] Manage exclusive buffers in Rem...

2017-09-25 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4499#discussion_r140809272
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -70,6 +78,15 @@
 */
private int expectedSequenceNumber = 0;
 
+   /** The initial number of exclusive buffers assigned to this channel. */
+   private int initialCredit;
+
+   /** The current available buffers including both exclusive buffers and 
requested floating buffers. */
+   private final ArrayDeque availableBuffers = new ArrayDeque<>();
+
+   /** The number of available buffers that have not been unannounced to 
producer yet. */
--- End diff --

actually, there were some more typos here - the full comment should be 
something like this `The number of available buffers that have not been 
announced to the producer yet.`


---


[GitHub] flink pull request #4499: [FLINK-7394][core] Manage exclusive buffers in Rem...

2017-09-25 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4499#discussion_r140810853
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -99,8 +114,24 @@ public RemoteInputChannel(
this.connectionManager = checkNotNull(connectionManager);
}
 
+   /**
+* Assigns exclusive buffers to this input channel, and this method 
should be called only once
+* after this input channel is created.
+*/
void assignExclusiveSegments(List segments) {
-   // TODO in next PR
+   checkState(this.initialCredit == 0, "Bug in input channel setup 
logic: exclusive buffers have" +
+   "already been set for this input channel.");
--- End diff --

please add a space between `have` and `already` (between the concatenations)
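
To illustrate the point, here is a small standalone sketch (the class and constant names are made up for this example; only the message text is taken from the diff). Java joins adjacent string literals exactly as written, so the separating space has to be part of one of the literals.

```java
final class ConcatSpaceSketch {

    // As in the diff: the two halves run together at the line break.
    static final String MISSING_SPACE = "exclusive buffers have" +
        "already been set for this input channel.";

    // With a trailing space in the first literal the message reads correctly.
    static final String WITH_SPACE = "exclusive buffers have " +
        "already been set for this input channel.";

    public static void main(String[] args) {
        System.out.println(MISSING_SPACE);
        System.out.println(WITH_SPACE);
    }
}
```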


---


[GitHub] flink pull request #4499: [FLINK-7394][core] Manage exclusive buffers in Rem...

2017-09-25 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4499#discussion_r140821090
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -183,18 +214,40 @@ void notifySubpartitionConsumed() {
}
 
/**
-* Releases all received buffers and closes the partition request 
client.
+* Releases all received and available buffers, closes the partition 
request client.
 */
@Override
void releaseAllResources() throws IOException {
if (isReleased.compareAndSet(false, true)) {
+
+   final List recyclingSegments = new 
ArrayList<>();
+
synchronized (receivedBuffers) {
Buffer buffer;
while ((buffer = receivedBuffers.poll()) != 
null) {
-   buffer.recycle();
+   if (buffer.getRecycler() == this) {
+   
recyclingSegments.add(buffer.getMemorySegment());
--- End diff --

1) I think performance is not much of an issue here, as this is only 
called during tear-down of a connection and the overhead of `Buffer#recycle` is 
actually not that high.
2) Sorry, but I don't get your concern. Why would you need an extra check 
when using `buffer.recycle()` instead of 
`exclusiveRecyclingSegments.add(buffer.getMemorySegment())`? There shouldn't be 
anything special about the exclusive buffers in this regard compared to ordinary 
buffers (which is the beauty of the design).

Let me give the example of how `LocalBufferPool` handles this inside 
`lazyDestroy`: it returns every memory segment (one by one) with 
`networkBufferPool.recycle()` and, at the end, it calls 
`networkBufferPool.destroyBufferPool()` so that the book-keeping inside the 
`NetworkBufferPool` is updated and buffers may be re-distributed to other 
`LocalBufferPool` instances.
We could do this similarly here: recycle one by one, then call some method 
to update book-keeping and balancing inside `NetworkBufferPool`.
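
The "recycle one by one, then update book-keeping" pattern could be sketched roughly as follows. This is a self-contained illustration under stated assumptions, not Flink code: `GlobalPool`, `Channel`, `ownerDestroyed` and the `byte[]` segments are hypothetical stand-ins for `NetworkBufferPool`, `RemoteInputChannel`, the suggested book-keeping method and `MemorySegment`.

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class RecycleSketch {

    /** Hypothetical stand-in for the global pool that owns all segments. */
    static final class GlobalPool {
        final Queue<byte[]> freeSegments = new ArrayDeque<>();
        int registeredOwners;

        /** Takes back a single segment. */
        void recycle(byte[] segment) {
            freeSegments.add(segment);
        }

        /** Book-keeping hook; a real pool might also redistribute buffers here. */
        void ownerDestroyed() {
            registeredOwners--;
        }
    }

    /** Hypothetical stand-in for a channel that holds received buffers. */
    static final class Channel {
        private final GlobalPool pool;
        final Queue<byte[]> received = new ArrayDeque<>();

        Channel(GlobalPool pool) {
            this.pool = pool;
            pool.registeredOwners++;
        }

        void releaseAllResources() {
            // Step 1: hand every segment back individually, with no special casing.
            byte[] segment;
            while ((segment = received.poll()) != null) {
                pool.recycle(segment);
            }
            // Step 2: a single call at the end updates the pool's book-keeping.
            pool.ownerDestroyed();
        }
    }

    public static void main(String[] args) {
        GlobalPool pool = new GlobalPool();
        Channel channel = new Channel(pool);
        channel.received.add(new byte[8]);
        channel.received.add(new byte[8]);
        channel.releaseAllResources();
        System.out.println(pool.freeSegments.size() + " segments returned");
    }
}
```

The release path stays a plain loop because every buffer goes through the same recycle call; the cost is one call per segment, which the comment above argues is acceptable during tear-down.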


---


[jira] [Closed] (FLINK-7619) Improve abstraction in AbstractAsyncIOCallable to better fit

2017-09-25 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-7619.
-
   Resolution: Fixed
Fix Version/s: 1.4.0

Merged in 5af463a9c0.

> Improve abstraction in AbstractAsyncIOCallable to better fit
> 
>
> Key: FLINK-7619
> URL: https://issues.apache.org/jira/browse/FLINK-7619
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Minor
> Fix For: 1.4.0
>
>
> The abstraction of AbstractAsyncIOCallable does not fit too well with today's 
> needs in checkpointing backends. Originally, backends were assumed to only 
> open one stream that is managed by the abstraction. In fact, concrete 
> implementations always extended that in practice. We can redo this in a way 
> that more resources can be managed by the abstraction.





[jira] [Commented] (FLINK-7619) Improve abstraction in AbstractAsyncIOCallable to better fit

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179200#comment-16179200
 ] 

ASF GitHub Bot commented on FLINK-7619:
---

Github user StefanRRichter closed the pull request at:

https://github.com/apache/flink/pull/4671


> Improve abstraction in AbstractAsyncIOCallable to better fit
> 
>
> Key: FLINK-7619
> URL: https://issues.apache.org/jira/browse/FLINK-7619
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Minor
>
> The abstraction of AbstractAsyncIOCallable does not fit too well with today's 
> needs in checkpointing backends. Originally, backends were assumed to only 
> open one stream that is managed by the abstraction. In fact, concrete 
> implementations always extended that in practice. We can redo this in a way 
> that more resources can be managed by the abstraction.





[jira] [Commented] (FLINK-7619) Improve abstraction in AbstractAsyncIOCallable to better fit

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179199#comment-16179199
 ] 

ASF GitHub Bot commented on FLINK-7619:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4671
  
Merged in 5af463a9c0.


> Improve abstraction in AbstractAsyncIOCallable to better fit
> 
>
> Key: FLINK-7619
> URL: https://issues.apache.org/jira/browse/FLINK-7619
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Minor
>
> The abstraction of AbstractAsyncIOCallable does not fit too well with today's 
> needs in checkpointing backends. Originally, backends were assumed to only 
> open one stream that is managed by the abstraction. In fact, concrete 
> implementations always extended that in practice. We can redo this in a way 
> that more resources can be managed by the abstraction.





[GitHub] flink pull request #4671: [FLINK-7619] Improve abstraction in AbstractAsyncI...

2017-09-25 Thread StefanRRichter
Github user StefanRRichter closed the pull request at:

https://github.com/apache/flink/pull/4671


---


[GitHub] flink issue #4671: [FLINK-7619] Improve abstraction in AbstractAsyncIOCallab...

2017-09-25 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4671
  
Merged in 5af463a9c0.


---


[jira] [Closed] (FLINK-7524) Task "xxx" did not react to cancelling signal, but is stuck in method

2017-09-25 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-7524.
-
Assignee: Stefan Richter

I still found some small potential for improvement that is merged in 0073204b25.

> Task "xxx" did not react to cancelling signal, but is stuck in method
> -
>
> Key: FLINK-7524
> URL: https://issues.apache.org/jira/browse/FLINK-7524
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Stefan Richter
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Hi,
> I observed the following errors in taskmanager.log 
> {code:java}
> 2017-08-25 17:03:40,141 WARN  org.apache.flink.runtime.taskmanager.Task   
>   - Task 'TriggerWindow(SlidingEventTimeWindows(25920, 
> 360), 
> AggregatingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@f65b6aa2,
>  aggFunction=com.streamingApp$MyAggregateFunction@1f193686}, 
> EventTimeTrigger(), WindowedStream.aggregate(WindowedStream.java:858)) -> 
> Sink: prod_item (2/6)' did not react to cancelling signal, but is stuck in 
> method:
>  
> org.apache.flink.util.AbstractCloseableRegistry.unregisterClosable(AbstractCloseableRegistry.java:84)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStream(StateSnapshotContextSynchronousImpl.java:137)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.close(StateSnapshotContextSynchronousImpl.java:147)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> java.lang.Thread.run(Thread.java:748)
> ...
> 2017-08-25 17:05:10,139 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Notifying TaskManager about fatal error. Task 
> 'TriggerWindow(SlidingEventTimeWindows(25920, 360), 
> AggregatingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@f65b6aa2,
>  aggFunction=com.streamingApp$MyAggregateFunction@1f193686}, 
> EventTimeTrigger(), WindowedStream.aggregate(WindowedStream.java:858)) -> 
> Sink: prod_item (2/6)' did not react to cancelling signal in the last 30 
> seconds, but is stuck in method:
>  
> org.apache.flink.util.AbstractCloseableRegistry.unregisterClosable(AbstractCloseableRegistry.java:84)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStream(StateSnapshotContextSynchronousImpl.java:137)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.close(StateSnapshotContextSynchronousImpl.java:147)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
> 

[jira] [Commented] (FLINK-7524) Task "xxx" did not react to cancelling signal, but is stuck in method

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179194#comment-16179194
 ] 

ASF GitHub Bot commented on FLINK-7524:
---

Github user StefanRRichter closed the pull request at:

https://github.com/apache/flink/pull/4639


> Task "xxx" did not react to cancelling signal, but is stuck in method
> -
>
> Key: FLINK-7524
> URL: https://issues.apache.org/jira/browse/FLINK-7524
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Hi,
> I observed the following errors in taskmanager.log 
> {code:java}
> 2017-08-25 17:03:40,141 WARN  org.apache.flink.runtime.taskmanager.Task   
>   - Task 'TriggerWindow(SlidingEventTimeWindows(25920, 
> 360), 
> AggregatingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@f65b6aa2,
>  aggFunction=com.streamingApp$MyAggregateFunction@1f193686}, 
> EventTimeTrigger(), WindowedStream.aggregate(WindowedStream.java:858)) -> 
> Sink: prod_item (2/6)' did not react to cancelling signal, but is stuck in 
> method:
>  
> org.apache.flink.util.AbstractCloseableRegistry.unregisterClosable(AbstractCloseableRegistry.java:84)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStream(StateSnapshotContextSynchronousImpl.java:137)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.close(StateSnapshotContextSynchronousImpl.java:147)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> java.lang.Thread.run(Thread.java:748)
> ...
> 2017-08-25 17:05:10,139 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Notifying TaskManager about fatal error. Task 
> 'TriggerWindow(SlidingEventTimeWindows(25920, 360), 
> AggregatingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@f65b6aa2,
>  aggFunction=com.streamingApp$MyAggregateFunction@1f193686}, 
> EventTimeTrigger(), WindowedStream.aggregate(WindowedStream.java:858)) -> 
> Sink: prod_item (2/6)' did not react to cancelling signal in the last 30 
> seconds, but is stuck in method:
>  
> org.apache.flink.util.AbstractCloseableRegistry.unregisterClosable(AbstractCloseableRegistry.java:84)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStream(StateSnapshotContextSynchronousImpl.java:137)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.close(StateSnapshotContextSynchronousImpl.java:147)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
> 

[GitHub] flink pull request #4639: [FLINK-7524] Remove potentially blocking behaviour...

2017-09-25 Thread StefanRRichter
Github user StefanRRichter closed the pull request at:

https://github.com/apache/flink/pull/4639


---


[jira] [Commented] (FLINK-7684) Avoid multiple data copies in MergingWindowSet

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179162#comment-16179162
 ] 

ASF GitHub Bot commented on FLINK-7684:
---

GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/4723

[FLINK-7684] Avoid data copies in MergingWindowSet

## What is the purpose of the change

Previously, MergingWindowSet used a ListState of tuples to persist its 
mapping. This is inefficient because this ListState of tuples must be converted 
to a HashMap on each access.

Furthermore, in some cases it might be inefficient to check whether the 
mapping has changed before saving it to state.

Fixing those two issues improves session window 
[benchmarks](https://github.com/dataArtisans/flink-benchmarks) results by 
10-20%.

The first commit comes from a different PR, #4722.

## Verifying this change

This change is already covered by existing tests, such as *(please describe 
tests)*.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: **yes** (it changes how WindowOperator is being 
serialized)
  - The runtime per-record code paths (performance sensitive): **yes**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? JavaDocs



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink window

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4723.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4723


commit ac41b26cdb5c2341bb25e520c4d449ec4e956a8f
Author: Piotr Nowojski 
Date:   2017-09-14T10:39:30Z

[FLINK-7683] Iterate over keys in KeyedStateBackend

commit 20ed7f51399752d620696d9502b61233d47f6bcb
Author: Piotr Nowojski 
Date:   2017-09-25T12:18:52Z

[FLINK-7684] Add OptimizationTarget to the ExecutionConfig

commit c5505e1252ea5b70cff9cf76ff89d7dc52f45057
Author: Piotr Nowojski 
Date:   2017-07-06T11:56:13Z

[FLINK-7684] Serialize MergingWindowSet to ValueState>

This avoids an unnecessary data copy

commit 00c044c30e95087b65abe65da67a77841a3c7740
Author: Piotr Nowojski 
Date:   2017-09-25T13:28:01Z

[FLINK-7684] Add always persist flag to MegringWindowSet




> Avoid multiple data copies in MergingWindowSet
> --
>
> Key: FLINK-7684
> URL: https://issues.apache.org/jira/browse/FLINK-7684
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1, 1.3.2
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> Currently MergingWindowSet uses a ListState of tuples to persist its mapping. 
> This is inefficient because this ListState of tuples must be converted to a 
> HashMap on each access.
> Furthermore, in some cases it might be inefficient to check whether the mapping 
> has changed before saving it to state.
> Those two issues cause multiple data copies and the construction of multiple 
> Lists/Maps for each processed element, which is a reason for noticeable 
> performance issues.





[GitHub] flink pull request #4723: [FLINK-7684] Avoid data copies in MergingWindowSet

2017-09-25 Thread pnowojski
GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/4723

[FLINK-7684] Avoid data copies in MergingWindowSet

## What is the purpose of the change

Previously, MergingWindowSet used a ListState of tuples to persist its 
mapping. This is inefficient because this ListState of tuples must be converted 
to a HashMap on each access.

Furthermore, in some cases it might be inefficient to check whether the 
mapping has changed before saving it to state.

Fixing those two issues improves session window 
[benchmarks](https://github.com/dataArtisans/flink-benchmarks) results by 
10-20%.
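
The cost described above can be pictured with plain collections. This is only a sketch under assumptions: it uses `String` pairs and `java.util` types instead of Flink's `ListState`/`ValueState` and window types, and all method names are hypothetical. In particular, the write path shown for the old layout is an assumption made to mirror the read path.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class MappingStateSketch {

    /** List-of-pairs layout: every read rebuilds a HashMap from the stored pairs. */
    static Map<String, String> readFromPairs(List<Map.Entry<String, String>> storedPairs) {
        Map<String, String> mapping = new HashMap<>();
        for (Map.Entry<String, String> pair : storedPairs) {
            mapping.put(pair.getKey(), pair.getValue());
        }
        return mapping;
    }

    /** List-of-pairs layout (assumed write path): the map is flattened again. */
    static List<Map.Entry<String, String>> writeAsPairs(Map<String, String> mapping) {
        List<Map.Entry<String, String>> pairs = new ArrayList<>();
        for (Map.Entry<String, String> e : mapping.entrySet()) {
            pairs.add(new SimpleEntry<>(e.getKey(), e.getValue()));
        }
        return pairs;
    }

    /** Single-value layout: the map itself is stored, so a read needs no conversion. */
    static Map<String, String> readFromValue(Map<String, String> storedMap) {
        return storedMap;
    }

    public static void main(String[] args) {
        Map<String, String> mapping = new HashMap<>();
        mapping.put("window-1", "state-window-1");
        // One extra list and one extra map are built for a single round trip.
        Map<String, String> viaPairs = readFromPairs(writeAsPairs(mapping));
        Map<String, String> viaValue = readFromValue(mapping);
        System.out.println(viaPairs.equals(viaValue));
    }
}
```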

The first commit comes from a different PR, #4722.

## Verifying this change

This change is already covered by existing tests, such as *(please describe 
tests)*.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: **yes** (it changes how WindowOperator is being 
serialized)
  - The runtime per-record code paths (performance sensitive): **yes**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? JavaDocs



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink window

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4723.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4723


commit ac41b26cdb5c2341bb25e520c4d449ec4e956a8f
Author: Piotr Nowojski 
Date:   2017-09-14T10:39:30Z

[FLINK-7683] Iterate over keys in KeyedStateBackend

commit 20ed7f51399752d620696d9502b61233d47f6bcb
Author: Piotr Nowojski 
Date:   2017-09-25T12:18:52Z

[FLINK-7684] Add OptimizationTarget to the ExecutionConfig

commit c5505e1252ea5b70cff9cf76ff89d7dc52f45057
Author: Piotr Nowojski 
Date:   2017-07-06T11:56:13Z

[FLINK-7684] Serialize MergingWindowSet to ValueState>

This avoids an unnecessary data copy

commit 00c044c30e95087b65abe65da67a77841a3c7740
Author: Piotr Nowojski 
Date:   2017-09-25T13:28:01Z

[FLINK-7684] Add always persist flag to MegringWindowSet




---


[jira] [Updated] (FLINK-7683) Add method to iterate over all of the existing keys in a statebackend

2017-09-25 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-7683:
--
Description: This is required to make it possible to preserve backward 
compatibility while changing the state definition of a keyed state operator (to do 
so, the operator must iterate over all of the existing keys and rewrite them into a 
new state variable).  (was: This is required to make possible preserving 
backward compatibility while changing state definition of a keyed state 
operator.)
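
A rough sketch of the migration this description has in mind, using plain maps instead of a real keyed state backend. Everything here is an assumption for illustration: the class, the `getKeys` and `migrate` methods, and the choice of an old layout (a list of integers per key) and a new layout (their sum per key).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

final class KeyMigrationSketch {

    /** Hypothetical old state layout: a list of values per key. */
    final Map<String, List<Integer>> oldState = new HashMap<>();

    /** Hypothetical new state layout: one aggregated value per key. */
    final Map<String, Integer> newState = new HashMap<>();

    /** Streams a snapshot of the existing keys so the old state can be modified meanwhile. */
    Stream<String> getKeys() {
        return new ArrayList<>(oldState.keySet()).stream();
    }

    /** Rewrites every existing key from the old layout into the new one. */
    void migrate() {
        getKeys().forEach(key -> {
            List<Integer> oldValues = oldState.remove(key);
            int sum = 0;
            for (int value : oldValues) {
                sum += value;
            }
            newState.put(key, sum);
        });
    }

    public static void main(String[] args) {
        KeyMigrationSketch operator = new KeyMigrationSketch();
        operator.oldState.put("user-1", Arrays.asList(1, 2, 3));
        operator.oldState.put("user-2", Arrays.asList(10));
        operator.migrate();
        System.out.println(operator.newState);
    }
}
```

Without a way to enumerate keys, the operator could only migrate a key when a record for that key happens to arrive, which is why the key iteration is needed first.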

> Add method to iterate over all of the existing keys in a statebackend
> -
>
> Key: FLINK-7683
> URL: https://issues.apache.org/jira/browse/FLINK-7683
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> This is required to make it possible to preserve backward compatibility while 
> changing the state definition of a keyed state operator (to do so, the operator must 
> iterate over all of the existing keys and rewrite them into a new state 
> variable).





[jira] [Commented] (FLINK-7683) Add method to iterate over all of the existing keys in a statebackend

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179116#comment-16179116
 ] 

ASF GitHub Bot commented on FLINK-7683:
---

GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/4722

[FLINK-7683] Iterate over keys in KeyedStateBackend

## What is the purpose of the change

This is required to make it possible to preserve backward compatibility while 
changing the state definition of a keyed state operator.

## Verifying this change

This change adds new unit tests to cover the new feature.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? JavaDocs



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink keys

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4722.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4722


commit ac41b26cdb5c2341bb25e520c4d449ec4e956a8f
Author: Piotr Nowojski 
Date:   2017-09-14T10:39:30Z

[FLINK-7683] Iterate over keys in KeyedStateBackend




> Add method to iterate over all of the existing keys in a statebackend
> -
>
> Key: FLINK-7683
> URL: https://issues.apache.org/jira/browse/FLINK-7683
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> This is required to make it possible to preserve backward compatibility while 
> changing the state definition of a keyed state operator.





[GitHub] flink pull request #4722: [FLINK-7683] Iterate over keys in KeyedStateBacken...

2017-09-25 Thread pnowojski
GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/4722

[FLINK-7683] Iterate over keys in KeyedStateBackend

## What is the purpose of the change

This is required to make possible preserving backward compatibility while 
changing state definition of a keyed state operator.
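
A minimal sketch of why key iteration helps with such a migration, assuming a toy in-memory backend rather than Flink's actual `KeyedStateBackend`. The class `ToyKeyedBackend`, its `getKeys` method, and the state names `count-v1` / `count-v2` are hypothetical and only illustrate the idea: once all existing keys can be enumerated, each value can be rewritten under a new state definition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class KeyIterationSketch {

    /** Toy stand-in for a keyed state backend: state name -> (key -> value). */
    public static final class ToyKeyedBackend<K> {
        private final Map<String, Map<K, Object>> states = new HashMap<>();

        public void put(String stateName, K key, Object value) {
            states.computeIfAbsent(stateName, n -> new LinkedHashMap<>()).put(key, value);
        }

        public Object get(String stateName, K key) {
            Map<K, Object> perKey = states.get(stateName);
            return perKey == null ? null : perKey.get(key);
        }

        public void remove(String stateName, K key) {
            Map<K, Object> perKey = states.get(stateName);
            if (perKey != null) {
                perKey.remove(key);
            }
        }

        /** Hypothetical key iteration: every key that currently holds a value for the state. */
        public Stream<K> getKeys(String stateName) {
            Map<K, Object> perKey = states.get(stateName);
            if (perKey == null) {
                return Stream.empty();
            }
            // Copy the keys so callers may modify the backend while consuming the stream.
            return new ArrayList<K>(perKey.keySet()).stream();
        }
    }

    /** Migrates Integer counters stored as "count-v1" to Long counters stored as "count-v2". */
    public static void migrate(ToyKeyedBackend<String> backend) {
        backend.getKeys("count-v1").forEach(key -> {
            Integer oldValue = (Integer) backend.get("count-v1", key);
            backend.put("count-v2", key, oldValue.longValue());
            backend.remove("count-v1", key);
        });
    }

    public static void main(String[] args) {
        ToyKeyedBackend<String> backend = new ToyKeyedBackend<>();
        backend.put("count-v1", "a", 1);
        backend.put("count-v1", "b", 2);
        migrate(backend);
        System.out.println(backend.getKeys("count-v2").collect(Collectors.toList()));
    }
}
```

Without a way to enumerate keys, the old state could only be rewritten lazily, one key at a time as records for that key arrive.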

## Verifying this change

This change adds new unit tests to cover the new feature.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? JavaDocs



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink keys

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4722.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4722


commit ac41b26cdb5c2341bb25e520c4d449ec4e956a8f
Author: Piotr Nowojski 
Date:   2017-09-14T10:39:30Z

[FLINK-7683] Iterate over keys in KeyedStateBackend




---


[jira] [Updated] (FLINK-7683) Add method to iterate over all of the existing keys in a statebackend

2017-09-25 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-7683:
--
Description: This is required to make possible preserving backward 
compatibility while changing state definition of a keyed state operator.  (was: 
This is required to make possible backward compatibility while changing state 
definition of a keyed state operator.)

> Add method to iterate over all of the existing keys in a statebackend
> -
>
> Key: FLINK-7683
> URL: https://issues.apache.org/jira/browse/FLINK-7683
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> This is required to make possible preserving backward compatibility while 
> changing state definition of a keyed state operator.





[jira] [Commented] (FLINK-7606) CEP operator leaks state

2017-09-25 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179089#comment-16179089
 ] 

Kostas Kloudas commented on FLINK-7606:
---

Perfect! Thanks a lot!

> CEP operator leaks state
> 
>
> Key: FLINK-7606
> URL: https://issues.apache.org/jira/browse/FLINK-7606
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Matteo Ferrario
> Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png
>
>
> The NestedMapsStateTable grows continuously without freeing heap memory.
> We created a simple job that processes a stream of messages and uses CEP to 
> generate an outcome message when a specific pattern is identified.
> The messages coming from the stream are grouped by a key defined in a 
> specific field of the message.
> We've also added the "within" clause (set as 5 minutes), indicating that two 
> incoming messages match the pattern only if they come in a certain time 
> window.
> What we've seen is that for every key present in the message, an NFA object 
> is instantiated in the NestedMapsStateTable and it is never deallocated.
> The "within" clause didn't help either: we've seen that if we send messages 
> that don't match the pattern, memory usage grows (I suppose that the state 
> of the NFA is updated), but it is not cleaned up even after the 5-minute time 
> window defined in the "within" clause.
> If you need, I can provide more details about the job we've implemented and 
> also the screenshots about the memory leak.
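
The cleanup the reporter expects can be sketched as follows. This is a toy illustration only, not Flink's CEP or NFA implementation: the class `WithinCleanupSketch` and its methods are hypothetical, and it assumes events for a key arrive in timestamp order. Partial matches older than the 5-minute window are dropped, and a key with no remaining partial matches is removed from the table entirely.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class WithinCleanupSketch {

    /** The 5-minute "within" window from the report, in milliseconds. */
    public static final long WINDOW_MS = 5 * 60 * 1000L;

    /** key -> start timestamps of partial matches that are still inside the window. */
    private final Map<String, Deque<Long>> partialMatches = new HashMap<>();

    /** Records the start of a partial match for the given key. */
    public void onEvent(String key, long timestampMs) {
        partialMatches.computeIfAbsent(key, k -> new ArrayDeque<>()).addLast(timestampMs);
    }

    /** Drops partial matches older than the window and removes keys left with none. */
    public void advanceTime(long nowMs) {
        Iterator<Map.Entry<String, Deque<Long>>> it = partialMatches.entrySet().iterator();
        while (it.hasNext()) {
            Deque<Long> starts = it.next().getValue();
            while (!starts.isEmpty() && nowMs - starts.peekFirst() > WINDOW_MS) {
                starts.removeFirst();
            }
            if (starts.isEmpty()) {
                it.remove(); // free the per-key entry instead of keeping an empty one
            }
        }
    }

    /** Number of keys that still hold state. */
    public int trackedKeys() {
        return partialMatches.size();
    }
}
```

In this sketch the number of tracked keys returns to zero once every partial match has timed out, which is the behavior the report says is missing.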





[jira] [Created] (FLINK-7685) CompilerException: "Bug: Logic for branching plans (non-tree plans) has an error, and does not track the re-joining of branches correctly"

2017-09-25 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-7685:
--

 Summary: CompilerException: "Bug: Logic for branching plans 
(non-tree plans) has an error, and does not track the re-joining of branches 
correctly"
 Key: FLINK-7685
 URL: https://issues.apache.org/jira/browse/FLINK-7685
 Project: Flink
  Issue Type: Bug
  Components: Optimizer
Reporter: Gabor Gevay
Priority: Minor


A Flink program which reads an input DataSet, creates 64 new DataSets from it, 
and writes these to separate files throws the following exception:

{code:java}
Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: Logic for branching plans (non-tree plans) has an error, and does not track the re-joining of branches correctly.
    at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:491)
    at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
    at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:187)
    at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:921)
    at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:86)
{code}

Here is some code that reproduces it:
https://github.com/ggevay/flink/tree/compiler-exception-new

Note that it works with fewer than 64 DataSets.

Also note that with more than 64 DataSets it throws {{CompilerException: Cannot 
currently handle nodes with more than 64 outputs}}, which is at least a clear 
error message that helps the user find a workaround.





[jira] [Updated] (FLINK-7684) Avoid multiple data copies in MergingWindowSet

2017-09-25 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-7684:
--
Issue Type: Improvement  (was: New Feature)

> Avoid multiple data copies in MergingWindowSet
> --
>
> Key: FLINK-7684
> URL: https://issues.apache.org/jira/browse/FLINK-7684
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1, 1.3.2
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> Currently MergingWindowSet uses a ListState of tuples to persist its mapping. 
> This is inefficient because this ListState of tuples must be converted to a 
> HashMap on each access.
> Furthermore, for some cases it might be inefficient to check whether mapping 
> has changed before saving it on state.
> Those two issues are causing multiple data copies and constructing multiple 
> Lists/Maps per each processed element, which is a reason for noticeable 
> performance issues.





[jira] [Assigned] (FLINK-7684) Avoid multiple data copies in MergingWindowSet

2017-09-25 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-7684:
-

 Assignee: Piotr Nowojski
Affects Version/s: 1.2.0
   1.3.0
   1.2.1
   1.3.1
   1.3.2
  Component/s: Streaming

> Avoid multiple data copies in MergingWindowSet
> --
>
> Key: FLINK-7684
> URL: https://issues.apache.org/jira/browse/FLINK-7684
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1, 1.3.2
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> Currently MergingWindowSet uses a ListState of tuples to persist its mapping. 
> This is inefficient because this ListState of tuples must be converted to a 
> HashMap on each access.
> Furthermore, for some cases it might be inefficient to check whether mapping 
> has changed before saving it on state.
> Those two issues are causing multiple data copies and constructing multiple 
> Lists/Maps per each processed element, which is a reason for noticeable 
> performance issues.





[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179072#comment-16179072
 ] 

ASF GitHub Bot commented on FLINK-7446:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/4710
  
@haohui We only cast ROWTIME / PROCTIME directly to LONG at runtime; 
the special types are needed during the pre-flight phase and validation. We could 
not come up with a better solution that ensures that watermarks stay aligned 
with the rowtime.


> Support to define an existing field as the rowtime field for TableSource
> 
>
> Key: FLINK-7446
> URL: https://issues.apache.org/jira/browse/FLINK-7446
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Fabian Hueske
>
> Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field 
> for a {{TableSource}}. It would be helpful to also support defining an 
> existing field as the rowtime field. Just as when registering a DataStream, the 
> rowtime field could either be appended or replace an existing field.





[jira] [Created] (FLINK-7684) Avoid multiple data copies in MergingWindowSet

2017-09-25 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-7684:
-

 Summary: Avoid multiple data copies in MergingWindowSet
 Key: FLINK-7684
 URL: https://issues.apache.org/jira/browse/FLINK-7684
 Project: Flink
  Issue Type: New Feature
Reporter: Piotr Nowojski


Currently MergingWindowSet uses a ListState of tuples to persist its mapping. 
This is inefficient because this ListState of tuples must be converted to a 
HashMap on each access.

Furthermore, for some cases it might be inefficient to check whether mapping 
has changed before saving it on state.

Those two issues are causing multiple data copies and constructing multiple 
Lists/Maps per each processed element, which is a reason for noticeable 
performance issues.
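
The access pattern described above can be sketched with plain Java collections. This is a toy illustration, not the actual `MergingWindowSet` code: `ToyListState`, `load`, `persistIfChanged`, and `processElement` are hypothetical names, and windows are reduced to `long` identifiers. It shows where the copies come from: the list is rebuilt into a `HashMap` on every access, a snapshot is kept for the change check, and the map is flattened back into a list on write.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MappingCopySketch {

    /** Toy stand-in for a list-style state holding (window, stateWindow) pairs. */
    public static final class ToyListState {
        private List<long[]> entries = new ArrayList<>();
        public int writes = 0;

        public List<long[]> get() {
            return entries;
        }

        public void update(List<long[]> newEntries) {
            entries = newEntries;
            writes++;
        }
    }

    /** Copy 1: every access rebuilds a HashMap from the persisted list. */
    public static Map<Long, Long> load(ToyListState state) {
        Map<Long, Long> mapping = new HashMap<>();
        for (long[] pair : state.get()) {
            mapping.put(pair[0], pair[1]);
        }
        return mapping;
    }

    /** Copy 3: the map is flattened into a new list, but only if it changed. */
    public static void persistIfChanged(
            ToyListState state, Map<Long, Long> initial, Map<Long, Long> current) {
        if (current.equals(initial)) {
            return; // the comparison itself has to walk the mapping
        }
        List<long[]> flattened = new ArrayList<>();
        for (Map.Entry<Long, Long> entry : current.entrySet()) {
            flattened.add(new long[] {entry.getKey(), entry.getValue()});
        }
        state.update(flattened);
    }

    /** Handles one element: load, snapshot for the change check, mutate, persist. */
    public static void processElement(ToyListState state, long window, long stateWindow) {
        Map<Long, Long> mapping = load(state);
        Map<Long, Long> initial = new HashMap<>(mapping); // copy 2: snapshot for comparison
        mapping.put(window, stateWindow);
        persistIfChanged(state, initial, mapping);
    }

    public static void main(String[] args) {
        ToyListState state = new ToyListState();
        processElement(state, 1L, 1L);
        processElement(state, 2L, 1L);
        processElement(state, 2L, 1L); // unchanged mapping: no write, but still two copies
        System.out.println("entries=" + state.get().size() + " writes=" + state.writes);
    }
}
```

Even the third call, which changes nothing, still pays for the map rebuild and the snapshot, which matches the per-element overhead the issue describes.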





[GitHub] flink issue #4710: [FLINK-7446] [table] Change DefinedRowtimeAttribute to wo...

2017-09-25 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/4710
  
@haohui We only cast ROWTIME / PROCTIME directly to LONG at runtime; 
the special types are needed during the pre-flight phase and validation. We could 
not come up with a better solution that ensures that watermarks stay aligned 
with the rowtime.


---


[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179063#comment-16179063
 ] 

ASF GitHub Bot commented on FLINK-7446:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4710#discussion_r140774888
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -255,11 +255,30 @@ abstract class CodeGenerator(
 generateRowtimeAccess()
   case TimeIndicatorTypeInfo.PROCTIME_MARKER =>
 // attribute is proctime indicator.
-// We use a null literal and generate a timestamp when we need it.
+// we use a null literal and generate a timestamp when we need it.
 generateNullLiteral(TimeIndicatorTypeInfo.PROCTIME_INDICATOR)
   case idx =>
-// regular attribute. Access attribute in input data type.
-generateInputAccess(input1, input1Term, idx)
+// get type of result field
+val outIdx = input1Mapping.indexOf(idx)
+val outType = returnType match {
+  case pt: PojoTypeInfo[_] => pt.getTypeAt(resultFieldNames(outIdx))
+  case ct: CompositeType[_] => ct.getTypeAt(outIdx)
+  case t: TypeInformation[_] => t
+}
+val inputAccess = generateInputAccess(input1, input1Term, idx)
+// Change output type to rowtime indicator
+if (FlinkTypeFactory.isRowtimeIndicatorType(outType) &&
+  (inputAccess.resultType == Types.LONG || inputAccess.resultType == Types.SQL_TIMESTAMP)) {
+  // Hard cast possible because LONG, TIMESTAMP, and ROW_TIMEINDICATOR are internally
--- End diff --

`ROWTIME_INDICATOR`


> Support to define an existing field as the rowtime field for TableSource
> 
>
> Key: FLINK-7446
> URL: https://issues.apache.org/jira/browse/FLINK-7446
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Fabian Hueske
>
> Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field 
> for a {{TableSource}}. It would be helpful to also support defining an 
> existing field as the rowtime field. Just as when registering a DataStream, the 
> rowtime field could either be appended or replace an existing field.





[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179065#comment-16179065
 ] 

ASF GitHub Bot commented on FLINK-7446:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4710#discussion_r140778116
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
 ---
@@ -30,10 +30,16 @@ package org.apache.flink.table.sources
 trait DefinedRowtimeAttribute {
 
   /**
-* Defines a name of the event-time attribute that represents Flink's
-* event-time. Null if no rowtime should be available.
+* Defines a name of the event-time attribute that represents Flink's event-time, i.e., an
+* attribute that is aligned with the watermarks of the table.
--- End diff --

`that is aligned with the watermarks of the underlying DataStream`?


> Support to define an existing field as the rowtime field for TableSource
> 
>
> Key: FLINK-7446
> URL: https://issues.apache.org/jira/browse/FLINK-7446
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Fabian Hueske
>
> Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field 
> for a {{TableSource}}. It would be helpful to also support defining an 
> existing field as the rowtime field. Just as when registering a DataStream, the 
> rowtime field could either be appended or replace an existing field.





[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179070#comment-16179070
 ] 

ASF GitHub Bot commented on FLINK-7446:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4710#discussion_r140778313
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
 ---
@@ -30,10 +30,16 @@ package org.apache.flink.table.sources
 trait DefinedRowtimeAttribute {
 
   /**
-* Defines a name of the event-time attribute that represents Flink's
--- End diff --

Maybe also update the docs of the class?


> Support to define an existing field as the rowtime field for TableSource
> 
>
> Key: FLINK-7446
> URL: https://issues.apache.org/jira/browse/FLINK-7446
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Fabian Hueske
>
> Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field 
> for a {{TableSource}}. It would be helpful to also support defining an 
> existing field as the rowtime field. Just as when registering a DataStream, the 
> rowtime field could either be appended or replace an existing field.





[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179071#comment-16179071
 ] 

ASF GitHub Bot commented on FLINK-7446:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4710#discussion_r140774786
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -255,11 +255,30 @@ abstract class CodeGenerator(
 generateRowtimeAccess()
   case TimeIndicatorTypeInfo.PROCTIME_MARKER =>
 // attribute is proctime indicator.
-// We use a null literal and generate a timestamp when we need it.
+// we use a null literal and generate a timestamp when we need it.
 generateNullLiteral(TimeIndicatorTypeInfo.PROCTIME_INDICATOR)
   case idx =>
-// regular attribute. Access attribute in input data type.
-generateInputAccess(input1, input1Term, idx)
+// get type of result field
--- End diff --

add a comment that this is needed for `TableSource`?


> Support to define an existing field as the rowtime field for TableSource
> 
>
> Key: FLINK-7446
> URL: https://issues.apache.org/jira/browse/FLINK-7446
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Fabian Hueske
>
> Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field 
> for a {{TableSource}}. It would be helpful to also support defining an 
> existing field as the rowtime field. Just as when registering a DataStream, the 
> rowtime field could either be appended or replace an existing field.





[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179066#comment-16179066
 ] 

ASF GitHub Bot commented on FLINK-7446:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4710#discussion_r140771682
  
--- Diff: docs/dev/table/streaming.md ---
@@ -336,27 +336,24 @@ val windowedTable = tEnv
 
 ### Event time
 
-Event time allows a table program to produce results based on the time that is contained in every record. This allows for consistent results even in case of out-of-order events or late events. It also ensures replayable results of the table program when reading records from persistent storage. 
+Event time allows a table program to produce results based on the time that is contained in every record. This allows for consistent results even in case of out-of-order events or late events. It also ensures replayable results of the table program when reading records from persistent storage.
 
 Additionally, event time allows for unified syntax for table programs in both batch and streaming environments. A time attribute in a streaming environment can be a regular field of a record in a batch environment.
 
 In order to handle out-of-order events and distinguish between on-time and late events in streaming, Flink needs to extract timestamps from events and make some kind of progress in time (so-called [watermarks]({{ site.baseurl }}/dev/event_time.html)).
 
 An event time attribute can be defined either during DataStream-to-Table conversion or by using a TableSource. 
 
-The Table API & SQL assume that in both cases timestamps and watermarks have been generated in the [underlying DataStream API]({{ site.baseurl }}/dev/event_timestamps_watermarks.html) before. Ideally, this happens within a `TableSource` with knowledge about the incoming data's characteristics and is hidden from the end user of the API.
-
-
  During DataStream-to-Table Conversion
 
-The event time attribute is defined with the `.rowtime` property during schema definition. 
+The event time attribute is defined with the `.rowtime` property during schema definition. Timestamps and watermarks must have been assigned in the `DataStream` that is converted.
--- End diff --

Add a link to `dev/event_timestamps_watermarks` again?


> Support to define an existing field as the rowtime field for TableSource
> 
>
> Key: FLINK-7446
> URL: https://issues.apache.org/jira/browse/FLINK-7446
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Fabian Hueske
>
> Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field 
> for a {{TableSource}}. It would be helpful to also support defining an 
> existing field as the rowtime field. Just as when registering a DataStream, the 
> rowtime field could either be appended or replace an existing field.




