[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-08-28 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144836#comment-16144836
 ] 

Xingcan Cui commented on FLINK-7548:


Thanks for the issue, [~jark]. I'd like to share some ideas about the problem.
1. In principle, each rowtime field should be "guarded" by its own set of watermarks. Although we support multiple rowtime fields now, once two streams are connected, their watermarks are forcibly merged. As a consequence, the initial watermarks may not be usable in the following calculations. Shall we consider re-generating them?
2. The current periodic watermark assigner is based on machine time. I'm not sure it is applicable to rowtime, since rowtime and machine time may not be synchronized. For example, if the stream is sourced from a historical queue, it may feed into the system at maximum speed, so a machine-time-based watermark assigner may not work properly (e.g., we may generate a watermark covering a 1-hour rowtime span within 5 seconds). How about using a rowtime-based periodic assigner with the following framework?
{code:scala}
class WatermarksAssigner(interval: Long) extends AssignerWithPunctuatedWatermarks[Order] {

  // the last emitted watermark, in rowtime
  var currentWatermark: Long = 0

  override def extractTimestamp(element: Order, previousElementTimestamp: Long): Long = {
    element.rt
  }

  override def checkAndGetNextWatermark(lastElement: Order, extractedTimestamp: Long): Watermark = {
    if (extractedTimestamp >= currentWatermark + interval) {
      // advance the watermark in whole multiples of the rowtime interval
      currentWatermark += ((extractedTimestamp - currentWatermark) / interval) * interval
      new Watermark(currentWatermark)
    } else {
      null
    }
  }
}
{code}
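For illustration, the assigner could be attached like this (a minimal sketch; the `Order(rt, amount)` case class and the concrete values are assumptions, not part of the proposal):
{code:scala}
// A minimal usage sketch for the rowtime-based assigner above.
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._

case class Order(rt: Long, amount: Double)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val orders = env.fromElements(Order(1000L, 1.0), Order(7000L, 2.0))
// a watermark is emitted once event time has advanced by at least 5000 ms of rowtime
val withWatermarks = orders.assignTimestampsAndWatermarks(new WatermarksAssigner(5000L))
{code}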
BTW, I'm quite interested in this issue. Can I take it?

Thanks, Xingcan



> Support watermark generation for TableSource
> 
>
> Key: FLINK-7548
> URL: https://issues.apache.org/jira/browse/FLINK-7548
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Jark Wu
>
> As discussed in FLINK-7446, the TableSource currently only supports defining
> a rowtime field, but does not support extracting watermarks from that field.
> We can provide a new interface called {{DefinedWatermark}}, which has two
> methods: {{getRowtimeAttribute}} (can only be an existing field) and
> {{getWatermarkGenerator}}. {{DefinedRowtimeAttribute}} will be marked as
> deprecated.
> How to support periodic and punctuated watermarks, and which built-in
> strategies to provide, needs further discussion.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7454) update 'Monitoring Current Event Time' section of Flink doc

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144797#comment-16144797
 ] 

ASF GitHub Bot commented on FLINK-7454:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4547
  
@zentol does it look good to you?


> update 'Monitoring Current Event Time' section of Flink doc
> ---
>
> Key: FLINK-7454
> URL: https://issues.apache.org/jira/browse/FLINK-7454
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.3.2
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.4.0, 1.3.3
>
>
> Since FLINK-3427 is done, there's no need to have the following doc in 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/debugging_event_time.html#monitoring-current-event-time
> "There are plans (see FLINK-3427) to show the current low watermark for each 
> operator in the Flink web interface.
> Until this feature is implemented the current low watermark for each task can 
> be accessed through the metrics system."
> We can replace it with something like "The low watermark of each task can be
> accessed either from the Flink web interface or from the Flink metrics system."



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7366) Upgrade kinesis producer library in flink-connector-kinesis

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144795#comment-16144795
 ] 

ASF GitHub Bot commented on FLINK-7366:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4522
  
Can anyone from data Artisans take a look at this PR, please?


> Upgrade kinesis producer library in flink-connector-kinesis
> ---
>
> Key: FLINK-7366
> URL: https://issues.apache.org/jira/browse/FLINK-7366
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.2
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.4.0, 1.3.3
>
>
> We need to upgrade KPL and KCL to pick up the enhanced performance and
> stability so that Flink works better with Kinesis. Upgrading KPL is especially
> necessary, because the KPL version Flink uses is old and doesn't have good
> retry and error-handling logic.
> *KPL:*
> flink-connector-kinesis currently uses kinesis-producer-library 0.10.2, which
> was released by AWS in Nov 2015. It is old (only the fourth release) and thus
> problematic. It doesn't even have good retry logic, so Flink fails quite
> frequently (about every 10 minutes, as we observed) when it writes too fast to
> Kinesis and receives RateLimitExceededException.
> Quoting from https://github.com/awslabs/amazon-kinesis-producer/issues/56
> (Oct 2016): "*With the newer version of the KPL it uses the AWS C++ SDK which
> should offer additional retries.*" Version 0.12.5, which we are upgrading to,
> was released in May 2017 and should have the enhanced retry logic.
> *KCL:*
> Upgrade KCL from 1.6.2 to 1.8.1
> *AWS SDK*
> from 1.10.71 to 1.11.171



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144796#comment-16144796
 ] 

ASF GitHub Bot commented on FLINK-7367:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4473
  
Can anyone from data Artisans take a look at this PR, please?


> Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, 
> MaxConnections, RequestTimeout, etc)
> ---
>
> Key: FLINK-7367
> URL: https://issues.apache.org/jira/browse/FLINK-7367
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.4.0, 1.3.3
>
>
> Right now, FlinkKinesisProducer only expose two configs for the underlying 
> KinesisProducer:
> - AGGREGATION_MAX_COUNT
> - COLLECTION_MAX_COUNT
> According to the [AWS
> doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html]
> and [their sample on
> github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties],
> developers can set more configs to get the most out of KinesisProducer and to
> make it fault-tolerant (e.g. by increasing timeouts).
> I selected a few more configs that we need when using Flink with Kinesis:
> - MAX_CONNECTIONS
> - RATE_LIMIT
> - RECORD_MAX_BUFFERED_TIME
> - RECORD_TIME_TO_LIVE
> - REQUEST_TIMEOUT
> Flink is using KPL's default values. They make Flink write too fast to
> Kinesis, which fails Flink jobs too frequently. We need to parameterize
> FlinkKinesisProducer to pass in the above params, in order to slow down
> Flink's write rate to Kinesis.
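For illustration, passing the additional configs through could look like this (a sketch, not the final API; the property keys follow the KPL's default_config.properties and should be treated as assumptions until the PR settles on exact names):
{code:scala}
// Sketch: supplying pass-through KPL settings to FlinkKinesisProducer.
import java.util.Properties
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

val config = new Properties()
config.put(AWSConfigConstants.AWS_REGION, "us-east-1") // credentials omitted for brevity
config.put("RecordMaxBufferedTime", "3000") // buffer records longer to batch more per request
config.put("MaxConnections", "12")          // cap concurrent connections to Kinesis
config.put("RequestTimeout", "60000")       // tolerate slow requests instead of failing fast
config.put("RateLimit", "50")               // stay at 50% of a shard's rate limit

val producer = new FlinkKinesisProducer[String](new SimpleStringSchema(), config)
producer.setDefaultStream("output-stream")
producer.setDefaultPartition("0")
{code}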



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4547: [FLINK-7454][docs] update 'Monitoring Current Event Time'...

2017-08-28 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4547
  
@zentol does it look good to you?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #4473: [FLINK-7367][kinesis connector] Parameterize more configs...

2017-08-28 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4473
  
Can anyone from data Artisans take a look at this PR, please?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #4522: [FLINK-7366][kinesis connector] Upgrade kinesis producer ...

2017-08-28 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4522
  
Can anyone from data Artisans take a look at this PR, please?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144771#comment-16144771
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on the issue:

https://github.com/apache/flink/pull/4355
  
All comments addressed.
Do you have time to merge it, @wuchong @fhueske?

Thanks.


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4355: [FLINK-7206] [table] Implementation of DataView to suppor...

2017-08-28 Thread kaibozhou
Github user kaibozhou commented on the issue:

https://github.com/apache/flink/pull/4355
  
All comments addressed.
Do you have time to merge it, @wuchong @fhueske?

Thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-28 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135697894
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView provides List functionality for accumulators used by user-defined
+  * aggregate functions {{AggregateFunction}}.
+  *
+  * A ListView can be backed by a Java ArrayList or a state backend, depending
+  * on the context in which the function is used.
+  *
+  * At runtime `ListView` will be replaced by a
+  * [[org.apache.flink.table.dataview.StateListView]] if it is backed by a
+  * state backend.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *    public ListView<String> list;
+  *    public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction<Long, MyAccum> {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  *     MyAccum accum = new MyAccum();
+  *     accum.list = new ListView<>(Types.STRING);
+  *     accum.count = 0L;
+  *     return accum;
+  *   }
+  *
+  *   public void accumulate(MyAccum accumulator, String id) {
+  *     accumulator.list.add(id);
+  *     ... ...
+  *     accumulator.get()
+  *     ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  *     accumulator.list.add(id);
+  *     ... ...
+  *     accumulator.get()
+  *     ... ...
+  *     return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](
+    @transient private[flink] val elementTypeInfo: TypeInformation[T])
+  extends DataView {
+
+  def this() = this(null)
+
+  private[flink] var list: util.List[T] = new util.ArrayList[T]()
--- End diff --

The MapView should also apply these modifications.

Very good suggestion.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144769#comment-16144769
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135697894
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView provides List functionality for accumulators used by user-defined
+  * aggregate functions {{AggregateFunction}}.
+  *
+  * A ListView can be backed by a Java ArrayList or a state backend, depending
+  * on the context in which the function is used.
+  *
+  * At runtime `ListView` will be replaced by a
+  * [[org.apache.flink.table.dataview.StateListView]] if it is backed by a
+  * state backend.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *    public ListView<String> list;
+  *    public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction<Long, MyAccum> {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  *     MyAccum accum = new MyAccum();
+  *     accum.list = new ListView<>(Types.STRING);
+  *     accum.count = 0L;
+  *     return accum;
+  *   }
+  *
+  *   public void accumulate(MyAccum accumulator, String id) {
+  *     accumulator.list.add(id);
+  *     ... ...
+  *     accumulator.get()
+  *     ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  *     accumulator.list.add(id);
+  *     ... ...
+  *     accumulator.get()
+  *     ... ...
+  *     return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](
+    @transient private[flink] val elementTypeInfo: TypeInformation[T])
+  extends DataView {
+
+  def this() = this(null)
+
+  private[flink] var list: util.List[T] = new util.ArrayList[T]()
--- End diff --

The MapView should also apply these modifications.

Very good suggestion.


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-08-28 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144662#comment-16144662
 ] 

Jark Wu commented on FLINK-7446:


[~fhueske] Yes, I created FLINK-7548 to discuss this.

> Support to define an existing field as the rowtime field for TableSource
> 
>
> Key: FLINK-7446
> URL: https://issues.apache.org/jira/browse/FLINK-7446
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field
> for a {{TableSource}}. But it would be helpful if we could support defining an
> existing field as the rowtime field. Just like when registering a DataStream,
> the rowtime field can be appended, but it can also replace an existing field.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7548) Support watermark generation for TableSource

2017-08-28 Thread Jark Wu (JIRA)
Jark Wu created FLINK-7548:
--

 Summary: Support watermark generation for TableSource
 Key: FLINK-7548
 URL: https://issues.apache.org/jira/browse/FLINK-7548
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: Jark Wu


As discussed in FLINK-7446, the TableSource currently only supports defining a
rowtime field, but does not support extracting watermarks from that field. We
can provide a new interface called {{DefinedWatermark}}, which has two methods:
{{getRowtimeAttribute}} (can only be an existing field) and
{{getWatermarkGenerator}}. {{DefinedRowtimeAttribute}} will be marked as
deprecated.

How to support periodic and punctuated watermarks, and which built-in
strategies to provide, needs further discussion.
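A rough sketch of what the proposed interface could look like follows. Only the two method names come from this issue; everything else, including the {{WatermarkGenerator}} type, is an assumption:
{code:scala}
// Hypothetical shape of the proposed TableSource mix-in interface.
trait DefinedWatermark {

  /** Returns the name of an existing field that serves as the rowtime attribute. */
  def getRowtimeAttribute: String

  /** Returns the generator that derives watermarks from the rowtime field. */
  def getWatermarkGenerator: WatermarkGenerator
}

// Placeholder for the yet-to-be-designed generator abstraction.
trait WatermarkGenerator extends Serializable
{code}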



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7438) Some classes are eclipsed by classes in package scala

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144622#comment-16144622
 ] 

ASF GitHub Bot commented on FLINK-7438:
---

Github user tedyu commented on the issue:

https://github.com/apache/flink/pull/4570
  
lgtm


> Some classes are eclipsed by classes in package scala
> -
>
> Key: FLINK-7438
> URL: https://issues.apache.org/jira/browse/FLINK-7438
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, DataStream API
>Reporter: Ted Yu
>Priority: Minor
>
> Noticed the following during compilation:
> {code}
> [WARNING] 
> /flink/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala:33:
>  warning: imported `OutputTag' is permanently  hidden by definition of 
> object OutputTag in package scala
> [WARNING] import org.apache.flink.util.{Collector, OutputTag}
> [WARNING]  ^
> [WARNING] 
> /flink/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala:33:
>  warning: imported `OutputTag' is permanently  hidden by definition of 
> class OutputTag in package scala
> [WARNING] import org.apache.flink.util.{Collector, OutputTag}
> {code}
> We should avoid the warning w.r.t. OutputTag.
> There may be other occurrences of similar warning.
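One common way to avoid such shadowing (a sketch, not necessarily what the PR does) is to rename the import so it can no longer be eclipsed:
{code:scala}
// Rename the imported class so it is not hidden by an OutputTag in package scala.
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.util.{Collector, OutputTag => FlinkOutputTag}

val lateTag = new FlinkOutputTag[String]("late-data", BasicTypeInfo.STRING_TYPE_INFO)
{code}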



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4570: [FLINK-7438][DataStream API]Remove useless import, avoid ...

2017-08-28 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/flink/pull/4570
  
lgtm


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-7488) TaskManagerHeapSizeCalculationJavaBashTest sometimes fails

2017-08-28 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-7488:
--
Description: 
{code}
compareNetworkBufShellScriptWithJava(org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest)
  Time elapsed: 0.239 sec  <<< FAILURE!
org.junit.ComparisonFailure: Different network buffer memory sizes with 
configuration: {taskmanager.network.memory.fraction=0.1, 
taskmanager.memory.off-heap=false, taskmanager.memory.fraction=0.7, 
taskmanager.memory.size=-1, taskmanager.network.memory.max=1073741824, 
taskmanager.heap.mb=1000, taskmanager.network.memory.min=67108864} 
expected:<[]104857600> but was:<[Setting HADOOP_CONF_DIR=/etc/hadoop/conf 
because no HADOOP_CONF_DIR was set.Using the result of 'hadoop classpath' to 
augment the Hadoop classpath: 
/usr/hdp/2.5.0.0-1245/hadoop/conf:/usr/hdp/2.5.0.0-1245/hadoop/lib/*:/usr/hdp/2.5.0.0-1245/hadoop/.//*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/./:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/.//*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/.//*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/.//*:/usr/hdp/2.5.0.0-1245/tez/*:/usr/hdp/2.5.0.0-1245/tez/lib/*:/usr/hdp/2.5.0.0-1245/tez/conf]104857600>
  at org.junit.Assert.assertEquals(Assert.java:115)
  at 
org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareNetworkBufJavaVsScript(TaskManagerHeapSizeCalculationJavaBashTest.java:235)
  at 
org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareNetworkBufShellScriptWithJava(TaskManagerHeapSizeCalculationJavaBashTest.java:81)

compareHeapSizeShellScriptWithJava(org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest)
  Time elapsed: 0.16 sec  <<< FAILURE!
org.junit.ComparisonFailure: Different heap sizes with configuration: 
{taskmanager.network.memory.fraction=0.1, taskmanager.memory.off-heap=false, 
taskmanager.memory.fraction=0.7, taskmanager.memory.size=-1, 
taskmanager.network.memory.max=1073741824, taskmanager.heap.mb=1000, 
taskmanager.network.memory.min=67108864} expected:<[]1000> but was:<[Setting 
HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.Using the 
result of 'hadoop classpath' to augment the Hadoop classpath: 
/usr/hdp/2.5.0.0-1245/hadoop/conf:/usr/hdp/2.5.0.0-1245/hadoop/lib/*:/usr/hdp/2.5.0.0-1245/hadoop/.//*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/./:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/.//*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/.//*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/.//*:/usr/hdp/2.5.0.0-1245/tez/*:/usr/hdp/2.5.0.0-1245/tez/lib/*:/usr/hdp/2.5.0.0-1245/tez/conf]1000>
  at org.junit.Assert.assertEquals(Assert.java:115)
  at 
org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareHeapSizeJavaVsScript(TaskManagerHeapSizeCalculationJavaBashTest.java:275)
  at 
org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareHeapSizeShellScriptWithJava(TaskManagerHeapSizeCalculationJavaBashTest.java:110)
{code}
$HADOOP_CONF_DIR was not set prior to running the test.


[jira] [Created] (FLINK-7547) o.a.f.s.api.scala.async.AsyncFunction is not declared Serializable

2017-08-28 Thread Elias Levy (JIRA)
Elias Levy created FLINK-7547:
-

 Summary: o.a.f.s.api.scala.async.AsyncFunction is not declared 
Serializable
 Key: FLINK-7547
 URL: https://issues.apache.org/jira/browse/FLINK-7547
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.3.2
Reporter: Elias Levy
Priority: Minor


{{org.apache.flink.streaming.api.scala.async.AsyncFunction}} is not declared
{{Serializable}}, whereas
{{org.apache.flink.streaming.api.functions.async.AsyncFunction}} is. This
leads to the job not starting, as the async function can't be serialized
during initialization.
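The likely fix is a one-line change along these lines (a sketch; the actual trait members in the Scala API may differ):
{code:scala}
// Declare the Scala trait Serializable, mirroring the Java interface.
trait AsyncFunction[IN, OUT] extends Serializable {
  def asyncInvoke(input: IN, collector: AsyncCollector[OUT]): Unit
}
{code}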



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1610#comment-1610
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r13411
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView provides List functionality for accumulators used by user-defined
+  * aggregate functions {{AggregateFunction}}.
+  *
+  * A ListView can be backed by a Java ArrayList or a state backend, depending
+  * on the context in which the function is used.
+  *
+  * At runtime `ListView` will be replaced by a
+  * [[org.apache.flink.table.dataview.StateListView]] if it is backed by a
+  * state backend.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *    public ListView<String> list;
+  *    public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction<Long, MyAccum> {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  *     MyAccum accum = new MyAccum();
+  *     accum.list = new ListView<>(Types.STRING);
+  *     accum.count = 0L;
+  *     return accum;
+  *   }
+  *
+  *   public void accumulate(MyAccum accumulator, String id) {
+  *     accumulator.list.add(id);
+  *     ... ...
+  *     accumulator.get()
+  *     ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  *     accumulator.list.add(id);
+  *     ... ...
+  *     accumulator.get()
+  *     ... ...
+  *     return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](
+    @transient private[flink] val elementTypeInfo: TypeInformation[T])
+  extends DataView {
+
+  def this() = this(null)
+
+  private[flink] var list: util.List[T] = new util.ArrayList[T]()
--- End diff --

We can refactor the `ListView` constructors as follows:

```
class ListView[T] private[flink](
@transient private[flink] val elementTypeInfo: TypeInformation[T],
private[flink] val list: util.List[T])
  extends DataView {

  def this(elementTypeInfo: TypeInformation[T]) {
this(elementTypeInfo, new util.ArrayList[T]())
  }

  def this() = {
this(null, new util.ArrayList[T]())
  }

  ...
}
```

and call the primary constructor in the `ListSerializer` with `null` for 
the type information.


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1613#comment-1613
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135550350
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView provides List functionality for accumulators used by user-defined
+  * aggregate functions {{AggregateFunction}}.
+  *
+  * A ListView can be backed by a Java ArrayList or a state backend, depending
+  * on the context in which the function is used.
+  *
+  * At runtime `ListView` will be replaced by a
+  * [[org.apache.flink.table.dataview.StateListView]] if it is backed by a
+  * state backend.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *    public ListView<String> list;
+  *    public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction<Long, MyAccum> {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  *     MyAccum accum = new MyAccum();
+  *     accum.list = new ListView<>(Types.STRING);
+  *     accum.count = 0L;
+  *     return accum;
+  *   }
+  *
+  *   public void accumulate(MyAccum accumulator, String id) {
+  *     accumulator.list.add(id);
+  *     ... ...
+  *     accumulator.get()
+  *     ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  *     accumulator.list.add(id);
+  *     ... ...
+  *     accumulator.get()
+  *     ... ...
+  *     return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](
+    @transient private[flink] val elementTypeInfo: TypeInformation[T])
+  extends DataView {
+
+  def this() = this(null)
+
+  private[flink] var list: util.List[T] = new util.ArrayList[T]()
--- End diff --

right now an empty `ArrayList` is always created when a `ListView` is 
instantiated. 
This is unnecessary overhead when the `ListView` is copied or deserialized 
using `ListViewSerializer` because the empty instance is immediately replaced.

We should add an option to create a `ListView` without an `ArrayList` 
instance. This means we have to move the creation of the `ArrayList` out of the 
primary constructor.

The same applies to the `MapView`.


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1618#comment-1618
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135524948
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -42,6 +46,18 @@ class AggregationCodeGenerator(
 input: TypeInformation[_ <: Any])
   extends CodeGenerator(config, nullableInput, input) {
 
+  // set of statements for cleanup dataview that will be added only once
+  // we use a LinkedHashSet to keep the insertion order
+  private val reusableCleanupStatements = mutable.LinkedHashSet[String]()
+
+  /**
+    * @return code block of statements that need to be placed in the cleanup() method of
--- End diff --

`RichFunction` does not have a `cleanup()` method. The `cleanup()` method 
is a method of `GeneratedAggregations`.


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144439#comment-16144439
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135528903
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -161,13 +182,119 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Create DataView Term, for example, acc1_map_dataview.
+  *
+  * @param aggIndex index of aggregate function
+  * @param fieldName field name of DataView
+  * @return term to access [[MapView]] or [[ListView]]
+  */
+def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
+  s"acc${aggIndex}_${fieldName}_dataview"
+}
+
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the
+  * open, cleanup, close and member area of the generated function.
+  */
+def addReusableDataViews: Unit = {
+  if (accConfig.isDefined) {
+    val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.get
+      .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+      .toMap[String, StateDescriptor[_, _]]
+
+    for (i <- aggs.indices) yield {
+      for (spec <- accConfig.get(i)) yield {
+        val dataViewField = spec.field
+        val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+        val desc = descMapping.getOrElse(spec.id,
+          throw new CodeGenException(s"Can not find DataView in accumulator by id: ${spec.id}"))
+
+        // define the DataView variables
+        val serializedData = AggregateUtil.serialize(desc)
--- End diff --

move `serialize` method to this class and rename to 
`serializeStateDescriptor`


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1614#comment-1614
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135648347
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -307,6 +312,92 @@ object UserDefinedFunctionUtils {
   // --------------------------------------------------------------------------------------------
 
+  /**
+    * Remove StateView fields from accumulator type information.
+    *
+    * @param index index of aggregate function
+    * @param aggFun aggregate function
+    * @param accType accumulator type information, only support pojo type
+    * @param isStateBackedDataViews is data views use state backend
+    * @return mapping of accumulator type information and data view config which contains id,
+    *         field name and state descriptor
+    */
+  def removeStateViewFieldsFromAccTypeInfo(
+      index: Int,
+      aggFun: AggregateFunction[_, _],
+      accType: TypeInformation[_],
+      isStateBackedDataViews: Boolean)
+    : (TypeInformation[_], Option[Seq[DataViewSpec[_]]]) = {
+
+    var hasDataView = false
+    val acc = aggFun.createAccumulator()
+    accType match {
+      case pojoType: PojoTypeInfo[_] if pojoType.getArity > 0 =>
+        val arity = pojoType.getArity
+        val newPojoFields = new util.ArrayList[PojoField]()
+        val accumulatorSpecs = new mutable.ArrayBuffer[DataViewSpec[_]]
+        for (i <- 0 until arity) {
+          val pojoField = pojoType.getPojoFieldAt(i)
+          val field = pojoField.getField
+          val fieldName = field.getName
+          field.setAccessible(true)
+
+          pojoField.getTypeInformation match {
+            case map: MapViewTypeInfo[Any, Any] =>
+              val mapView = field.get(acc).asInstanceOf[MapView[_, _]]
+              if (mapView != null) {
+                val keyTypeInfo = mapView.keyTypeInfo
+                val valueTypeInfo = mapView.valueTypeInfo
+                val newTypeInfo = if (keyTypeInfo != null && valueTypeInfo != null) {
+                  hasDataView = true
+                  new MapViewTypeInfo(keyTypeInfo, valueTypeInfo)
+                } else {
+                  map
+                }
+
+                var spec = MapViewSpec(
+                  "agg" + index + "$" + fieldName, // generate unique name to be used as state name
+                  field,
+                  newTypeInfo)
+
+                accumulatorSpecs += spec
+                if (!isStateBackedDataViews) { // add data view field which not use state backend
+                  newPojoFields.add(new PojoField(field, newTypeInfo))
+                }
+              }
+
+            case list: ListViewTypeInfo[Any] =>
+              val listView = field.get(acc).asInstanceOf[ListView[_]]
+              if (listView != null) {
+                val elementTypeInfo = listView.elementTypeInfo
+                val newTypeInfo = if (elementTypeInfo != null) {
+                  hasDataView = true
+                  new ListViewTypeInfo(elementTypeInfo)
+                } else {
+                  list
+                }
+
+                var spec = ListViewSpec(
+                  "agg" + index + "$" + fieldName, // generate unique name to be used as state name
+                  field,
+                  newTypeInfo)
+
+                accumulatorSpecs += spec
+                if (!isStateBackedDataViews) { // add data view field which not use state backend
+                  newPojoFields.add(new PojoField(field, newTypeInfo))
+                }
+              }
+
+            case _ => newPojoFields.add(pojoField)
+          }
+        }
+        (new PojoTypeInfo(accType.getTypeClass, newPojoFields), Some(accumulatorSpecs))
+
+      case _ if !hasDataView => (accType, None)
+      case _ => throw new TableException("MapView and ListView only support in PoJo class")
--- End diff --

This case will never be reached. `hasDataView` is only set to `true` in the 
`case pojoType: PojoTypeInfo[_]` case. Hence, it will always be false when we 
come to this point.


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink

[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1615#comment-1615
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135534660
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -161,13 +182,119 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Create DataView Term, for example, acc1_map_dataview.
+  *
+  * @param aggIndex index of aggregate function
+  * @param fieldName field name of DataView
+  * @return term to access [[MapView]] or [[ListView]]
+  */
+def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
+  s"acc${aggIndex}_${fieldName}_dataview"
+}
+
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the
+  * open, cleanup, close and member area of the generated function.
+  */
+def addReusableDataViews: Unit = {
--- End diff --

Add parentheses to method. Only methods without side-effects should have no 
parentheses. 


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1617#comment-1617
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135541895
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.api.common.typeutils._
+import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[ListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for the
+  * length of the list, followed by the serialized representation of each
+  * element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+class ListViewSerializer[T](val listSerializer: ListSerializer[T])
+  extends TypeSerializer[ListView[T]] {
+
+  override def isImmutableType: Boolean = false
+
+  override def duplicate(): TypeSerializer[ListView[T]] = {
+    new ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]])
+  }
+
+  override def createInstance(): ListView[T] = {
+    new ListView[T]
+  }
+
+  override def copy(from: ListView[T]): ListView[T] = {
+    val listview = new ListView[T]
+    listview.list = from.list
--- End diff --

We should create a copy of `from.list` using the `ListSerializer`. 
Otherwise we share the instance.
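For illustration, the deep copy could look like this (a sketch, not the final patch):

```
// Deep-copy the backing list with the element-aware ListSerializer
// instead of sharing the instance between the two views.
override def copy(from: ListView[T]): ListView[T] = {
  val listview = new ListView[T]
  listview.list = listSerializer.copy(from.list)
  listview
}
```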


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1619#comment-1619
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135539685
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -249,7 +258,8 @@ object AggregateUtil {
   outputArity,
   needRetract,
   needMerge = false,
-  needReset = true
+  needReset = true,
--- End diff --

`needReset` can be `false`.
`resetAccumulator()` is not called by any of the window operators. Not sure
why this was `true` before...


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1611#comment-1611
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135528733
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -161,13 +182,119 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Create DataView Term, for example, acc1_map_dataview.
+  *
+  * @param aggIndex index of aggregate function
+  * @param fieldName field name of DataView
+  * @return term to access [[MapView]] or [[ListView]]
+  */
+def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
+  s"acc${aggIndex}_${fieldName}_dataview"
+}
+
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the
+  * open, cleanup, close and member area of the generated function.
+  */
+def addReusableDataViews: Unit = {
+  if (accConfig.isDefined) {
+    val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.get
+      .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+      .toMap[String, StateDescriptor[_, _]]
+
+    for (i <- aggs.indices) yield {
+      for (spec <- accConfig.get(i)) yield {
+        val dataViewField = spec.field
+        val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+        val desc = descMapping.getOrElse(spec.id,
+          throw new CodeGenException(s"Can not find DataView in accumulator by id: ${spec.id}"))
+
+        // define the DataView variables
+        val serializedData = AggregateUtil.serialize(desc)
+        val dataViewFieldTerm = createDataViewTerm(i, dataViewField.getName)
+        val field =
+          s"""
+             |transient $dataViewTypeTerm $dataViewFieldTerm = null;
+          """.stripMargin
+        reusableMemberStatements.add(field)
+
+        // create DataViews
+        val descFieldTerm = s"${dataViewFieldTerm}_desc"
+        val descClassQualifier = classOf[StateDescriptor[_, _]].getCanonicalName
+        val descDeserializeCode =
+          s"""
+             |$descClassQualifier $descFieldTerm = ($descClassQualifier)
+             |  ${AggregateUtil.getClass.getName.stripSuffix("$")}
--- End diff --

implement deserialization directly in generated code. Moreover, we should 
use the user code classloader for the deserialization which is accessible via 
the `RuntimeContext`.
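For illustration, the generated snippet could look roughly like this (a sketch; the Base64 transport encoding and exact helper calls are assumptions, not the PR's code):

```
// Sketch: emit deserialization code inline and resolve classes with the
// user-code classloader obtained from the RuntimeContext. Error handling
// (the checked exceptions of deserializeObject) is omitted here.
val descDeserializeCode =
  s"""
     |$descClassQualifier $descFieldTerm = ($descClassQualifier)
     |  org.apache.flink.util.InstantiationUtil.deserializeObject(
     |    java.util.Base64.getDecoder().decode("$serializedData"),
     |    getRuntimeContext().getUserCodeClassLoader());
  """.stripMargin
```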


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1612#comment-1612
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135561193
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
 ---
@@ -237,9 +248,13 @@ public void resetAccumulator(CountDistinctAccum acc) {
 	//Overloaded retract method
 	public void retract(CountDistinctAccum accumulator, long id) {
 		try {
-			if (!accumulator.map.contains(String.valueOf(id))) {
-				accumulator.map.remove(String.valueOf(id));
-				accumulator.count -= 1;
+			Integer cnt = accumulator.map.get(String.valueOf(id));
+			if (cnt != null) {
+				cnt -= 1;
+				if (cnt <= 0) {
+					accumulator.map.remove(String.valueOf(id));
+					accumulator.count -= 1;
+				}
--- End diff --

We should write the decremented count back if it is still > 0:

```
else {
  accumulator.map.put(String.valueOf(id), cnt);
}
```


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1616#comment-1616
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135587110
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1398,14 +1412,29 @@ object AggregateUtil {
   }
 }
 
+val accSpecs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+
 // create accumulator type information for every aggregate function
 aggregates.zipWithIndex.foreach { case (agg, index) =>
-  if (null == accTypes(index)) {
+  if (accTypes(index) != null) {
+val (accType, specs) = removeStateViewFieldsFromAccTypeInfo(index,
+  agg,
+  accTypes(index),
+  isStateBackedDataViews)
+if (specs.isDefined) {
+  accSpecs(index) = specs.get
+  accTypes(index) = accType
+} else {
+  accSpecs(index) = Seq()
+  accTypes(index) = getAccumulatorTypeOfAggregateFunction(agg)
--- End diff --

No need to override `accTypes(index)`


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135541895
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.api.common.typeutils._
+import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[ListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for the length of the list,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+class ListViewSerializer[T](val listSerializer: ListSerializer[T])
+  extends TypeSerializer[ListView[T]] {
+
+  override def isImmutableType: Boolean = false
+
+  override def duplicate(): TypeSerializer[ListView[T]] = {
+    new ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]])
+  }
+
+  override def createInstance(): ListView[T] = {
+new ListView[T]
+  }
+
+  override def copy(from: ListView[T]): ListView[T] = {
+val listview = new ListView[T]
+listview.list = from.list
--- End diff --

We should create a copy of `from.list` using the `ListSerializer`. Otherwise we share the instance.
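
A minimal sketch of that deep copy, assuming the surrounding
`ListViewSerializer` from this diff:

```
override def copy(from: ListView[T]): ListView[T] = {
  val listview = new ListView[T]
  // deep-copy the backing list so the copy does not share state with `from`
  listview.list = listSerializer.copy(from.list)
  listview
}
```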




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135534660
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
@@ -161,13 +182,119 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Create DataView Term, for example, acc1_map_dataview.
+  *
+  * @param aggIndex index of aggregate function
+  * @param fieldName field name of DataView
+  * @return term to access [[MapView]] or [[ListView]]
+  */
+def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
+  s"acc${aggIndex}_${fieldName}_dataview"
+}
+
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup,
+  * close and member area of the generated function.
+  *
+  */
+def addReusableDataViews: Unit = {
--- End diff --

Add parentheses to the method. Only methods without side-effects should have no parentheses.
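
A small self-contained example of the convention being asked for here:

```
import scala.collection.mutable

class Example {
  private val statements = mutable.LinkedHashSet[String]()

  // side-effecting method: declared (and called) with parentheses
  def addReusableDataViews(): Unit = {
    statements += "transient Object view = null;"
  }

  // pure, parameterless accessor: parentheses may be omitted
  def statementCount: Int = statements.size
}
```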




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135550350
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView provides List functionality for accumulators used by user-defined aggregate functions
+  * {{AggregateFunction}}.
+  *
+  * A ListView can be backed by a Java ArrayList or a state backend, depending on the context in
+  * which the function is used.
+  *
+  * At runtime `ListView` will be replaced by a [[org.apache.flink.table.dataview.StateListView]]
+  * if it is backed by a state backend.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *    public ListView<String> list;
+  *    public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction<Long, MyAccum> {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](
+@transient private[flink] val elementTypeInfo: TypeInformation[T])
+  extends DataView {
+
+  def this() = this(null)
+
+  private[flink] var list: util.List[T] = new util.ArrayList[T]()
--- End diff --

Right now an empty `ArrayList` is always created when a `ListView` is 
instantiated. This is unnecessary overhead when the `ListView` is copied or 
deserialized using `ListViewSerializer`, because the empty instance is 
immediately replaced.

We should add an option to create a `ListView` without an `ArrayList` 
instance. This means we have to move the creation of the `ArrayList` out of 
the primary constructor.

The same applies to the `MapView`.
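
For the `MapView` case, the analogous refactoring might look like the sketch
below (field and constructor names are assumed by analogy with the `ListView`
code above; imports as in that file):

```
class MapView[K, V] private[flink](
    @transient private[flink] val keyTypeInfo: TypeInformation[K],
    @transient private[flink] val valueTypeInfo: TypeInformation[V],
    private[flink] val map: java.util.Map[K, V])
  extends DataView {

  def this(keyTypeInfo: TypeInformation[K], valueTypeInfo: TypeInformation[V]) =
    this(keyTypeInfo, valueTypeInfo, new java.util.HashMap[K, V]())

  // the no-arg constructor still creates a backing map for direct user instantiation
  def this() = this(null, null, new java.util.HashMap[K, V]())
}
```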




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135528903
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
@@ -161,13 +182,119 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Create DataView Term, for example, acc1_map_dataview.
+  *
+  * @param aggIndex index of aggregate function
+  * @param fieldName field name of DataView
+  * @return term to access [[MapView]] or [[ListView]]
+  */
+def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
+  s"acc${aggIndex}_${fieldName}_dataview"
+}
+
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup,
+  * close and member area of the generated function.
+  *
+  */
+def addReusableDataViews: Unit = {
+  if (accConfig.isDefined) {
+val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.get
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
+for (i <- aggs.indices) yield {
+  for (spec <- accConfig.get(i)) yield {
+val dataViewField = spec.field
+val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+val desc = descMapping.getOrElse(spec.id,
+  throw new CodeGenException(s"Can not find DataView in accumulator by id: ${spec.id}"))
+
+// define the DataView variables
+val serializedData = AggregateUtil.serialize(desc)
--- End diff --

Move the `serialize` method to this class and rename it to `serializeStateDescriptor`.
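
One possible shape for the renamed helper (a sketch only; the use of
`InstantiationUtil` and `java.util.Base64` is an assumption for illustration,
not necessarily what the PR ends up doing):

```
import org.apache.flink.api.common.state.StateDescriptor
import org.apache.flink.util.InstantiationUtil

def serializeStateDescriptor(descriptor: StateDescriptor[_, _]): String = {
  // serialize the descriptor and encode it so it can be embedded in generated code
  val bytes = InstantiationUtil.serializeObject(descriptor)
  java.util.Base64.getEncoder.encodeToString(bytes)
}
```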




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r13411
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView provides List functionality for accumulators used by user-defined aggregate functions
+  * {{AggregateFunction}}.
+  *
+  * A ListView can be backed by a Java ArrayList or a state backend, depending on the context in
+  * which the function is used.
+  *
+  * At runtime `ListView` will be replaced by a [[org.apache.flink.table.dataview.StateListView]]
+  * if it is backed by a state backend.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *    public ListView<String> list;
+  *    public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction<Long, MyAccum> {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](
+@transient private[flink] val elementTypeInfo: TypeInformation[T])
+  extends DataView {
+
+  def this() = this(null)
+
+  private[flink] var list: util.List[T] = new util.ArrayList[T]()
--- End diff --

We can refactor the `ListView` constructors as follows:

```
class ListView[T] private[flink](
@transient private[flink] val elementTypeInfo: TypeInformation[T],
private[flink] val list: util.List[T])
  extends DataView {

  def this(elementTypeInfo: TypeInformation[T]) {
this(elementTypeInfo, new util.ArrayList[T]())
  }

  def this() = {
this(null, new util.ArrayList[T]())
  }

  ...
}
```

and call the primary constructor in the `ListSerializer` with `null` for the type information.
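
With that refactoring, the serializer can then build instances without the
throwaway `ArrayList`, e.g. (a sketch against the `ListViewSerializer` diff
above):

```
override def copy(from: ListView[T]): ListView[T] =
  new ListView[T](null, listSerializer.copy(from.list))

override def deserialize(source: DataInputView): ListView[T] =
  new ListView[T](null, listSerializer.deserialize(source))
```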




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135539685
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
@@ -249,7 +258,8 @@ object AggregateUtil {
   outputArity,
   needRetract,
   needMerge = false,
-  needReset = true
+  needReset = true,
--- End diff --

`needReset` can be `false`.
`resetAccumulator()` is not called by any of the window operators. Not sure why this was `true` before...




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135528733
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
@@ -161,13 +182,119 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Create DataView Term, for example, acc1_map_dataview.
+  *
+  * @param aggIndex index of aggregate function
+  * @param fieldName field name of DataView
+  * @return term to access [[MapView]] or [[ListView]]
+  */
+def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
+  s"acc${aggIndex}_${fieldName}_dataview"
+}
+
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup,
+  * close and member area of the generated function.
+  *
+  */
+def addReusableDataViews: Unit = {
+  if (accConfig.isDefined) {
+val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.get
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
+for (i <- aggs.indices) yield {
+  for (spec <- accConfig.get(i)) yield {
+val dataViewField = spec.field
+val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+val desc = descMapping.getOrElse(spec.id,
+  throw new CodeGenException(s"Can not find DataView in accumulator by id: ${spec.id}"))
+
+// define the DataView variables
+val serializedData = AggregateUtil.serialize(desc)
+val dataViewFieldTerm = createDataViewTerm(i, dataViewField.getName)
+val field =
+  s"""
+ |transient $dataViewTypeTerm $dataViewFieldTerm = null;
+  """.stripMargin
+reusableMemberStatements.add(field)
+
+// create DataViews
+val descFieldTerm = s"${dataViewFieldTerm}_desc"
+val descClassQualifier = classOf[StateDescriptor[_, _]].getCanonicalName
+val descDeserializeCode =
+  s"""
+ |$descClassQualifier $descFieldTerm = ($descClassQualifier)
+ |  ${AggregateUtil.getClass.getName.stripSuffix("$")}
--- End diff --

Implement deserialization directly in the generated code. Moreover, we should use the user code classloader for the deserialization, which is accessible via the `RuntimeContext`.
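
A sketch of how the generated snippet could look, assuming the descriptor is
shipped as a Base64 string and deserialized with
`InstantiationUtil.deserializeObject(byte[], ClassLoader)` (both assumptions
for illustration, not the final patch):

```
val descDeserializeCode =
  s"""
     |$descClassQualifier $descFieldTerm = ($descClassQualifier)
     |  org.apache.flink.util.InstantiationUtil.deserializeObject(
     |    java.util.Base64.getDecoder().decode("$serializedData"),
     |    getRuntimeContext().getUserCodeClassLoader());
   """.stripMargin
```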




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135648347
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
@@ -307,6 +312,92 @@ object UserDefinedFunctionUtils {
   // --------------------------------------------------------------------------------------------
 
   /**
+* Remove StateView fields from accumulator type information.
+*
+* @param index index of aggregate function
+* @param aggFun aggregate function
+* @param accType accumulator type information, only support pojo type
+* @param isStateBackedDataViews is data views use state backend
+* @return mapping of accumulator type information and data view config which contains id,
+* field name and state descriptor
+*/
+  def removeStateViewFieldsFromAccTypeInfo(
+index: Int,
+aggFun: AggregateFunction[_, _],
+accType: TypeInformation[_],
+isStateBackedDataViews: Boolean)
+  : (TypeInformation[_], Option[Seq[DataViewSpec[_]]]) = {
+
+var hasDataView = false
+val acc = aggFun.createAccumulator()
+accType match {
+  case pojoType: PojoTypeInfo[_] if pojoType.getArity > 0 =>
+val arity = pojoType.getArity
+val newPojoFields = new util.ArrayList[PojoField]()
+val accumulatorSpecs = new mutable.ArrayBuffer[DataViewSpec[_]]
+for (i <- 0 until arity) {
+  val pojoField = pojoType.getPojoFieldAt(i)
+  val field = pojoField.getField
+  val fieldName = field.getName
+  field.setAccessible(true)
+
+  pojoField.getTypeInformation match {
+case map: MapViewTypeInfo[Any, Any] =>
+  val mapView = field.get(acc).asInstanceOf[MapView[_, _]]
+  if (mapView != null) {
+val keyTypeInfo = mapView.keyTypeInfo
+val valueTypeInfo = mapView.valueTypeInfo
+val newTypeInfo = if (keyTypeInfo != null && valueTypeInfo != null) {
+  hasDataView = true
+  new MapViewTypeInfo(keyTypeInfo, valueTypeInfo)
+} else {
+  map
+}
+
+var spec = MapViewSpec(
+  "agg" + index + "$" + fieldName, // generate unique name 
to be used as state name
+  field,
+  newTypeInfo)
+
+accumulatorSpecs += spec
+if (!isStateBackedDataViews) { // add data view field which not use state backend
+  newPojoFields.add(new PojoField(field, newTypeInfo))
+}
+  }
+
+case list: ListViewTypeInfo[Any] =>
+  val listView = field.get(acc).asInstanceOf[ListView[_]]
+  if (listView != null) {
+val elementTypeInfo = listView.elementTypeInfo
+val newTypeInfo = if (elementTypeInfo != null) {
+  hasDataView = true
+  new ListViewTypeInfo(elementTypeInfo)
+} else {
+  list
+}
+
+var spec = ListViewSpec(
+  "agg" + index + "$" + fieldName, // generate unique name 
to be used as state name
+  field,
+  newTypeInfo)
+
+accumulatorSpecs += spec
+if (!isStateBackedDataViews) { // add data view field which not use state backend
+  newPojoFields.add(new PojoField(field, newTypeInfo))
+}
+  }
+
+case _ => newPojoFields.add(pojoField)
+  }
+}
+(new PojoTypeInfo(accType.getTypeClass, newPojoFields), Some(accumulatorSpecs))
+
+  case _ if !hasDataView => (accType, None)
+  case _ => throw new TableException("MapView and ListView only support in PoJo class")
--- End diff --

This case will never be reached. `hasDataView` is only set to `true` in the `case pojoType: PojoTypeInfo[_]` case. Hence, it will always be false when we come to this point.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135524948
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
@@ -42,6 +46,18 @@ class AggregationCodeGenerator(
 input: TypeInformation[_ <: Any])
   extends CodeGenerator(config, nullableInput, input) {
 
+  // set of statements for cleanup dataview that will be added only once
+  // we use a LinkedHashSet to keep the insertion order
+  private val reusableCleanupStatements = mutable.LinkedHashSet[String]()
+
+  /**
+* @return code block of statements that need to be placed in the cleanup() method of
--- End diff --

`RichFunction` does not have a `cleanup()` method. The `cleanup()` method is a method of `GeneratedAggregations`.




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-08-28 Thread Shuyi Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144282#comment-16144282
 ] 

Shuyi Chen commented on FLINK-7491:
---

Hi [~jark], thanks for the response. However, I am worried that with Array as 
the runtime type, multiset-specific operations will be slow. For example:

The MEMBER OF operator is O(1) for a multiset data structure but O(n) for an 
array. The SUBMULTISET OF operator is O(m+n) for arrays, and O(m) for 
multisets when testing whether M is a submultiset of N.

Also, the actual type I am using is HashMultiset, which is backed by a Java 
HashMap and should therefore perform reasonably well.
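
As a rough illustration of the complexity argument (plain Scala stand-ins, not
the actual Flink/Calcite types):

{code}
// multiset as element -> multiplicity: membership is a hash lookup, expected O(1)
val multiset: Map[String, Int] = Map("a" -> 2, "b" -> 1)
val memberOfMultiset: Boolean = multiset.contains("b")

// the same data flattened into an array: MEMBER OF degrades to an O(n) scan
val asArray: Array[String] = Array("a", "a", "b")
val memberOfArray: Boolean = asArray.contains("b")
{code}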

> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>






[jira] [Created] (FLINK-7546) Support SUBMULTISET_OF Operator for Multiset SQL type

2017-08-28 Thread Shuyi Chen (JIRA)
Shuyi Chen created FLINK-7546:
-

 Summary: Support SUBMULTISET_OF Operator for Multiset SQL type
 Key: FLINK-7546
 URL: https://issues.apache.org/jira/browse/FLINK-7546
 Project: Flink
  Issue Type: New Feature
Reporter: Shuyi Chen
Assignee: Shuyi Chen








[jira] [Created] (FLINK-7545) Support MEMBER OF Operator for Multiset SQL type

2017-08-28 Thread Shuyi Chen (JIRA)
Shuyi Chen created FLINK-7545:
-

 Summary: Support MEMBER OF Operator for Multiset SQL type
 Key: FLINK-7545
 URL: https://issues.apache.org/jira/browse/FLINK-7545
 Project: Flink
  Issue Type: New Feature
Reporter: Shuyi Chen
Assignee: Shuyi Chen








[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API

2017-08-28 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144102#comment-16144102
 ] 

Xingcan Cui commented on FLINK-6233:


Hi [~fhueske], the [document|https://goo.gl/VW5Gpd] is now roughly finished. 
I hope it helps. :)

> Support rowtime inner equi-join between two streams in the SQL API
> --
>
> Key: FLINK-6233
> URL: https://issues.apache.org/jira/browse/FLINK-6233
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: Xingcan Cui
>
> The goal of this issue is to add support for inner equi-joins on row-time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins.
> * The ON clause should include an equi-join condition.
> * The time condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL 
> '1' HOUR}} can only use rowtime, which is a system attribute. The time 
> condition only supports a bounded time range like {{o.rowtime BETWEEN s.rowtime 
> - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}; unbounded conditions 
> like {{o.rowtime < s.rowtime}} are not supported. The condition should include 
> the rowtime attributes of both streams, so {{o.rowtime between rowtime () and 
> rowtime () + 1}} should also not be supported.
> A row-time stream join will not be able to handle late data, because inserting 
> a row into a sorted order would shift all other computations. This would be 
> too expensive to maintain. Therefore, we will throw an error if a user tries 
> to use a row-time stream join with late data handling.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 





[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143938#comment-16143938
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/4612
  
CC @alpinegizmo 


> Add helper methods for all built-in Flink types to Types
> 
>
> Key: FLINK-7452
> URL: https://issues.apache.org/jira/browse/FLINK-7452
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Sometimes it is very difficult to provide `TypeInformation` manually, in case 
> some extraction fails or is not available. {{TypeHint}}s should be the 
> preferred way, but these methods can ensure correct types.
> I propose to add all built-in Flink types to the {{Types}}. Such as:
> {code}
> Types.POJO(MyPojo.class)
> Types.POJO(Map)
> Types.GENERIC(Object.class)
> Types.TUPLE(TypeInformation, ...)
> Types.MAP(TypeInformation, TypeInformation)
> {code}
> The methods should validate that the returned type is exactly the requested 
> type. And especially in the case of POJOs, they should help with creating {{PojoTypeInfo}}.
> Once this is in place, we can deprecate the {{TypeInfoParser}}.





[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143937#comment-16143937
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/4612

[FLINK-7452] [types] Add helper methods for all built-in Flink types

## What is the purpose of the change

This PR provides a unified, detailed, and easy-to-use utility for working with 
Flink's built-in Java types.

This class helps users get an overview of the built-in Flink types and 
their features/limitations (esp. regarding null support).

## Brief change log

Improved `org.apache.flink.api.common.typeinfo.Types`.


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? JavaDocs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-7452

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4612.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4612


commit 2e027ed3c4671363f7023dd9faf7ba0d0c827312
Author: twalthr 
Date:   2017-08-28T12:13:07Z

[FLINK-7452] [types] Add helper methods for all built-in Flink types to 
Types




> Add helper methods for all built-in Flink types to Types
> 
>
> Key: FLINK-7452
> URL: https://issues.apache.org/jira/browse/FLINK-7452
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Sometimes it is very difficult to provide `TypeInformation` manually, in case 
> some extraction fails or is not available. {{TypeHint}}s should be the 
> preferred way, but these methods can ensure correct types.
> I propose to add all built-in Flink types to the {{Types}}. Such as:
> {code}
> Types.POJO(MyPojo.class)
> Types.POJO(Map)
> Types.GENERIC(Object.class)
> Types.TUPLE(TypeInformation, ...)
> Types.MAP(TypeInformation, TypeInformation)
> {code}
> The methods should validate that the returned type is exactly the requested 
> type. And especially in the case of POJOs, they should help with creating {{PojoTypeInfo}}.
> Once this is in place, we can deprecate the {{TypeInfoParser}}.
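
For illustration, usage of the proposed helpers might look like this (a sketch
in Scala; `MyPojo` is a placeholder class, and the signatures are the ones
proposed above, not a final API):

{code}
import org.apache.flink.api.common.typeinfo.Types

val pojoType = Types.POJO(classOf[MyPojo])
val mapType = Types.MAP(Types.STRING, Types.LONG)
val tupleType = Types.TUPLE(Types.INT, mapType)
val genericType = Types.GENERIC(classOf[AnyRef])
{code}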





[jira] [Commented] (FLINK-7543) Simplify REST parameter access.

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143915#comment-16143915
 ] 

ASF GitHub Bot commented on FLINK-7543:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/4611
  
Changes LGTM @zentol ! 

When Travis gives a green light, feel free to merge!


> Simplify REST parameter access.
> ---
>
> Key: FLINK-7543
> URL: https://issues.apache.org/jira/browse/FLINK-7543
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Chesnay Schepler
> Fix For: 1.4.0
>
>
> Currently you have to  do: 
> {{
> final ParameterTypes.JobIdPathParam jobId = 
> request.getPathParameter(ParameterTypes.JobIdPathParam.class);
>   JobID jobID = jobId.getValue();
> }}
> This issue proposes to remove the second step and return directly the value, 
> while performing the necessary checks internally (different for query and 
> path parameters), without exposing it to the user.
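
For illustration, the difference might look roughly like this (sketched in
Scala; `JobIdPathParam` and the method names are taken from the issue text,
and the one-step variant is the proposal, not a final API):

{code}
// current: two steps
val jobIdParam = request.getPathParameter(classOf[ParameterTypes.JobIdPathParam])
val jobId: JobID = jobIdParam.getValue

// proposed: the handler receives the value directly,
// with the necessary checks performed internally
val jobId2: JobID = request.getPathParameter(classOf[ParameterTypes.JobIdPathParam])
{code}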





[jira] [Commented] (FLINK-7310) always use HybridMemorySegment

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143910#comment-16143910
 ] 

ASF GitHub Bot commented on FLINK-7310:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4445
  
Thanks!

I am currently trying to pinpoint exactly which part of the code suffers 
most from the regression. If the slowdown is, for example, specific to the 
microbenchmark, we can merge this without concern...


> always use HybridMemorySegment
> --
>
> Key: FLINK-7310
> URL: https://issues.apache.org/jira/browse/FLINK-7310
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> For future changes to the network buffers (sending our own off-heap buffers 
> through to netty), we cannot use {{HeapMemorySegment}} anymore and need to 
> rely on {{HybridMemorySegment}} instead.
> We should thus drop any code that loads the {{HeapMemorySegment}} (it is 
> still available if needed) in favour of the {{HybridMemorySegment}} which is 
> able to work on both heap and off-heap memory.
> FYI: For the performance penalty of this change compared to using 
> {{HeapMemorySegment}} alone, see this interesting blob article (from 2015):
> https://flink.apache.org/news/2015/09/16/off-heap-memory.html





[jira] [Commented] (FLINK-7543) Simplify REST parameter access.

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143904#comment-16143904
 ] 

ASF GitHub Bot commented on FLINK-7543:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/4611

[FLINK-7543] [REST] Simplify handler access to path/query parameters

## What is the purpose of the change

This PR simplifies the access to path/query parameters by directly 
returning the value contained in the parameter instead of the parameter itself.


## Brief change log

* make all path parameters mandatory
* simplify access in `HandlerRequest`
* simplify test code in `RestEndpointITCase`


## Verifying this change

This change is already covered by existing tests, such as 
`RestEndpointITCase`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 7544

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4611.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4611


commit dcce0b7631bf65ea66dbe0d64b368c7143815f9e
Author: zentol 
Date:   2017-08-28T14:46:04Z

[FLINK-7544] [REST] Make all path parameters mandatory

commit 42486981c4310076a99a3cb6983131c2ae14725a
Author: zentol 
Date:   2017-08-28T15:26:01Z

[FLINK-7543] [REST] Simplify handler access to path/query parameters




> Simplify REST parameter access.
> ---
>
> Key: FLINK-7543
> URL: https://issues.apache.org/jira/browse/FLINK-7543
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Chesnay Schepler
> Fix For: 1.4.0
>
>
> Currently you have to  do: 
> {{
> final ParameterTypes.JobIdPathParam jobId = 
> request.getPathParameter(ParameterTypes.JobIdPathParam.class);
>   JobID jobID = jobId.getValue();
> }}
> This issue proposes to remove the second step and return directly the value, 
> while performing the necessary checks internally (different for query and 
> path parameters), without exposing it to the user.





[jira] [Commented] (FLINK-6306) Sink for eventually consistent file systems

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143881#comment-16143881
 ] 

ASF GitHub Bot commented on FLINK-6306:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4607
  
We cannot restart Travis ourselves. Only the contributor can schedule 
another run by adding another commit (even an empty one). However, please don't 
do that for the sake of getting a picture-perfect build; we are aware of some 
unstable tests and account for that in the review.


> Sink for eventually consistent file systems
> ---
>
> Key: FLINK-6306
> URL: https://issues.apache.org/jira/browse/FLINK-6306
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
> Attachments: eventually-consistent-sink
>
>
> Currently Flink provides the BucketingSink as an exactly once method for 
> writing out to a file system. It provides these guarantees by moving files 
> through several stages and deleting or truncating files that get into a bad 
> state. While this is a powerful abstraction, it causes issues with eventually 
> consistent file systems such as Amazon's S3 where most operations (ie rename, 
> delete, truncate) are not guaranteed to become consistent within a reasonable 
> amount of time. Flink should provide a sink that provides exactly once writes 
> to a file system where only PUT operations are considered consistent. 





[jira] [Commented] (FLINK-7465) Add build-in BloomFilterCount on TableAPI&SQL

2017-08-28 Thread Jacob Park (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143844#comment-16143844
 ] 

Jacob Park commented on FLINK-7465:
---

[~fhueske] [~sunjincheng121] Thanks for the context. :)

If HyperLogLogs are out, then how about Cuckoo Filters? They are similar to 
Bloom Filters, but are designed differently (inspired by cuckoo hashing), 
support deletion, and take approximately the same space: 
https://www.cs.cmu.edu/~dga/papers/cuckoo-conext2014.pdf. See 
https://bdupras.github.io/filter-tutorial/ for an interactive summary. You can 
also estimate a count with Cuckoo Filters, unlike with standard Bloom Filters.

{noformat}
...for reasonably large sized sets, for the same false positive rate as a 
corresponding Bloom filter, cuckoo filters use less space than Bloom filters, 
are faster on lookups (but slower on insertions/to construct), and amazingly 
also allow deletions of keys (which Bloom filters cannot do). -Michael 
Mitzenmacher (2014)
{noformat}


> Add build-in BloomFilterCount on TableAPI&SQL
> -
>
> Key: FLINK-7465
> URL: https://issues.apache.org/jira/browse/FLINK-7465
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
> Attachments: bloomfilter.png
>
>
> In this JIRA, we use BloomFilter to implement counting functions.
> BloomFilter Algorithm description:
> An empty Bloom filter is a bit array of m bits, all set to 0. There must also 
> be k different hash functions defined, each of which maps or hashes some set 
> element to one of the m array positions, generating a uniform random 
> distribution. Typically, k is a constant, much smaller than m, which is 
> proportional to the number of elements to be added; the precise choice of k 
> and the constant of proportionality of m are determined by the intended false 
> positive rate of the filter.
> To add an element, feed it to each of the k hash functions to get k array 
> positions. Set the bits at all these positions to 1.
> To query for an element (test whether it is in the set), feed it to each of 
> the k hash functions to get k array positions. If any of the bits at these 
> positions is 0, the element is definitely not in the set – if it were, then 
> all the bits would have been set to 1 when it was inserted. If all are 1, 
> then either the element is in the set, or the bits have by chance been set to 
> 1 during the insertion of other elements, resulting in a false positive.
> An example of a Bloom filter, representing the set {x, y, z}. The colored 
> arrows show the positions in the bit array that each set element is mapped 
> to. The element w is not in the set {x, y, z}, because it hashes to one 
> bit-array position containing 0. For this figure, m = 18 and k = 3. The 
> sketch as follows:
> !bloomfilter.png!
> Reference:
> 1. https://en.wikipedia.org/wiki/Bloom_filter
> 2. 
> https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
> Hi [~fhueske] [~twalthr], I would appreciate it if you could give me some advice. :-)
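
To make the add/query steps described above concrete, a minimal sketch (the
double-hashing scheme below is an illustrative assumption, not the Hive
implementation referenced in the issue):

{code}
class SimpleBloomFilter(m: Int, k: Int) {
  private val bits = new java.util.BitSet(m)

  // derive the k array positions from two base hashes (double hashing)
  private def positions(element: String): Seq[Int] = {
    val h1 = element.hashCode
    val h2 = element.reverse.hashCode
    (0 until k).map(i => math.abs((h1 + i * h2) % m))
  }

  // to add an element, set the bits at all k positions to 1
  def add(element: String): Unit = positions(element).foreach(p => bits.set(p))

  // false => definitely not in the set; true => possibly in the set
  def mightContain(element: String): Boolean = positions(element).forall(p => bits.get(p))
}
{code}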





[jira] [Created] (FLINK-7544) Make all PathParameters mandatory

2017-08-28 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-7544:
---

 Summary: Make all PathParameters mandatory
 Key: FLINK-7544
 URL: https://issues.apache.org/jira/browse/FLINK-7544
 Project: Flink
  Issue Type: Improvement
  Components: REST
Affects Versions: 1.4.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.4.0


In the current REST architecture all path parameters are mandatory, so we 
should mark them as such in {{MessagePathParameter}}.






[jira] [Updated] (FLINK-7543) Simplify REST parameter access.

2017-08-28 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas updated FLINK-7543:
--
Description: 
Currently you have to  do: 
{{
final ParameterTypes.JobIdPathParam jobId = 
request.getPathParameter(ParameterTypes.JobIdPathParam.class);
JobID jobID = jobId.getValue();
}}

This issue proposes to remove the second step and return directly the value, 
while performing the necessary checks internally (different for query and path 
parameters), without exposing it to the user.

  was:
Currently you have to  do: 
{
final ParameterTypes.JobIdPathParam jobId = 
request.getPathParameter(ParameterTypes.JobIdPathParam.class);
JobID jobID = jobId.getValue();
}

This issue proposes to remove the second step and return directly the value, 
while performing the necessary checks internally (different for query and path 
parameters), without exposing it to the user.


> Simplify REST parameter access.
> ---
>
> Key: FLINK-7543
> URL: https://issues.apache.org/jira/browse/FLINK-7543
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Chesnay Schepler
> Fix For: 1.4.0
>
>
> Currently you have to  do: 
> {{
> final ParameterTypes.JobIdPathParam jobId = 
> request.getPathParameter(ParameterTypes.JobIdPathParam.class);
>   JobID jobID = jobId.getValue();
> }}
> This issue proposes to remove the second step and return directly the value, 
> while performing the necessary checks internally (different for query and 
> path parameters), without exposing it to the user.





[jira] [Updated] (FLINK-7543) Simplify REST parameter access.

2017-08-28 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas updated FLINK-7543:
--
Description: 
Currently you have to  do: 
{
final ParameterTypes.JobIdPathParam jobId = 
request.getPathParameter(ParameterTypes.JobIdPathParam.class);
JobID jobID = jobId.getValue();
}

This issue proposes to remove the second step and return directly the value, 
while performing the necessary checks internally (different for query and path 
parameters), without exposing it to the user.

  was:
Currently you have to  do: 
{{
final ParameterTypes.JobIdPathParam jobId = 
request.getPathParameter(ParameterTypes.JobIdPathParam.class);
JobID jobID = jobId.getValue();
}}

This issue proposes to remove the second step and return directly the value, 
while performing the necessary checks internally (different for query and path 
parameters), without exposing it to the user.


> Simplify REST parameter access.
> ---
>
> Key: FLINK-7543
> URL: https://issues.apache.org/jira/browse/FLINK-7543
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Chesnay Schepler
> Fix For: 1.4.0
>
>
> Currently you have to  do: 
> {
> final ParameterTypes.JobIdPathParam jobId = 
> request.getPathParameter(ParameterTypes.JobIdPathParam.class);
>   JobID jobID = jobId.getValue();
> }
> This issue proposes to remove the second step and return directly the value, 
> while performing the necessary checks internally (different for query and 
> path parameters), without exposing it to the user.





[jira] [Created] (FLINK-7543) Simplify REST parameter access.

2017-08-28 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-7543:
-

 Summary: Simplify REST parameter access.
 Key: FLINK-7543
 URL: https://issues.apache.org/jira/browse/FLINK-7543
 Project: Flink
  Issue Type: Improvement
  Components: REST
Affects Versions: 1.4.0
Reporter: Kostas Kloudas
Assignee: Chesnay Schepler
 Fix For: 1.4.0


Currently you have to  do: 
{{{
final ParameterTypes.JobIdPathParam jobId = 
request.getPathParameter(ParameterTypes.JobIdPathParam.class);
JobID jobID = jobId.getValue();
}}}

This issue proposes to remove the second step and return directly the value, 
while performing the necessary checks internally (different for query and path 
parameters), without exposing it to the user.





[jira] [Updated] (FLINK-7543) Simplify REST parameter access.

2017-08-28 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas updated FLINK-7543:
--
Description: 
Currently you have to do: 
{{
final ParameterTypes.JobIdPathParam jobId = 
request.getPathParameter(ParameterTypes.JobIdPathParam.class);
JobID jobID = jobId.getValue();
}}

This issue proposes to remove the second step and return the value directly, 
while performing the necessary checks internally (different for query and path 
parameters), without exposing it to the user.

  was:
Currently you have to do: 
{{{
final ParameterTypes.JobIdPathParam jobId = 
request.getPathParameter(ParameterTypes.JobIdPathParam.class);
JobID jobID = jobId.getValue();
}}}

This issue proposes to remove the second step and return the value directly, 
while performing the necessary checks internally (different for query and path 
parameters), without exposing it to the user.


> Simplify REST parameter access.
> ---
>
> Key: FLINK-7543
> URL: https://issues.apache.org/jira/browse/FLINK-7543
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Chesnay Schepler
> Fix For: 1.4.0
>
>
> Currently you have to do: 
> {{
> final ParameterTypes.JobIdPathParam jobId = 
> request.getPathParameter(ParameterTypes.JobIdPathParam.class);
>   JobID jobID = jobId.getValue();
> }}
> This issue proposes to remove the second step and return the value directly, 
> while performing the necessary checks internally (different for query and 
> path parameters), without exposing it to the user.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4609: Assigner

2017-08-28 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4609
  
@StefanRRichter please take a look


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143814#comment-16143814
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4485
  
Yes, this way also has some advantages, and recycling these exclusive 
buffers will be covered in the next PR with some additional tests. 

I will follow your suggestions, add some tests in this PR, and 
submit the modifications based on all the above comments.


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a * <number of channels> + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fixed 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.
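
For intuition, a tiny worked example of that sizing formula (all numbers are 
illustrative, not defaults):

{code:java}
public class BufferSizingExample {
    public static void main(String[] args) {
        int exclusivePerChannel = 2;  // a (illustrative)
        int numberOfChannels = 100;
        int floatingBuffers = 8;      // b (illustrative)
        // Current LocalBufferPool bound: a * <number of channels> + b
        System.out.println(exclusivePerChannel * numberOfChannels + floatingBuffers); // 208
    }
}
{code}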



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

2017-08-28 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4485
  
Yes, this way also has some advantages, and recycling these exclusive 
buffers will be covered in the next PR with some additional tests. 

I will follow your suggestions, add some tests in this PR, and 
submit the modifications based on all the above comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-7542) Some tests in AggregateITCase fail for some Time Zones

2017-08-28 Thread Usman Younas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Usman Younas updated FLINK-7542:

Affects Version/s: 1.3.2

> Some tests in AggregateITCase fail for some Time Zones
> --
>
> Key: FLINK-7542
> URL: https://issues.apache.org/jira/browse/FLINK-7542
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.2
>Reporter: Usman Younas
>
> In {{org.apache.flink.table.runtime.batch.sql.AggregateITCase}} two tests 
> 1. testTumbleWindowAggregate and 
> 2. testHopWindowAggregate 
> are failing for some time zones.
> The bug can be reproduced by changing the machine's time zone to
> Time Zone: Central Daylight Time
> Closest City: Houston, United States
> I think the problem is with Timestamp.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-6860) update Apache Beam in page Ecosystem

2017-08-28 Thread Hai Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai Zhou reassigned FLINK-6860:
---

Assignee: Hai Zhou

> update Apache Beam in page Ecosystem
> 
>
> Key: FLINK-6860
> URL: https://issues.apache.org/jira/browse/FLINK-6860
> Project: Flink
>  Issue Type: Task
>  Components: Documentation
>Reporter: Xu Mingmin
>Assignee: Hai Zhou
>Priority: Minor
>
> To remove the word {{incubating}} and update the link: Apache Beam has 
> graduated to a top-level project.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6306) Sink for eventually consistent file systems

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143792#comment-16143792
 ] 

ASF GitHub Bot commented on FLINK-6306:
---

Github user sjwiesman commented on the issue:

https://github.com/apache/flink/pull/4607
  
Would you be able to rerun Travis? The test failed on a single 
configuration, during Kafka09ITTest, due to a task manager failure. I do not 
believe any of my code changes touched any of the code paths in that test. 


> Sink for eventually consistent file systems
> ---
>
> Key: FLINK-6306
> URL: https://issues.apache.org/jira/browse/FLINK-6306
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
> Attachments: eventually-consistent-sink
>
>
> Currently Flink provides the BucketingSink as an exactly-once method for 
> writing out to a file system. It provides these guarantees by moving files 
> through several stages and deleting or truncating files that get into a bad 
> state. While this is a powerful abstraction, it causes issues with eventually 
> consistent file systems such as Amazon's S3, where most operations (i.e., rename, 
> delete, truncate) are not guaranteed to become consistent within a reasonable 
> amount of time. Flink should provide a sink that offers exactly-once writes 
> to a file system where only PUT operations are considered consistent. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7542) Some tests in AggregateITCase fail for some Time Zones

2017-08-28 Thread Usman Younas (JIRA)
Usman Younas created FLINK-7542:
---

 Summary: Some tests in AggregateITCase fail for some Time Zones
 Key: FLINK-7542
 URL: https://issues.apache.org/jira/browse/FLINK-7542
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: Usman Younas


In {{org.apache.flink.table.runtime.batch.sql.AggregateITCase}} two tests 
1. testTumbleWindowAggregate and 
2. testHopWindowAggregate 
are failing for some time zones.

The bug can be reproduced by changing the machine's time zone to
Time Zone: Central Daylight Time
Closest City: Houston, United States

I think the problem is with Timestamp.
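
A minimal sketch of the suspected failure mode (an assumption, not a verified 
diagnosis): java.sql.Timestamp.valueOf interprets its argument in the JVM's 
default time zone, so the same literal yields different epoch millis under 
different zones.

{code:java}
import java.sql.Timestamp;
import java.util.TimeZone;

public class TimestampZoneDemo {
    public static void main(String[] args) {
        TimeZone.setDefault(TimeZone.getTimeZone("America/Chicago"));
        long cdt = Timestamp.valueOf("2017-08-28 10:00:00").getTime();

        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
        long utc = Timestamp.valueOf("2017-08-28 10:00:00").getTime();

        // The two differ by the zone offset (5 hours during daylight saving).
        System.out.println(cdt != utc); // true
    }
}
{code}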



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4607: [FLINK-6306][connectors] Sink for eventually consistent f...

2017-08-28 Thread sjwiesman
Github user sjwiesman commented on the issue:

https://github.com/apache/flink/pull/4607
  
Would you be able to rerun Travis? The test failed on a single 
configuration, during Kafka09ITTest, due to a task manager failure. I do not 
believe any of my code changes touched any of the code paths in that test. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7130) Remove eventSerializer from NFA

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143787#comment-16143787
 ] 

ASF GitHub Bot commented on FLINK-7130:
---

GitHub user dawidwys opened a pull request:

https://github.com/apache/flink/pull/4610

[FLINK-7130] Removed event serializer from NFA and SharedBuffer

## What is the purpose of the change

* The purpose is to remove the usage of the event serializer from the `NFA` and 
`SharedBuffer` classes, as it should not be used in the logic part. 


## Brief change log

  - Removed NonDuplicatingTypeSerializer
  - Removed eventSerializer from `NFA`
  - Removed eventSerializer from `SharedBuffer`


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): ( / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: ( no )
  - The runtime per-record code paths (performance sensitive): ( no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dawidwys/flink cep-remove-event-serializer

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4610.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4610


commit 1f61b2f0858506a9ff7bde47a33c8ab2b6ddbb50
Author: mtunique 
Date:   2017-04-13T12:22:41Z

[FLINK-7130] Removed event serializer from NFA and SharedBuffer




> Remove eventSerializer from NFA
> ---
>
> Key: FLINK-7130
> URL: https://issues.apache.org/jira/browse/FLINK-7130
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>
> Right now eventSerializer is serialized within NFA. It should be present only 
> in NFASerializer.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4610: [FLINK-7130] Removed event serializer from NFA and...

2017-08-28 Thread dawidwys
GitHub user dawidwys opened a pull request:

https://github.com/apache/flink/pull/4610

[FLINK-7130] Removed event serializer from NFA and SharedBuffer

## What is the purpose of the change

* The purpose is to remove the usage of the event serializer from the `NFA` and 
`SharedBuffer` classes, as it should not be used in the logic part. 


## Brief change log

  - Removed NonDuplicatingTypeSerializer
  - Removed eventSerializer from `NFA`
  - Removed eventSerializer from `SharedBuffer`


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): ( / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: ( no )
  - The runtime per-record code paths (performance sensitive): ( no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dawidwys/flink cep-remove-event-serializer

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4610.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4610


commit 1f61b2f0858506a9ff7bde47a33c8ab2b6ddbb50
Author: mtunique 
Date:   2017-04-13T12:22:41Z

[FLINK-7130] Removed event serializer from NFA and SharedBuffer




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4609: Assigner

2017-08-28 Thread pnowojski
GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/4609

Assigner

This PR is a pure refactor and shouldn't change any functionality. It 
should be covered by existing tests like `CheckpointCoordinatorTest`.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink assigner

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4609.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4609


commit 06c1367c0a9a05c13bc5abf4bae96241350a276e
Author: Piotr Nowojski 
Date:   2017-08-25T11:59:36Z

[hotfix][runtime] Checkstyle changes in TaskStateSnapshot

commit efc46f7cc1508499e421946767b326ba91118c38
Author: Piotr Nowojski 
Date:   2017-08-25T13:23:15Z

[FLINK-7541][runtime] Refactor StateAssignmentOperation and use OperatorID

This is not a complete refactor; some methods still rely on the order of the
new and old operators.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-7541) Redistribute operator state using OperatorID

2017-08-28 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-7541:
-

 Summary: Redistribute operator state using OperatorID
 Key: FLINK-7541
 URL: https://issues.apache.org/jira/browse/FLINK-7541
 Project: Flink
  Issue Type: Improvement
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski


Currently StateAssignmentOperation relies heavily on the order of new and old 
operators in the task. It should be changed to rely more on 
OperatorID.
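
A minimal, self-contained sketch of the idea (made-up names and simplified 
types; the real StateAssignmentOperation deals with richer state handles):

{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: keying restored state by a stable operator ID makes
// reassignment robust against operator reordering, unlike positional matching.
public class IdBasedStateAssignment {

    public static Map<String, byte[]> assignStates(
            Map<String, byte[]> restoredStateByOperatorId,
            List<String> newOperatorIds) {

        Map<String, byte[]> assignment = new HashMap<>();
        for (String operatorId : newOperatorIds) {
            byte[] state = restoredStateByOperatorId.get(operatorId);
            if (state != null) {
                // Known operator: hand back its old state, regardless of position.
                assignment.put(operatorId, state);
            }
            // Operators without restored state simply start fresh.
        }
        return assignment;
    }
}
{code}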



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143722#comment-16143722
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4355
  
Hi @wuchong, 

I think we don't need to call `open()` and `close()` in 
`AggregateAggFunction`. `GeneratedAggregations` is an internal class which is 
not exposed to users. It would be a bug in the translation logic if a 
`GeneratedAggregations` that requires `open()` or `close()` were passed to 
an `AggregateAggFunction`. A user couldn't do anything to prevent this.

+1 for refactoring `AggregateCodeGenerator`.


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4355: [FLINK-7206] [table] Implementation of DataView to suppor...

2017-08-28 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4355
  
Hi @wuchong, 

I think we don't need to call `open()` and `close()` in 
`AggregateAggFunction`. `GeneratedAggregations` is an internal class which is 
not exposed to users. It would be a bug in the translation logic if a 
`GeneratedAggregations` that requires `open()` or `close()` were passed to 
an `AggregateAggFunction`. A user couldn't do anything to prevent this.

+1 for refactoring `AggregateCodeGenerator`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7227) OR expression with more than 2 predicates is not pushed into a TableSource

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143686#comment-16143686
 ] 

ASF GitHub Bot commented on FLINK-7227:
---

GitHub user uybhatti opened a pull request:

https://github.com/apache/flink/pull/4608

[FLINK-7227][Table API & SQL]Fix the the TableSource predicate push…

… down issue for OR and AND expression with more than 2 predicates


*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*

## Contribution Checklist

  - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
  
  - Name the pull request in the form "[FLINK-XXXX] [component] Title of 
the pull request", where *FLINK-XXXX* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.

  - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
  
  - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).

  - Each pull request should address only one issue, not mix up code from 
multiple issues.
  
  - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)

  - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*


## Brief change log

*(for example:)*
  - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
  - *Deployments RPC transmits only the blob storage reference*
  - *TaskManagers retrieve the TaskInfo from the blob cache*


## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
  - *Extended integration test for recovery after master (JobManager) 
failure*
  - *Added test that validates that TaskInfo is transferred only once 
across recoveries*
  - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
  - The serializers: (yes / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature 

[GitHub] flink pull request #4608: [FLINK-7227][Table API & SQL]Fix the the TableSour...

2017-08-28 Thread uybhatti
GitHub user uybhatti opened a pull request:

https://github.com/apache/flink/pull/4608

[FLINK-7227][Table API & SQL]Fix the the TableSource predicate push…

… down issue for OR and AND expression with more than 2 predicates


*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*

## Contribution Checklist

  - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
  
  - Name the pull request in the form "[FLINK-XXXX] [component] Title of 
the pull request", where *FLINK-XXXX* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.

  - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
  
  - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).

  - Each pull request should address only one issue, not mix up code from 
multiple issues.
  
  - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)

  - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*


## Brief change log

*(for example:)*
  - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
  - *Deployments RPC transmits only the blob storage reference*
  - *TaskManagers retrieve the TaskInfo from the blob cache*


## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
  - *Extended integration test for recovery after master (JobManager) 
failure*
  - *Added test that validates that TaskInfo is transferred only once 
across recoveries*
  - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
  - The serializers: (yes / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uybhatti/flink FLINK-7227

Alternatively you can review and apply these changes

[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135504282
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/DataViewSpec.scala
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.reflect.Field
+
+import org.apache.flink.api.common.state.{ListStateDescriptor, 
MapStateDescriptor, StateDescriptor}
+import org.apache.flink.table.dataview.{ListViewTypeInfo, MapViewTypeInfo}
+
+/**
+  * Data view specification.
+  *
+  * @tparam ACC type extends [[DataView]]
+  */
+trait DataViewSpec[ACC <: DataView] {
+  def id: String
+  def field: Field
+  def toStateDescriptor: StateDescriptor[_, _]
--- End diff --

Very good point, you are right! 
We need to generate the state descriptors here, serialize them and ship 
them.

Thanks for the clarification.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143682#comment-16143682
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135504282
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/DataViewSpec.scala
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.reflect.Field
+
+import org.apache.flink.api.common.state.{ListStateDescriptor, 
MapStateDescriptor, StateDescriptor}
+import org.apache.flink.table.dataview.{ListViewTypeInfo, MapViewTypeInfo}
+
+/**
+  * Data view specification.
+  *
+  * @tparam ACC type extends [[DataView]]
+  */
+trait DataViewSpec[ACC <: DataView] {
+  def id: String
+  def field: Field
+  def toStateDescriptor: StateDescriptor[_, _]
--- End diff --

Very good point, you are right! 
We need to generate the state descriptors here, serialize them and ship 
them.

Thanks for the clarification.


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135503957
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
 ---
@@ -100,6 +108,17 @@ abstract class GeneratedAggregations extends Function {
 * aggregated results
 */
   def resetAccumulator(accumulators: Row)
+
+  /**
+* Cleanup for the accumulators.
+*/
+  def cleanup()
+
+  /**
+* Tear-down method for 
[[org.apache.flink.table.functions.AggregateFunction]].
+* It can be used for clean up work. By default, this method does 
nothing.
+*/
+  def close()
--- End diff --

Sorry, I overlooked the `close()` calls. If the method is used, we should 
keep it of course.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143681#comment-16143681
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135503957
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
 ---
@@ -100,6 +108,17 @@ abstract class GeneratedAggregations extends Function {
 * aggregated results
 */
   def resetAccumulator(accumulators: Row)
+
+  /**
+* Cleanup for the accumulators.
+*/
+  def cleanup()
+
+  /**
+* Tear-down method for 
[[org.apache.flink.table.functions.AggregateFunction]].
+* It can be used for clean up work. By default, this method does 
nothing.
+*/
+  def close()
--- End diff --

Sorry, I overlooked the `close()` calls. If the method is used, we should 
keep it of course.


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7525) Add config option to disable Cancel functionality on UI

2017-08-28 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7525:

Component/s: Webfrontend
 Web Client

> Add config option to disable Cancel functionality on UI
> ---
>
> Key: FLINK-7525
> URL: https://issues.apache.org/jira/browse/FLINK-7525
> Project: Flink
>  Issue Type: Improvement
>  Components: Web Client, Webfrontend
>Reporter: Ted Yu
>
> In this email thread 
> http://search-hadoop.com/m/Flink/VkLeQlf0QOnc7YA?subj=Security+Control+of+running+Flink+Jobs+on+Flink+UI
>  , Raja was asking for a way to control how users cancel Job(s).
> Robert proposed adding a config option which disables the Cancel 
> functionality.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


issues@flink.apache.org

2017-08-28 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143673#comment-16143673
 ] 

Fabian Hueske commented on FLINK-7465:
--

HyperLogLog does not support retraction, i.e., removal of formerly added 
values. 

Bloom filters could be modified to support retraction by using int or long 
arrays instead of bit arrays. However, this would increase the space 
requirements by 32x or 64x.

I think it's fine to use HyperLogLog and ignore the retraction case. At the 
moment, retraction is only mandatory for OVER windows and FLINK-7471 proposes 
to support non-retractable aggregation functions in OVER windows. So, this 
limitation might be resolved in the future.

> Add build-in BloomFilterCount on TableAPI&SQL
> -
>
> Key: FLINK-7465
> URL: https://issues.apache.org/jira/browse/FLINK-7465
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
> Attachments: bloomfilter.png
>
>
> In this JIRA, use BloomFilter to implement counting functions.
> BloomFilter Algorithm description:
> An empty Bloom filter is a bit array of m bits, all set to 0. There must also 
> be k different hash functions defined, each of which maps or hashes some set 
> element to one of the m array positions, generating a uniform random 
> distribution. Typically, k is a constant, much smaller than m, which is 
> proportional to the number of elements to be added; the precise choice of k 
> and the constant of proportionality of m are determined by the intended false 
> positive rate of the filter.
> To add an element, feed it to each of the k hash functions to get k array 
> positions. Set the bits at all these positions to 1.
> To query for an element (test whether it is in the set), feed it to each of 
> the k hash functions to get k array positions. If any of the bits at these 
> positions is 0, the element is definitely not in the set – if it were, then 
> all the bits would have been set to 1 when it was inserted. If all are 1, 
> then either the element is in the set, or the bits have by chance been set to 
> 1 during the insertion of other elements, resulting in a false positive.
> An example of a Bloom filter, representing the set {x, y, z}. The colored 
> arrows show the positions in the bit array that each set element is mapped 
> to. The element w is not in the set {x, y, z}, because it hashes to one 
> bit-array position containing 0. For this figure, m = 18 and k = 3. The 
> sketch is as follows:
> !bloomfilter.png!
> Reference:
> 1. https://en.wikipedia.org/wiki/Bloom_filter
> 2. 
> https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
> Hi [~fhueske] [~twalthr], I'd appreciate it if you could give me some advice. :-)
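
As a companion to the description, a minimal, self-contained Bloom filter 
sketch (illustrative only; not Hive's implementation, and without the counting 
extension discussed in the comments):

{code:java}
import java.util.BitSet;

public class SimpleBloomFilter {
    private final BitSet bits;
    private final int m; // number of bits
    private final int k; // number of hash functions

    public SimpleBloomFilter(int m, int k) {
        this.bits = new BitSet(m);
        this.m = m;
        this.k = k;
    }

    // Derive k positions from two hashes (double hashing; an assumption, any
    // family of k independent hash functions works).
    private int position(Object element, int i) {
        int h1 = element.hashCode();
        int h2 = Integer.rotateLeft(h1, 16) ^ 0x9E3779B9;
        return Math.floorMod(h1 + i * h2, m);
    }

    public void add(Object element) {
        for (int i = 0; i < k; i++) {
            bits.set(position(element, i));
        }
    }

    public boolean mightContain(Object element) {
        for (int i = 0; i < k; i++) {
            if (!bits.get(position(element, i))) {
                return false; // definitely not in the set
            }
        }
        return true; // possibly in the set (false positives are possible)
    }
}
{code}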



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


issues@flink.apache.org

2017-08-28 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137425#comment-16137425
 ] 

Fabian Hueske edited comment on FLINK-7465 at 8/28/17 10:20 AM:


I'm sorry, I confused count-min sketches (for approximate group counts) and 
HyperLogLog (for approximate distinct counts). 

I assume the goal of the BloomFilterCount function is to (approximately) count 
the number of distinct values. In contrast to HyperLogLog, Bloom filters are 
not specifically designed for approximate distinct counting but for approximate 
membership testing. AFAIK, bloom filters should be more precise for low 
distinct cardinalities but HyperLogLog should provide much better results for 
larger cardinalities.

IMO, [~jark]'s idea to split the bitmask into multiple long values is pretty 
nice. OTOH, multiple RocksDB point lookups might also be more expensive than a 
single lookup with larger serialization payload (the deserialization logic for 
byte arrays shouldn't be very costy).


was (Author: fhueske):
I'm sorry, I confused count-min sketches (for approximate group counts) and 
HyperLogLog (for approximate distinct counts). 

I assume the goal of the BloomFilterCount function is to (approximately) count 
the number of distinct values. In contrast to HyperLogLog, Bloom filters are 
not specifically designed for approximate distinct counting but for approximate 
membership testing. AFAIK, bloom filters should be more precise for low 
distinct cardinalities but HyperLogLog should provide much better results for 
larger cardinalities.

IMO, [~jark]'s idea to split the bitmask into multiple long values is pretty 
nice. OTOH, point multiple RocksDB lookups might also be more expensive than a 
single lookup with larger serialization payload (the deserialization logic for 
byte arrays shouldn't be very costy).

> Add build-in BloomFilterCount on TableAPI&SQL
> -
>
> Key: FLINK-7465
> URL: https://issues.apache.org/jira/browse/FLINK-7465
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
> Attachments: bloomfilter.png
>
>
> In this JIRA, use BloomFilter to implement counting functions.
> BloomFilter Algorithm description:
> An empty Bloom filter is a bit array of m bits, all set to 0. There must also 
> be k different hash functions defined, each of which maps or hashes some set 
> element to one of the m array positions, generating a uniform random 
> distribution. Typically, k is a constant, much smaller than m, which is 
> proportional to the number of elements to be added; the precise choice of k 
> and the constant of proportionality of m are determined by the intended false 
> positive rate of the filter.
> To add an element, feed it to each of the k hash functions to get k array 
> positions. Set the bits at all these positions to 1.
> To query for an element (test whether it is in the set), feed it to each of 
> the k hash functions to get k array positions. If any of the bits at these 
> positions is 0, the element is definitely not in the set – if it were, then 
> all the bits would have been set to 1 when it was inserted. If all are 1, 
> then either the element is in the set, or the bits have by chance been set to 
> 1 during the insertion of other elements, resulting in a false positive.
> An example of a Bloom filter, representing the set {x, y, z}. The colored 
> arrows show the positions in the bit array that each set element is mapped 
> to. The element w is not in the set {x, y, z}, because it hashes to one 
> bit-array position containing 0. For this figure, m = 18 and k = 3. The 
> sketch is as follows:
> !bloomfilter.png!
> Reference:
> 1. https://en.wikipedia.org/wiki/Bloom_filter
> 2. 
> https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
> Hi [~fhueske] [~twalthr], I'd appreciate it if you could give me some advice. :-)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


issues@flink.apache.org

2017-08-28 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137425#comment-16137425
 ] 

Fabian Hueske edited comment on FLINK-7465 at 8/28/17 10:20 AM:


I'm sorry, I confused count-min sketches (for approximate group counts) and 
HyperLogLog (for approximate distinct counts). 

I assume the goal of the BloomFilterCount function is to (approximately) count 
the number of distinct values. In contrast to HyperLogLog, Bloom filters are 
not specifically designed for approximate distinct counting but for approximate 
membership testing. AFAIK, bloom filters should be more precise for low 
distinct cardinalities but HyperLogLog should provide much better results for 
larger cardinalities.

IMO, [~jark]'s idea to split the bitmask into multiple long values is pretty 
nice. OTOH, multiple RocksDB point lookups might also be more expensive than a 
single lookup with larger serialization payload (the deserialization logic for 
byte arrays shouldn't be very costly).


was (Author: fhueske):
I'm sorry, I confused count-min sketches (for approximate group counts) and 
HyperLogLog (for approximate distinct counts). 

I assume the goal of the BloomFilterCount function is to (approximately) count 
the number of distinct values. In contrast to HyperLogLog, Bloom filters are 
not specifically designed for approximate distinct counting but for approximate 
membership testing. AFAIK, bloom filters should be more precise for low 
distinct cardinalities but HyperLogLog should provide much better results for 
larger cardinalities.

IMO, [~jark]'s idea to split the bitmask into multiple long values is pretty 
nice. OTOH, multiple RocksDB point lookups might also be more expensive than a 
single lookup with larger serialization payload (the deserialization logic for 
byte arrays shouldn't be very costy).

> Add build-in BloomFilterCount on TableAPI&SQL
> -
>
> Key: FLINK-7465
> URL: https://issues.apache.org/jira/browse/FLINK-7465
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
> Attachments: bloomfilter.png
>
>
> In this JIRA, use BloomFilter to implement counting functions.
> BloomFilter Algorithm description:
> An empty Bloom filter is a bit array of m bits, all set to 0. There must also 
> be k different hash functions defined, each of which maps or hashes some set 
> element to one of the m array positions, generating a uniform random 
> distribution. Typically, k is a constant, much smaller than m, which is 
> proportional to the number of elements to be added; the precise choice of k 
> and the constant of proportionality of m are determined by the intended false 
> positive rate of the filter.
> To add an element, feed it to each of the k hash functions to get k array 
> positions. Set the bits at all these positions to 1.
> To query for an element (test whether it is in the set), feed it to each of 
> the k hash functions to get k array positions. If any of the bits at these 
> positions is 0, the element is definitely not in the set – if it were, then 
> all the bits would have been set to 1 when it was inserted. If all are 1, 
> then either the element is in the set, or the bits have by chance been set to 
> 1 during the insertion of other elements, resulting in a false positive.
> An example of a Bloom filter, representing the set {x, y, z}. The colored 
> arrows show the positions in the bit array that each set element is mapped 
> to. The element w is not in the set {x, y, z}, because it hashes to one 
> bit-array position containing 0. For this figure, m = 18 and k = 3. The 
> sketch is as follows:
> !bloomfilter.png!
> Reference:
> 1. https://en.wikipedia.org/wiki/Bloom_filter
> 2. 
> https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
> Hi [~fhueske] [~twalthr], I'd appreciate it if you could give me some advice. :-)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7540) submit a job on yarn-cluster mode or start a yarn-session failed,in hadoop cluster with capitalized hostname

2017-08-28 Thread Tong Yan Ou (JIRA)
Tong Yan Ou created FLINK-7540:
--

 Summary: submit a job on yarn-cluster mode or start a yarn-session 
failed,in hadoop cluster with capitalized hostname
 Key: FLINK-7540
 URL: https://issues.apache.org/jira/browse/FLINK-7540
 Project: Flink
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.3.2, 1.3.1, 1.4.0
Reporter: Tong Yan Ou
 Fix For: 1.3.3


Hostnames in my Hadoop cluster look like these: "DSJ-RTB-4T-177", 
"DSJ-signal-900G-71".
When using the following command:
./bin/flink run -m yarn-cluster -yn 1 -yqu xl_trip -yjm 1024 
~/flink-1.3.1/examples/batch/WordCount.jar --input 
/user/all_trip_dev/test/testcount.txt --output /user/all_trip_dev/test/result  
Or
./bin/yarn-session.sh -d -jm 6144  -tm 12288 -qu xl_trip -s 24 -n 5 -nm 
"flink-YarnSession-jm6144-tm12288-s24-n5-xl_trip"
There will be some exceptions at the command line interface:

java.lang.RuntimeException: Unable to get ClusterClient status from Application 
Client
at 
org.apache.flink.yarn.YarnClusterClient.getClusterStatus(YarnClusterClient.java:243)
…
Caused by: org.apache.flink.util.FlinkException: Could not connect to the 
leading JobManager. Please check that the JobManager is running.

h4. Then the job fails , starting the yarn-session is the same.

The exceptions of the application log:
2017-08-10 17:36:10,334 WARN  
org.apache.flink.runtime.webmonitor.JobManagerRetriever   - Failed to 
retrieve leader gateway and port.
akka.actor.ActorNotFound: Actor not found for: 
ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), 
Path(/user/jobmanager)]
…
2017-08-10 17:36:10,837 ERROR org.apache.flink.yarn.YarnFlinkResourceManager
- Resource manager could not register at JobManager
akka.pattern.AskTimeoutException: Ask timed out on 
[ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), 
Path(/user/jobmanager)]] after [1 ms]


And I found some differences in the ActorSystem addresses:
2017-08-10 17:35:56,791 INFO  org.apache.flink.yarn.YarnJobManager  
- Starting JobManager at 
akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager.
2017-08-10 17:35:56,880 INFO  org.apache.flink.yarn.YarnJobManager  
- JobManager 
akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager was granted leadership 
with leader session ID Some(----).
2017-08-10 17:36:00,312 INFO  
org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Web frontend 
listening at 0:0:0:0:0:0:0:0:54921
2017-08-10 17:36:00,312 INFO  
org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Starting with 
JobManager akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager on port 
54921
2017-08-10 17:36:00,313 INFO  
org.apache.flink.runtime.webmonitor.JobManagerRetriever   - New leader 
reachable under 
akka.tcp://flink@dsj-signal-4t-248:65082/user/jobmanager:----.


The JobManager is "akka.tcp://flink@DSJ-signal-4T-248:65082" and the 
JobManagerRetriever is "akka.tcp://flink@dsj-signal-4t-248:65082".
The hostname of the JobManagerRetriever's actor is lowercase.


I read the source code. In class NetUtils, the 
unresolvedHostToNormalizedString(String host) method at line 127:

public static String unresolvedHostToNormalizedString(String host) {

    // Return loopback interface address if host is null
    // This represents the behavior of {@code InetAddress.getByName} and RFC 3330
    if (host == null) {
        host = InetAddress.getLoopbackAddress().getHostAddress();
    } else {
        host = host.trim().toLowerCase();
    }
    ...
}


It turns the hostname into lowercase.
Therefore, the JobManagerRetriever cannot find the JobManager's ActorSystem.
Then I removed the call to the toLowerCase() method in the source code.

Finally, I can submit a job in yarn-cluster mode and start a yarn-session.
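
A small demo of the reported mismatch (a sketch, not Flink code; Akka compares 
actor addresses case-sensitively, so the lowercased lookup address never 
matches the registered one):

{code:java}
public class HostCaseDemo {
    public static void main(String[] args) {
        String registered = "akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager";
        // What NetUtils.unresolvedHostToNormalizedString produces for the host:
        String lookedUp = "akka.tcp://flink@" + "DSJ-signal-4T-248".trim().toLowerCase()
                + ":65082/user/jobmanager";
        System.out.println(registered.equals(lookedUp)); // false -> ActorNotFound
    }
}
{code}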






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143560#comment-16143560
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4485
  
Hi @zhijiangW,
regarding the buffer pool implementation, I was just curious about why it 
was done that way. But it is fine to keep the logic in `RemoteInputChannel` if 
you make sure that a recycler puts these buffers right (back) into the buffer 
queue (I guess that's in one of the follow-up PRs). This way, we avoid an 
additional intermediate component (and the need to interact with it). To 
conclude, on second thought, it is fine as it is.

The thing with `ResultPartitionType` is that without an (intermediate) way 
to set `isCreditBased` to `true`, we are not really able to test this code path 
on higher levels such as the `NetworkEnvironment` (or maybe I'll see that in 
the follow-up PRs as well).

Speaking of tests... I understand that with the switch to credit-based flow 
control, some parts will be covered by existing tests, but we also change the 
behaviour at some points and the current tests are already a bit sparse. Can 
you also add tests for
- the `NetworkEnvironment` changes (into `NetworkEnvironmentTest`),
- `NetworkBufferPool#requestMemorySegments`, 
`NetworkBufferPool#recycleMemorySegments` (into `NetworkBufferPoolTest` which 
currently is a bit sparse though)
- the changes in `SingleInputGate` (into `SingleInputGateTest`)


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a * <number of channels> + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fixed 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

2017-08-28 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4485
  
Hi @zhijiangW,
regarding the buffer pool implementation, I was just curious about why it 
was done that way. But it is fine to keep the logic in `RemoteInputChannel` if 
you make sure that a recycler puts these buffers right (back) into the buffer 
queue (I guess that's in one of the follow-up PRs). This way, we avoid an 
additional intermediate component (and the need to interact with it). To 
conclude, on second thought, it is fine as it is.

The thing with `ResultPartitionType` is that without an (intermediate) way 
to set `isCreditBased` to `true`, we are not really able to test this code path 
on higher levels such as the `NetworkEnvironment` (or maybe I'll see that in 
the follow-up PRs as well).

Speaking of tests... I understand that with the switch to credit-based flow 
control, some parts will be covered by existing tests, but we also change the 
behaviour at some points and the current tests are already a bit sparse. Can 
you also add tests for
- the `NetworkEnvironment` changes (into `NetworkEnvironmentTest`),
- `NetworkBufferPool#requestMemorySegments`, 
`NetworkBufferPool#recycleMemorySegments` (into `NetworkBufferPoolTest` which 
currently is a bit sparse though)
- the changes in `SingleInputGate` (into `SingleInputGateTest`)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143556#comment-16143556
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r135481583
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---
@@ -259,17 +267,72 @@ public int getNumberOfQueuedBuffers() {
 
 	public void setBufferPool(BufferPool bufferPool) {
 		// Sanity checks
-		checkArgument(numberOfInputChannels == bufferPool.getNumberOfRequiredMemorySegments(),
+		if (!getConsumedPartitionType().isCreditBased()) {
+			checkArgument(numberOfInputChannels == bufferPool.getNumberOfRequiredMemorySegments(),
 			"Bug in input gate setup logic: buffer pool has not enough guaranteed buffers " +
-				"for this input gate. Input gates require at least as many buffers as " +
+				"for this input gate. Input gates require at least as many buffers as " +
 			"there are input channels.");
+		}
 
 		checkState(this.bufferPool == null, "Bug in input gate setup logic: buffer pool has" +
-			"already been set for this input gate.");
+			"already been set for this input gate.");
 
 		this.bufferPool = checkNotNull(bufferPool);
 	}
 
+	/**
+	 * Assign the exclusive buffers to all remote input channels directly for credit-based mode.
+	 *
+	 * @param networkBufferPool The global pool to request and recycle exclusive buffers
+	 * @param networkBuffersPerChannel The number of exclusive buffers for each channel
+	 */
+	public void assignExclusiveSegments(NetworkBufferPool networkBufferPool, int networkBuffersPerChannel) throws IOException {
+		this.networkBufferPool = checkNotNull(networkBufferPool);
--- End diff --

please guard against using this method multiple times (like in
`setBufferPool`) as a sanity check
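
For illustration, the guard could mirror the `checkState` pattern from
`setBufferPool` above (a sketch only; the message wording is illustrative):

{code:java}
public void assignExclusiveSegments(NetworkBufferPool networkBufferPool, int networkBuffersPerChannel) throws IOException {
	// sanity check: this method must only be called once per input gate
	checkState(this.networkBufferPool == null, "Bug in input gate setup logic: " +
		"global buffer pool has already been set for this input gate.");

	this.networkBufferPool = checkNotNull(networkBufferPool);
	// ... assign networkBuffersPerChannel exclusive segments to each remote channel ...
}
{code}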


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a * <number of channels> + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a 
> fixed-size buffer pool used to manage the floating buffers for 
> {{SingleInputGate}}, and the exclusive buffers are assigned to 
> {{InputChannel}}s directly.
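
For intuition, a minimal self-contained sketch of such a fixed-size,
non-rebalancing pool (plain Java, not the proposed Flink class):

{code:java}
import java.util.ArrayDeque;
import java.util.Queue;

// Sketch: created with exactly b floating buffers; the pool never grows
// or shrinks, unlike the redistributing LocalBufferPool.
class FixedSizeBufferPool {
	private final Queue<byte[]> buffers = new ArrayDeque<>();

	FixedSizeBufferPool(int numFloatingBuffers, int bufferSize) {
		for (int i = 0; i < numFloatingBuffers; i++) {
			buffers.add(new byte[bufferSize]);
		}
	}

	// may return null when all floating buffers are in use
	synchronized byte[] requestBuffer() {
		return buffers.poll();
	}

	synchronized void recycle(byte[] buffer) {
		buffers.add(buffer); // the total size stays fixed; no rebalancing
	}
}
{code}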



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7129) Dynamically changing patterns

2017-08-28 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143558#comment-16143558
 ] 

Aljoscha Krettek commented on FLINK-7129:
-

I think it should work. 

> Dynamically changing patterns
> -
>
> Key: FLINK-7129
> URL: https://issues.apache.org/jira/browse/FLINK-7129
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>
> An umbrella task for introducing a mechanism for injecting patterns through 
> coStream



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143553#comment-16143553
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r135480975
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java ---
@@ -131,6 +133,50 @@ public void recycle(MemorySegment segment) {
 		availableMemorySegments.add(segment);
 	}
 
+	public List<MemorySegment> requestMemorySegments(int numRequiredBuffers) throws IOException {
+		synchronized (factoryLock) {
--- End diff --

should we add a `Preconditions.checkArgument(numRequiredBuffers > 0)`?
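
Concretely, that would amount to something like this at the top of the new
method (the message text is illustrative only):

{code:java}
+	public List<MemorySegment> requestMemorySegments(int numRequiredBuffers) throws IOException {
+		// fail fast on nonsensical requests before taking the factory lock
+		Preconditions.checkArgument(numRequiredBuffers > 0,
+			"The number of required buffers must be positive.");
+		synchronized (factoryLock) {
{code}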


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a * <number of channels> + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a 
> fixed-size buffer pool used to manage the floating buffers for 
> {{SingleInputGate}}, and the exclusive buffers are assigned to 
> {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7509) Refactorings to AggregateCodeGenerator

2017-08-28 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143526#comment-16143526
 ] 

Fabian Hueske commented on FLINK-7509:
--

Yes, with more special cases being added it might make sense to split the code 
for the different aggregation types:

- batch group aggregation
- stream non-windowed group aggregation
- stream group windowed aggregation
- stream over windowed aggregation

But maybe another separation is more meaningful.

Do you have concrete plans for the split [~jark]?


> Refactorings to AggregateCodeGenerator
> --
>
> Key: FLINK-7509
> URL: https://issues.apache.org/jira/browse/FLINK-7509
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>
> I think `AggregateCodeGenerator#generateAggregations` is currently too long 
> (500+ LOC) and hard to extend. I would like to refactor it if you have no 
> objections.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API

2017-08-28 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143522#comment-16143522
 ] 

Fabian Hueske commented on FLINK-6233:
--

Thanks [~xccui], that would be great! :-)

> Support rowtime inner equi-join between two streams in the SQL API
> --
>
> Key: FLINK-6233
> URL: https://issues.apache.org/jira/browse/FLINK-6233
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: Xingcan Cui
>
> The goal of this issue is to add support for inner equi-joins on row-time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * Only inner joins are supported.
> * The ON clause must include an equi-join condition.
> * The time condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL 
> '1' HOUR}} may only use rowtime, which is a system attribute. Only bounded 
> time ranges like {{o.rowtime BETWEEN s.rowtime - INTERVAL '1' HOUR AND 
> s.rowtime + INTERVAL '1' HOUR}} are supported; unbounded conditions like 
> {{o.rowtime < s.rowtime}} are not. The condition must include the rowtime 
> attributes of both streams, so {{o.rowtime BETWEEN rowtime() AND rowtime() + 
> 1}} is also not supported.
> A row-time stream join will not be able to handle late data, because 
> inserting a row into a sorted order would shift all other computations, 
> which would be too expensive to maintain. Therefore, we will throw an error 
> if a user tries to use a row-time stream join with late data handling.
> This issue includes:
> * Design of the DataStream operator to deal with the stream join
> * Translation from Calcite's RelNode representation (LogicalJoin).
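
For intuition, the state-cleanup property that a bounded time range buys can
be sketched as follows (plain Java, not the actual operator; {{boundMillis}}
corresponds to the INTERVAL bound in the query above):

{code:java}
// A left row with rowtime t can only join right rows whose rowtime lies in
// [t - boundMillis, t + boundMillis], so right rows older than
// (current left watermark - boundMillis) can safely be evicted from state.
static boolean withinBound(long leftRowtime, long rightRowtime, long boundMillis) {
	return rightRowtime >= leftRowtime - boundMillis
		&& rightRowtime <= leftRowtime + boundMillis;
}
{code}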



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7512) avoid unnecessary buffer copies during network serialization

2017-08-28 Thread Nico Kruber (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143520#comment-16143520
 ] 

Nico Kruber commented on FLINK-7512:


Hi [~phoenixjiangnan],
no, the error you are seeing is not related to the network code but rather to 
timers and state.

> avoid unnecessary buffer copies during network serialization
> 
>
> Key: FLINK-7512
> URL: https://issues.apache.org/jira/browse/FLINK-7512
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> Currently, we have our own {{Buffer}} class backed by a {{MemorySegment}}, and 
> whenever we write to or read from Netty, we need to copy to / from 
> Netty's {{ByteBuf}} instances.
> This is an umbrella task for avoiding these buffer copies and some related 
> code changes.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-08-28 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143517#comment-16143517
 ] 

Fabian Hueske commented on FLINK-7446:
--

I think that's a good plan [~jark]. 
However, we should design it in a way that we can support periodic and 
punctuated watermarks. 

> Support to define an existing field as the rowtime field for TableSource
> 
>
> Key: FLINK-7446
> URL: https://issues.apache.org/jira/browse/FLINK-7446
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field 
> for a {{TableSource}}, but it would be helpful if we could also support 
> defining an existing field as the rowtime field. Just like registering a 
> DataStream, the rowtime field could either be appended or replace an 
> existing field.
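
For illustration, a rough sketch of how a {{TableSource}} could declare an
existing field as rowtime; the exact interface shape is an open design
question, so everything below is hypothetical:

{code:java}
// hypothetical source: the existing long field "ts" serves as the rowtime
// attribute instead of a newly appended field
public class OrderSource implements DefinedRowtimeAttribute {
	@Override
	public String getRowtimeAttribute() {
		return "ts"; // reuse/replace the existing field
	}
}
{code}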



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7358) Add implicitly converts support for User-defined function

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143500#comment-16143500
 ] 

ASF GitHub Bot commented on FLINK-7358:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4534
  
The text above `"What is the purpose of the change"` should be removed such 
that only the relevant info is in the PR description.



> Add  implicitly converts support for User-defined function
> --
>
> Key: FLINK-7358
> URL: https://issues.apache.org/jira/browse/FLINK-7358
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Currently, if a user defines a UDF as follows:
> {code}
> object Func extends ScalarFunction {
>   def eval(a: Int, b: Long): String = {
> ...
>   }
> }
> {code}
> and the table schema is (a: Int, b: Int, c: String), then we cannot call 
> the UDF as `Func('a, 'b)`. So I want to add implicit conversions for UDF 
> calls. The implicit conversion rule is:
> BYTE_TYPE_INFO -> SHORT_TYPE_INFO -> INT_TYPE_INFO -> LONG_TYPE_INFO -> 
> FLOAT_TYPE_INFO -> DOUBLE_TYPE_INFO
> *Note:
> In this JIRA, only the Table API is covered; SQL will be fixed in 
> https://issues.apache.org/jira/browse/CALCITE-1908.*
> What do you think? [~fhueske]
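
For illustration, the widening chain could be checked with something like
this (a self-contained sketch, not the proposed implementation):

{code:java}
import java.util.Arrays;
import java.util.List;

class ImplicitCasts {
	// the proposed widening order from the description above
	private static final List<String> ORDER = Arrays.asList(
		"BYTE", "SHORT", "INT", "LONG", "FLOAT", "DOUBLE");

	// an argument of type 'from' may be passed where 'to' is expected
	// if 'to' appears at the same position or later in the chain
	static boolean canWiden(String from, String to) {
		int i = ORDER.indexOf(from);
		int j = ORDER.indexOf(to);
		return i >= 0 && j >= 0 && i <= j;
	}
}
{code}

With this rule, an {{Int}} argument widens to the expected {{Long}}
parameter, so {{Func('a, 'b)}} from the example above becomes valid.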



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7410) Add getName method to UserDefinedFunction

2017-08-28 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143474#comment-16143474
 ] 

Timo Walther commented on FLINK-7410:
-

No, the {{toString}} method is not used yet. We could use it for this purpose.

> Add getName method to UserDefinedFunction
> -
>
> Key: FLINK-7410
> URL: https://issues.apache.org/jira/browse/FLINK-7410
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>
> *Motivation*
> Operator names set in the Table API are used for visualization and logging, 
> so it is important to make these names simple and readable. Currently, a 
> UserDefinedFunction's name contains the class canonical name and an MD5 
> value, making the name too long and unfriendly to users.
> As shown in the following example, 
> {quote}
> select: (a, b, c, 
> org$apache$flink$table$expressions$utils$RichFunc1$281f7e61ec5d8da894f5783e2e17a4f5(a)
>  AS _c3, 
> org$apache$flink$table$expressions$utils$RichFunc2$fb99077e565685ebc5f48b27edc14d98(c)
>  AS _c4)
> {quote}
> *Changes:*
>   
> Provide a getName method for UserDefinedFunction. The method will return the 
> class name by default. Users can also override the method to return whatever 
> they want.
> What do you think [~fhueske] ?
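
For illustration, the default could be as simple as this (a sketch, assuming
the method is added to {{UserDefinedFunction}}):

{code:java}
// default: the simple class name, short and readable in plans and logs;
// users override this to supply a custom name
public String getName() {
	return getClass().getSimpleName();
}
{code}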



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

