[jira] [Commented] (FLINK-3529) Add pull request template

2016-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15177442#comment-15177442
 ] 

ASF GitHub Bot commented on FLINK-3529:
---

Github user hsaputra commented on the pull request:

https://github.com/apache/flink/pull/1729#issuecomment-191639327
  
There should be a link from the CONTRIBUTING page to this one.


> Add pull request template
> -
>
> Key: FLINK-3529
> URL: https://issues.apache.org/jira/browse/FLINK-3529
> Project: Flink
>  Issue Type: Task
>  Components: other
>Reporter: Martin Liesenberg
>Assignee: Martin Liesenberg
>Priority: Minor
>
> Add a template for pull requests that checks whether the prerequisites for 
> opening a PR have been fulfilled.




[jira] [Commented] (FLINK-3474) Partial aggregate interface design and sort-based implementation

2016-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15177390#comment-15177390
 ] 

ASF GitHub Bot commented on FLINK-3474:
---

Github user ChengXiangLi commented on the pull request:

https://github.com/apache/flink/pull/1746#issuecomment-191621309
  
Hi @fhueske, I found two test failures because the input data of 
`AggregateMapFunction` is `Tuple` instead of `Row` while the table config is 
`EFFICIENT`. I thought `Tuple`, like the other types, is only supported at the 
data sources, and that SQL operators internally only support `Row`. Why was this 
introduced? Is it more efficient? I thought the roadmap was moving toward a `Row` 
that stores binary data.


> Partial aggregate interface design and sort-based implementation
> 
>
> Key: FLINK-3474
> URL: https://issues.apache.org/jira/browse/FLINK-3474
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Chengxiang Li
>Assignee: Chengxiang Li
>
> The scope of this sub task includes:
> # Partial aggregate interface.
> # Simple aggregate function implementations, such as SUM/AVG/COUNT/MIN/MAX.
> # DataSetAggregateRule, which translates the logical Calcite aggregate node to 
> Flink user functions. As a hash-based combiner is not available yet (see PR 
> #1517), we use sort-based combine as the default.
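
To make the sort-based combine path concrete, here is a minimal sketch
(hypothetical code, not the PR's implementation; the Aggregate methods are
assumed from the discussion below) of how a group-reduce function can opt into
map-side combining by also implementing Flink's CombineFunction. Group-key
copying and the final output mapping are omitted for brevity:

{noformat}
import java.lang.Iterable

import org.apache.flink.api.common.functions.{CombineFunction, RichGroupReduceFunction}
import org.apache.flink.api.table.Row
import org.apache.flink.util.Collector

import scala.collection.JavaConversions._

// Sketch: a GroupReduceFunction that also implements CombineFunction, so the
// optimizer can run a sort-based combine on each mapper before the shuffle.
class SketchAggregateFunction(
    aggregates: Array[Aggregate[_]],
    intermediateRowArity: Int)
  extends RichGroupReduceFunction[Row, Row]
  with CombineFunction[Row, Row] {

  // Combine phase (map side): fold a sub-group of intermediate rows into one buffer.
  override def combine(records: Iterable[Row]): Row = {
    val buffer = new Row(intermediateRowArity)
    aggregates.foreach(_.initiate(buffer))
    records.foreach(record => aggregates.foreach(_.merge(record, buffer)))
    buffer
  }

  // GroupReduce phase (reduce side): same merge logic; a real implementation
  // would evaluate() the buffer into the final output Row here.
  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit =
    out.collect(combine(records))
}
{noformat}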






[jira] [Updated] (FLINK-3473) Add partial aggregate support in Flink

2016-03-02 Thread Chengxiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chengxiang Li updated FLINK-3473:
-
Attachment: PartialAggregateinFlink_v2.pdf

> Add partial aggregate support in Flink
> --
>
> Key: FLINK-3473
> URL: https://issues.apache.org/jira/browse/FLINK-3473
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Reporter: Chengxiang Li
>Assignee: Chengxiang Li
> Attachments: PartialAggregateinFlink_v1.pdf, 
> PartialAggregateinFlink_v2.pdf
>
>
> For decomposable aggregate functions, partial aggregation is more efficient, as 
> it significantly reduces the network traffic during the shuffle and parallelizes 
> part of the aggregate calculation. This is an umbrella task for partial 
> aggregation.
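
To make "decomposable" concrete, a toy sketch (illustrative only, not from
this task): AVG can be computed as partial (sum, count) pairs that each mapper
pre-aggregates locally, so only one small pair per group and partition crosses
the network, and the pairs are merged after the shuffle:

{noformat}
// Toy model of a decomposable aggregate: AVG as (sum, count) partials.
case class Partial(sum: Long, count: Long)

def prepare(value: Long): Partial = Partial(value, 1L)

// merge is associative, so it can run map-side (combine) and reduce-side.
def merge(a: Partial, b: Partial): Partial =
  Partial(a.sum + b.sum, a.count + b.count)

def evaluate(p: Partial): Double = p.sum.toDouble / p.count

// Two mapper-local partials of the same group:
// merge(Partial(10, 2), Partial(5, 1)) == Partial(15, 3); evaluate -> 5.0
{noformat}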





[jira] [Commented] (FLINK-3474) Partial aggregate interface design and sort-based implementation

2016-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15177260#comment-15177260
 ] 

ASF GitHub Bot commented on FLINK-3474:
---

Github user ChengXiangLi commented on a diff in the pull request:

https://github.com/apache/flink/pull/1746#discussion_r54838441
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala
 ---
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import com.google.common.base.Preconditions
+import org.apache.flink.api.common.functions.{CombineFunction, RichGroupReduceFunction, RichMapPartitionFunction}
+import org.apache.flink.api.table.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConversions._
+
+/**
+ * It wraps the aggregate logic inside of
+ * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
+ *
+ * @param aggregates       The aggregate functions.
+ * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
+ *                         and output Row.
+ * @param aggregateMapping The index mapping between aggregate function list and aggregated
+ *                         value index in output Row.
+ */
+class AggregateReduceGroupFunction(
+    private val aggregates: Array[Aggregate[_ <: Any]],
+    private val groupKeysMapping: Array[(Int, Int)],
+    private val aggregateMapping: Array[(Int, Int)],
+    private val intermediateRowArity: Int)
+  extends RichGroupReduceFunction[Row, Row] {
+
+  private val finalRowLength: Int = groupKeysMapping.length + aggregateMapping.length
+  private var aggregateBuffer: Row = _
+  private var output: Row = _
+
+  override def open(config: Configuration) {
+    Preconditions.checkNotNull(aggregates)
+    Preconditions.checkNotNull(groupKeysMapping)
+    aggregateBuffer = new Row(intermediateRowArity)
+    output = new Row(finalRowLength)
+  }
+
+  /**
+   * For grouped intermediate aggregate Rows, merge all of them into an aggregate buffer,
+   * calculate the aggregated values from the aggregate buffer, and set them into the output
+   * Row based on the mapping relation between intermediate aggregate data and output data.
+   *
+   * @param records Grouped intermediate aggregate Rows iterator.
+   * @param out     The collector to hand results to.
+   */
+  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
+
+    // Initiate the intermediate aggregate value.
+    aggregates.foreach(_.initiate(aggregateBuffer))
+
+    // Merge the intermediate aggregate values into the buffer.
+    var last: Row = null
+    records.foreach((record) => {
+      aggregates.foreach(_.merge(record, aggregateBuffer))
+      last = record
+    })
+
+    // Set group keys to aggregateBuffer.
+    for (i <- 0 until groupKeysMapping.length) {
--- End diff --

Yes, I will update this.


> Partial aggregate interface design and sort-based implementation
> 
>
> Key: FLINK-3474
> URL: https://issues.apache.org/jira/browse/FLINK-3474
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Chengxiang Li
>Assignee: Chengxiang Li
>
> The scope of this sub task includes:
> # Partial aggregate interface.
> # Simple aggregate function implementations, such as SUM/AVG/COUNT/MIN/MAX.
> # DataSetAggregateRule, which translates the logical Calcite aggregate node to 
> Flink user functions. As a hash-based combiner is not available yet (see PR 
> #1517), we use sort-based combine as the default.




[jira] [Commented] (FLINK-3474) Partial aggregate interface design and sort-based implementation

2016-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15177259#comment-15177259
 ] 

ASF GitHub Bot commented on FLINK-3474:
---

Github user ChengXiangLi commented on a diff in the pull request:

https://github.com/apache/flink/pull/1746#discussion_r54838416
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala
 ---
@@ -17,26 +17,77 @@
  */
 package org.apache.flink.api.table.runtime.aggregate
 
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.api.table.Row
+
 /**
- * Represents a SQL aggregate function. The user should first initialize the
- * aggregate, then feed it with grouped aggregate field values, and finally get
- * the aggregated value.
- * @tparam T the output type
+ * The interface for all Flink aggregate functions, which are expressed in terms of
+ * initiate(), prepare(), merge() and evaluate(). The aggregate functions are executed
+ * in two phases:
+ * -- In the Map phase, prepare() transforms the aggregate field value into an
+ * intermediate aggregate value.
+ * -- In the GroupReduce phase, merge() merges the grouped intermediate aggregate
+ * values into the aggregate buffer, and evaluate() then calculates the final
+ * aggregated value.
+ * Associative, decomposable aggregate functions support partial aggregation. To
+ * optimize performance, a Combine phase can be added between the Map phase and the
+ * GroupReduce phase:
+ * -- In the Combine phase, merge() merges the sub-grouped intermediate aggregate
+ * values into the aggregate buffer.
+ *
+ * The intermediate aggregate value is stored inside a Row; aggOffsetInRow is used as
+ * the start field index in the Row, so different aggregate functions can share the
+ * same Row as their intermediate aggregate value/aggregate buffer, because their
+ * aggregate values are stored in distinct fields of the Row with no conflict. The
+ * intermediate aggregate value is required to be a sequence of JVM primitives, and
+ * Flink uses intermediateDataType() to get its data types on the SQL side.
+ *
+ * @tparam T Aggregated value type.
  */
 trait Aggregate[T] extends Serializable {
+
+  protected var aggOffsetInRow: Int = _
+
   /**
-   * Initialize the aggregate state.
+   * Initiate the intermediate aggregate value in the Row.
+   * @param intermediate
    */
-  def initiateAggregate
+  def initiate(intermediate: Row): Unit
 
   /**
-   * Feed the aggregate field value.
+   * Transform the aggregate field value into intermediate aggregate data.
    * @param value
+   * @param intermediate
    */
-  def aggregate(value: Any)
+  def prepare(value: Any, intermediate: Row): Unit
--- End diff --

So null value handling should be done inside the aggregate function 
implementations instead of at the aggregate interface level. There is an umbrella 
JIRA about null value handling in the Table API (FLINK-3139); I would create a 
subtask that does the null value handling for the COUNT/SUM/MIN/MAX/AVG functions 
introduced in this PR. Is that OK?
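
For readers following the interface above, a minimal sketch of a COUNT
implementation (hypothetical code; the merge() and evaluate() signatures are
assumed from the surrounding discussion, and Row.setField/productElement from
the quoted runtime code's Row type):

{noformat}
import org.apache.flink.api.table.Row

// Sketch: COUNT over the Aggregate trait. The intermediate value is a single
// Long stored at aggOffsetInRow in the shared intermediate Row.
class CountAggregateSketch extends Aggregate[Long] {

  override def initiate(intermediate: Row): Unit =
    intermediate.setField(aggOffsetInRow, 0L)

  // Each input value contributes a count of 1 (null handling left to FLINK-3139).
  override def prepare(value: Any, intermediate: Row): Unit =
    intermediate.setField(aggOffsetInRow, 1L)

  // Add the partial count from `intermediate` onto the running buffer.
  def merge(intermediate: Row, buffer: Row): Unit = {
    val partial = intermediate.productElement(aggOffsetInRow).asInstanceOf[Long]
    val current = buffer.productElement(aggOffsetInRow).asInstanceOf[Long]
    buffer.setField(aggOffsetInRow, partial + current)
  }

  def evaluate(buffer: Row): Long =
    buffer.productElement(aggOffsetInRow).asInstanceOf[Long]
}
{noformat}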


> Partial aggregate interface design and sort-based implementation
> 
>
> Key: FLINK-3474
> URL: https://issues.apache.org/jira/browse/FLINK-3474
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Chengxiang Li
>Assignee: Chengxiang Li
>
> The scope of this sub task includes:
> # Partial aggregate interface.
> # Simple aggregate function implementations, such as SUM/AVG/COUNT/MIN/MAX.
> # DataSetAggregateRule, which translates the logical Calcite aggregate node to 
> Flink user functions. As a hash-based combiner is not available yet (see PR 
> #1517), we use sort-based combine as the default.







[GitHub] flink pull request: [docs] fix javascript exception caused by disq...

2016-03-02 Thread wuchong
GitHub user wuchong opened a pull request:

https://github.com/apache/flink/pull/1756

[docs] fix javascript exception caused by disqus

As we comment out the Disqus HTML element but not the Disqus JavaScript, it 
will cause an exception like this:


![image](https://cloud.githubusercontent.com/assets/5378924/13482083/40bd92ea-e125-11e5-90f4-2682b763a288.png)

This PR also fixes a minor typo in `cluster_execution.md`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wuchong/flink docs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1756.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1756


commit 713d418b6259ee94abe35edf710c5cabf978e1c2
Author: Jark Wu 
Date:   2016-03-03T01:43:38Z

[docs] fix javascript exception caused by disqus and fix typos in cluster 
execution.






[jira] [Commented] (FLINK-3554) Bounded sources should emit a Max Watermark when they are done

2016-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15176939#comment-15176939
 ] 

ASF GitHub Bot commented on FLINK-3554:
---

Github user wenlonglwl commented on the pull request:

https://github.com/apache/flink/pull/1750#issuecomment-191534452
  
Just want to remind that this does not completely solve the problem of losing 
pending windows over finite sources. Looking forward to the solutions.


> Bounded sources should emit a Max Watermark when they are done
> --
>
> Key: FLINK-3554
> URL: https://issues.apache.org/jira/browse/FLINK-3554
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.0.0
>
>
> For proper event time support in bounded sources, these sources should emit a 
> final watermark before shutting down.
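
A minimal sketch of the idea (hypothetical code, not the actual fix): a finite
source emits a Long.MaxValue watermark just before run() returns, so downstream
event-time windows can still fire:

{noformat}
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.watermark.Watermark

// Sketch: a bounded source of (value, timestamp) pairs that signals that no
// further elements will come by emitting a final Long.MaxValue watermark.
class BoundedSourceSketch(elements: Seq[(String, Long)])
  extends SourceFunction[String] {

  @volatile private var running = true

  override def run(ctx: SourceContext[String]): Unit = {
    for ((value, timestamp) <- elements if running) {
      ctx.collectWithTimestamp(value, timestamp)
    }
    // The final watermark: no element with a lower timestamp will ever arrive.
    ctx.emitWatermark(new Watermark(Long.MaxValue))
  }

  override def cancel(): Unit = running = false
}
{noformat}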





[jira] [Commented] (FLINK-3498) Implement TRIM, SUBSTRING as reference design for Table API

2016-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15176768#comment-15176768
 ] 

ASF GitHub Bot commented on FLINK-3498:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1747#issuecomment-191503064
  
Will merge this to `tableOnCalcite` branch.


> Implement TRIM, SUBSTRING as reference design for Table API
> ---
>
> Key: FLINK-3498
> URL: https://issues.apache.org/jira/browse/FLINK-3498
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> As described in FLINK-3497, TRIM and SUBSTRING are the first scalar functions 
> implemented in Calcite and used in the Table API.









[jira] [Commented] (FLINK-3486) Using Project to rename all record fields fails in a following Project

2016-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15176766#comment-15176766
 ] 

ASF GitHub Bot commented on FLINK-3486:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1738#issuecomment-191502994
  
Will merge this to `tableOnCalcite` branch.


> Using Project to rename all record fields fails in a following Project.
> -
>
> Key: FLINK-3486
> URL: https://issues.apache.org/jira/browse/FLINK-3486
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Reporter: Chengxiang Li
>Assignee: Fabian Hueske
>
> {noformat}
> val t = CollectionDataSets.get3TupleDataSet(env).toTable
>   .select('_1 as 'a, '_2 as 'b, '_3 as 'c)
>   .select('a, 'b)
> {noformat}
> would throw exception like:
> {noformat}
> java.lang.IllegalArgumentException: field [a] not found; input fields are: 
> [_1, _2, _3]
>   at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:290)
>   at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:275)
>   at 
> org.apache.flink.api.table.plan.RexNodeTranslator$.toRexNode(RexNodeTranslator.scala:80)
>   at org.apache.flink.api.table.Table$$anonfun$5.apply(table.scala:98)
>   at org.apache.flink.api.table.Table$$anonfun$5.apply(table.scala:98) 
> {noformat}
> The new alias names are lost.





[jira] [Commented] (FLINK-3564) Implement distinct() for Table API

2016-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15176735#comment-15176735
 ] 

ASF GitHub Bot commented on FLINK-3564:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1754#discussion_r54814006
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/DistinctITCase.java
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import java.util.List;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class DistinctITCase extends MultipleProgramsTestBase {
+
+    public DistinctITCase(TestExecutionMode mode){
+        super(mode);
+    }
+
+    @Test
+    public void testAllDistinct() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        TableEnvironment tableEnv = new TableEnvironment();
+
+        DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.getSmall3TupleDataSet(env);
+
+        Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+        Table distinct = table.distinct();
+
+        DataSet<Row> ds = tableEnv.toDataSet(distinct, Row.class);
+        List<Row> results = ds.collect();
+        String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n";
+        compareResultAsText(results, expected);
+    }
+
+    @Test
+    public void testDistinct() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        TableEnvironment tableEnv = new TableEnvironment();
+
+        DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.getSmall3TupleDataSet(env);
+
+        Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+        Table distinct = table.select("b").distinct();
+
+        DataSet<Row> ds = tableEnv.toDataSet(distinct, Row.class);
+        List<Row> results = ds.collect();
+        String expected = "1\n" + "2\n";
+        compareResultAsText(results, expected);
+    }
+
+    @Test
+    public void testDistinctAfterAggregate() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        TableEnvironment tableEnv = new TableEnvironment();
+
+        DataSet<Tuple5<Integer, Long, Integer, String, Long>> input = CollectionDataSets.get5TupleDataSet(env);
+
+        Table table = tableEnv.fromDataSet(input, "a, b, c, d, e");
+
+        Table distinct = table.groupBy("a").select("e").distinct();
--- End diff --

Isn't the result of `groupBy('a).select('e)` non-deterministic? Many 
database systems do not even allow this.

Btw, I looked at the implementation of `GroupedTable.select()` and it looks 
incorrect. It does not group but just adds a selection. Either we should reject 
the select or select the value of an arbitrary row of the group.
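
To illustrate the non-determinism, a toy example (not from the PR; syntax
assumed from the 1.0-era Scala Table API, with env as a hypothetical
ExecutionEnvironment):

{noformat}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._

val env = ExecutionEnvironment.getExecutionEnvironment

// Group a = 1 contains e = 10 and e = 20, so a plain select('e) after
// groupBy('a) could legitimately return either value for that group.
val input = env.fromElements((1, 10), (1, 20), (2, 30)).as('a, 'e)

// Ambiguous: which e represents group a = 1?
// val bad = input.groupBy('a).select('e)

// Well-defined alternatives: aggregate e, or make it part of the grouping key.
val aggregated = input.groupBy('a).select('a, 'e.max)
val keyed      = input.groupBy('a, 'e).select('a, 'e)
{noformat}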


> Implement distinct() for Table API
> --
>
> Key: FLINK-3564
> URL: https://issues.apache.org/jira/browse/FLINK-3564
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Minor
>
> This is only syntactic sugar for grouping of all fields.



[jira] [Commented] (FLINK-3564) Implement distinct() for Table API

2016-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15176674#comment-15176674
 ] 

ASF GitHub Bot commented on FLINK-3564:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1754#discussion_r54808793
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/DistinctITCase.java
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import java.util.List;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class DistinctITCase extends MultipleProgramsTestBase {
+
+    public DistinctITCase(TestExecutionMode mode){
+        super(mode);
+    }
+
+    @Test
+    public void testAllDistinct() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        TableEnvironment tableEnv = new TableEnvironment();
+
+        DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.getSmall3TupleDataSet(env);
+
+        Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+        Table distinct = table.distinct();
+
+        DataSet<Row> ds = tableEnv.toDataSet(distinct, Row.class);
+        List<Row> results = ds.collect();
+        String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n";
+        compareResultAsText(results, expected);
+    }
+
+    @Test
+    public void testDistinct() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        TableEnvironment tableEnv = new TableEnvironment();
+
+        DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.getSmall3TupleDataSet(env);
--- End diff --

Why not use `get3TupleDataSet()` which has more duplicates? 


> Implement distinct() for Table API
> --
>
> Key: FLINK-3564
> URL: https://issues.apache.org/jira/browse/FLINK-3564
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Minor
>
> This is only syntactic sugar for grouping of all fields.







[jira] [Commented] (FLINK-3564) Implement distinct() for Table API

2016-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15176672#comment-15176672
 ] 

ASF GitHub Bot commented on FLINK-3564:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1754#discussion_r54808671
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/DistinctITCase.java
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import java.util.List;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class DistinctITCase extends MultipleProgramsTestBase {
+
+    public DistinctITCase(TestExecutionMode mode){
+        super(mode);
+    }
+
+    @Test
+    public void testAllDistinct() throws Exception {
--- End diff --

This test does not check whether `distinct` is working: the tuples of 
`CollectionDataSets.getSmall3TupleDataSet()` are already distinct.
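
A sketch of an input that would actually exercise distinct (hypothetical data,
Scala API for brevity, using the distinct() under review): include verbatim
duplicate rows and assert that they collapse to one:

{noformat}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._

val env = ExecutionEnvironment.getExecutionEnvironment

// (1, 1L, "Hi") appears twice, so a broken distinct() would be caught.
val input = env.fromElements(
  (1, 1L, "Hi"),
  (1, 1L, "Hi"),
  (2, 2L, "Hello"))

val distinctTable = input.as('a, 'b, 'c).distinct()

// Expected result: 1,1,Hi
//                  2,2,Hello
{noformat}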


> Implement distinct() for Table API
> --
>
> Key: FLINK-3564
> URL: https://issues.apache.org/jira/browse/FLINK-3564
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Minor
>
> This is only syntactic sugar for grouping of all fields.







[jira] [Resolved] (FLINK-3565) FlinkKafkaConsumer does not work with Scala 2.11

2016-03-02 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger resolved FLINK-3565.
---
   Resolution: Fixed
Fix Version/s: 1.1.0
   1.0.0

Resolved for 1.0 in http://git-wip-us.apache.org/repos/asf/flink/commit/3adc5148
and for 1.1 in http://git-wip-us.apache.org/repos/asf/flink/commit/072da7de

> FlinkKafkaConsumer does not work with Scala 2.11 
> -
>
> Key: FLINK-3565
> URL: https://issues.apache.org/jira/browse/FLINK-3565
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Aljoscha Krettek
>Assignee: Maximilian Michels
>Priority: Blocker
> Fix For: 1.0.0, 1.1.0
>
>
> Running a program built against Flink_2.11 dependencies fails on a Flink_2.11 
> cluster with this exception:
> java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
>   at kafka.utils.Pool.<init>(Pool.scala:28)
>   at 
> kafka.consumer.FetchRequestAndResponseStatsRegistry$.<init>(FetchRequestAndResponseStats.scala:60)
>   at 
> kafka.consumer.FetchRequestAndResponseStatsRegistry$.<clinit>(FetchRequestAndResponseStats.scala)
>   at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39)
>   at kafka.javaapi.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:34)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:518)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:218)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:193)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:160)
>   at com.dataartisans.querywindow.WindowJob.main(WindowJob.java:93)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>   at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>   at 
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: java.lang.ClassNotFoundException: 
> scala.collection.GenTraversableOnce$class
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   ... 21 more





[jira] [Commented] (FLINK-3565) FlinkKafkaConsumer does not work with Scala 2.11

2016-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15176241#comment-15176241
 ] 

ASF GitHub Bot commented on FLINK-3565:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1755


> FlinkKafkaConsumer does not work with Scala 2.11 
> -
>
> Key: FLINK-3565
> URL: https://issues.apache.org/jira/browse/FLINK-3565
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Aljoscha Krettek
>Assignee: Maximilian Michels
>Priority: Blocker
>
> Running a program built against Flink_2.11 dependencies fails on a Flink_2.11 
> cluster with this exception:
> java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
>   at kafka.utils.Pool.<init>(Pool.scala:28)
>   at 
> kafka.consumer.FetchRequestAndResponseStatsRegistry$.<init>(FetchRequestAndResponseStats.scala:60)
>   at 
> kafka.consumer.FetchRequestAndResponseStatsRegistry$.<clinit>(FetchRequestAndResponseStats.scala)
>   at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39)
>   at kafka.javaapi.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:34)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:518)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:218)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:193)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:160)
>   at com.dataartisans.querywindow.WindowJob.main(WindowJob.java:93)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>   at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>   at 
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: java.lang.ClassNotFoundException: 
> scala.collection.GenTraversableOnce$class
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   ... 21 more







[GitHub] flink pull request: [FLINK-3565] add module to force execution of ...

2016-03-02 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1755#issuecomment-191343477
  
+1 to merge




[GitHub] flink pull request: [FLINK-3474] support partial aggregate

2016-03-02 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1746#discussion_r54763270
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala
 ---
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import com.google.common.base.Preconditions
+import org.apache.flink.api.common.functions.{CombineFunction, RichGroupReduceFunction, RichMapPartitionFunction}
+import org.apache.flink.api.table.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConversions._
+
+/**
+ * It wraps the aggregate logic inside of
+ * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
+ *
+ * @param aggregates       The aggregate functions.
+ * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
+ *                         and output Row.
+ * @param aggregateMapping The index mapping between aggregate function list and aggregated
+ *                         value index in output Row.
+ */
+class AggregateReduceGroupFunction(
+    private val aggregates: Array[Aggregate[_ <: Any]],
+    private val groupKeysMapping: Array[(Int, Int)],
+    private val aggregateMapping: Array[(Int, Int)],
+    private val intermediateRowArity: Int)
+  extends RichGroupReduceFunction[Row, Row] {
+
+  private val finalRowLength: Int = groupKeysMapping.length + aggregateMapping.length
+  private var aggregateBuffer: Row = _
+  private var output: Row = _
+
+  override def open(config: Configuration) {
+    Preconditions.checkNotNull(aggregates)
+    Preconditions.checkNotNull(groupKeysMapping)
+    aggregateBuffer = new Row(intermediateRowArity)
+    output = new Row(finalRowLength)
+  }
+
+  /**
+   * For grouped intermediate aggregate Rows, merge all of them into an aggregate buffer,
+   * calculate the aggregated values from the aggregate buffer, and set them into the output
+   * Row based on the mapping relation between intermediate aggregate data and output data.
+   *
+   * @param records Grouped intermediate aggregate Rows iterator.
+   * @param out     The collector to hand results to.
+   */
+  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
+
+    // Initiate the intermediate aggregate value.
+    aggregates.foreach(_.initiate(aggregateBuffer))
+
+    // Merge the intermediate aggregate values into the buffer.
+    var last: Row = null
+    records.foreach((record) => {
+      aggregates.foreach(_.merge(record, aggregateBuffer))
+      last = record
+    })
+
+    // Set group keys to aggregateBuffer.
+    for (i <- 0 until groupKeysMapping.length) {
--- End diff --

Is this necessary? Looks like we copy the keys first into the 
`aggregateBuffer` and then into the `output` row. Can't we copy the keys 
directly from `last` to `output`?
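
For illustration, a minimal sketch of that shortcut (assuming, as elsewhere in this PR, that each mapping pair is (index in output row, index in intermediate row); this is a suggestion, not committed code):

{code}
// Copy the group keys straight from the last input row into the output
// row, bypassing the intermediate aggregateBuffer.
groupKeysMapping.foreach { case (outIndex, intermediateIndex) =>
  output.setField(outIndex, last.productElement(intermediateIndex))
}
{code}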


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3474) Partial aggregate interface design and sort-based implementation

2016-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15176104#comment-15176104
 ] 

ASF GitHub Bot commented on FLINK-3474:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1746#discussion_r54763270
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala
 ---
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import com.google.common.base.Preconditions
+import org.apache.flink.api.common.functions.{CombineFunction, 
RichGroupReduceFunction, RichMapPartitionFunction}
+import org.apache.flink.api.table.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConversions._
+
+/**
+ * It wraps the aggregate logic inside of 
+ * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
+ *
+ * @param aggregates   The aggregate functions.
+ * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row 
+ * and output Row.
+ * @param aggregateMapping The index mapping between aggregate function 
list and aggregated value
+ * index in output Row.
+ */
+class AggregateReduceGroupFunction(
+private val aggregates: Array[Aggregate[_ <: Any]],
+private val groupKeysMapping: Array[(Int, Int)],
+private val aggregateMapping: Array[(Int, Int)],
+private val intermediateRowArity: Int)
+extends RichGroupReduceFunction[Row, Row] {
+
+  private val finalRowLength: Int = groupKeysMapping.length + 
aggregateMapping.length
+  private var aggregateBuffer: Row = _
+  private var output: Row = _
+
+  override def open(config: Configuration) {
+Preconditions.checkNotNull(aggregates)
+Preconditions.checkNotNull(groupKeysMapping)
+aggregateBuffer = new Row(intermediateRowArity)
+output = new Row(finalRowLength)
+  }
+
+  /**
+   * For grouped intermediate aggregate Rows, merge all of them into 
aggregate buffer,
+   * calculate aggregated values output by aggregate buffer, and set them 
into output 
+   * Row based on the mapping relation between intermediate aggregate data 
and output data.
+   *
+   * @param records  Grouped intermediate aggregate Rows iterator.
+   * @param out The collector to hand results to.
+   *
+   */
+  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = 
{
+
+// Initiate intermediate aggregate value.
+aggregates.foreach(_.initiate(aggregateBuffer))
+
+// Merge intermediate aggregate value to buffer.
+var last: Row = null
+records.foreach((record) =>  {
+  aggregates.foreach(_.merge(record, aggregateBuffer))
+  last = record
+})
+
+// Set group keys to aggregateBuffer.
+for (i <- 0 until groupKeysMapping.length) {
--- End diff --

Is this necessary? Looks like we copy the keys first into the 
`aggregateBuffer` and then into the `output` row. Can't we copy the keys 
directly from `last` to `output`?


> Partial aggregate interface design and sort-based implementation
> 
>
> Key: FLINK-3474
> URL: https://issues.apache.org/jira/browse/FLINK-3474
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Chengxiang Li
>Assignee: Chengxiang Li
>
> The scope of this sub task includes:
> # Partial aggregate interface.
> # Simple aggregate function implementation, such as SUM/AVG/COUNT/MIN/MAX.
> # DataSetAggregateRule, which translates logical Calcite aggregate nodes 

[GitHub] flink pull request: [FLINK-3474] support partial aggregate

2016-03-02 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1746#discussion_r54761986
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala
 ---
@@ -17,26 +17,77 @@
  */
 package org.apache.flink.api.table.runtime.aggregate
 
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.api.table.Row
+
 /**
- * Represents a SQL aggregate function. The user should first initialize 
the aggregate, then feed it
- * with grouped aggregate field values, and finally get the aggregated 
value.
- * @tparam T the output type
+ * The interface for all Flink aggregate functions, which are expressed in 
terms of initiate(),
+ * prepare(), merge() and evaluate(). The aggregate functions would be 
executed in 2 phases:
+ * -- In Map phase, use prepare() to transform aggregate field value into 
intermediate
+ * aggregate value.
+ * -- In GroupReduce phase, use merge() to merge grouped intermediate 
aggregate values
+ * into aggregate buffer. Then use evaluate() to calculate the final 
aggregated value.
+ * For associative decomposable aggregate functions, they support partial 
aggregate. To optimize
+ * the performance, a Combine phase would be added between Map phase and 
GroupReduce phase,
+ * -- In Combine phase, use merge() to merge sub-grouped intermediate 
aggregate values
+ * into aggregate buffer.
+ *
+ * The intermediate aggregate value is stored inside Row, aggOffsetInRow 
is used as the start
+ * field index in Row, so different aggregate functions could share the 
same Row as intermediate
+ * aggregate value/aggregate buffer, as their aggregate values could be 
stored in distinct fields
+ * of Row with no conflict. The intermediate aggregate value is required 
to be a sequence of JVM
+ * primitives, and Flink uses intermediateDataType() to get its data types 
+ * on the SQL side.
+ *
+ * @tparam T Aggregated value type.
  */
 trait Aggregate[T] extends Serializable {
+
+  protected var aggOffsetInRow: Int = _
+
   /**
-   * Initialize the aggregate state.
+   * Initiate the intermediate aggregate value in Row.
+   * @param intermediate
*/
-  def initiateAggregate
+  def initiate(intermediate: Row): Unit
 
   /**
-   * Feed the aggregate field value.
+   * Transform the aggregate field value into intermediate aggregate data.
* @param value
+   * @param intermediate
*/
-  def aggregate(value: Any)
+  def prepare(value: Any, intermediate: Row): Unit
--- End diff --

Yes, this would be aggregation specific. 
For example, for a `SUM` aggregation, `prepare` could insert a `0`, which is 
basically the same as what `initiate` would do. However, it is also OK to do it 
in `prepare` directly.
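
To make that concrete, here is a self-contained sketch of a SUM aggregate against the interface discussed above (offset handling and types are invented for illustration; this is not the PR's actual implementation):

{code}
import org.apache.flink.api.table.Row

// Hedged sketch: prepare() writes the raw input value into the
// intermediate Row, which has the same effect as initiate() (write 0)
// followed by a single merge.
class LongSumAggregate(offset: Int) {

  def initiate(intermediate: Row): Unit =
    intermediate.setField(offset, 0L)

  def prepare(value: Any, intermediate: Row): Unit =
    intermediate.setField(offset, value.asInstanceOf[Long])

  def merge(intermediate: Row, buffer: Row): Unit = {
    val partial = intermediate.productElement(offset).asInstanceOf[Long]
    val current = buffer.productElement(offset).asInstanceOf[Long]
    buffer.setField(offset, partial + current)
  }

  def evaluate(buffer: Row): Long =
    buffer.productElement(offset).asInstanceOf[Long]
}
{code}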


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3474) Partial aggregate interface design and sort-based implementation

2016-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15176085#comment-15176085
 ] 

ASF GitHub Bot commented on FLINK-3474:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1746#discussion_r54761986
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala
 ---
@@ -17,26 +17,77 @@
  */
 package org.apache.flink.api.table.runtime.aggregate
 
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.api.table.Row
+
 /**
- * Represents a SQL aggregate function. The user should first initialize 
the aggregate, then feed it
- * with grouped aggregate field values, and finally get the aggregated 
value.
- * @tparam T the output type
+ * The interface for all Flink aggregate functions, which are expressed in 
terms of initiate(),
+ * prepare(), merge() and evaluate(). The aggregate functions would be 
executed in 2 phases:
+ * -- In Map phase, use prepare() to transform aggregate field value into 
intermediate
+ * aggregate value.
+ * -- In GroupReduce phase, use merge() to merge grouped intermediate 
aggregate values
+ * into aggregate buffer. Then use evaluate() to calculate the final 
aggregated value.
+ * For associative decomposable aggregate functions, they support partial 
aggregate. To optimize
+ * the performance, a Combine phase would be added between Map phase and 
GroupReduce phase,
+ * -- In Combine phase, use merge() to merge sub-grouped intermediate 
aggregate values
+ * into aggregate buffer.
+ *
+ * The intermediate aggregate value is stored inside Row, aggOffsetInRow 
is used as the start
+ * field index in Row, so different aggregate functions could share the 
same Row as intermediate
+ * aggregate value/aggregate buffer, as their aggregate values could be 
stored in distinct fields
+ * of Row with no conflict. The intermediate aggregate value is required 
to be a sequence of JVM
+ * primitives, and Flink uses intermediateDataType() to get its data types 
+ * on the SQL side.
+ *
+ * @tparam T Aggregated value type.
  */
 trait Aggregate[T] extends Serializable {
+
+  protected var aggOffsetInRow: Int = _
+
   /**
-   * Initialize the aggregate state.
+   * Initiate the intermediate aggregate value in Row.
+   * @param intermediate
*/
-  def initiateAggregate
+  def initiate(intermediate: Row): Unit
 
   /**
-   * Feed the aggregate field value.
+   * Transform the aggregate field value into intermediate aggregate data.
* @param value
+   * @param intermediate
*/
-  def aggregate(value: Any)
+  def prepare(value: Any, intermediate: Row): Unit
--- End diff --

Yes, this would be aggregation specific. 
For example, for a `SUM` aggregation, `prepare` could insert a `0`, which is 
basically the same as what `initiate` would do. However, it is also OK to do it 
in `prepare` directly.


> Partial aggregate interface design and sort-based implementation
> 
>
> Key: FLINK-3474
> URL: https://issues.apache.org/jira/browse/FLINK-3474
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Chengxiang Li
>Assignee: Chengxiang Li
>
> The scope of this sub task includes:
> # Partial aggregate interface.
> # Simple aggregate function implementation, such as SUM/AVG/COUNT/MIN/MAX.
> # DataSetAggregateRule, which translates logical Calcite aggregate nodes into 
> Flink user functions. As the hash-based combiner is not available yet (see PR 
> #1517), we use sort-based combine as the default.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3474] support partial aggregate

2016-03-02 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1746#issuecomment-191343405
  
Yes, you are right. 
We need a `RichCombineToGroupCombineWrapper` and inject it as a temporary 
fix until we have dedicated drivers for `CombineFunction`. 
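
For reference, a rough sketch of what such a wrapper could look like (the class name comes from the comment above; the body is an assumption, and forwarding of the Rich* lifecycle methods is omitted for brevity):

{code}
import java.lang.Iterable

import org.apache.flink.api.common.functions.{CombineFunction, GroupCombineFunction}
import org.apache.flink.util.Collector

// Adapt a CombineFunction (returns a single value) to the Collector-based
// GroupCombineFunction interface that the existing drivers already execute.
class RichCombineToGroupCombineWrapper[IN, OUT](val wrapped: CombineFunction[IN, OUT])
  extends GroupCombineFunction[IN, OUT] with Serializable {

  override def combine(values: Iterable[IN], out: Collector[OUT]): Unit =
    out.collect(wrapped.combine(values))
}
{code}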


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3565) FlinkKafkaConsumer does not work with Scala 2.11

2016-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15176060#comment-15176060
 ] 

ASF GitHub Bot commented on FLINK-3565:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1755#issuecomment-191343477
  
+1 to merge


> FlinkKafkaConsumer does not work with Scala 2.11 
> -
>
> Key: FLINK-3565
> URL: https://issues.apache.org/jira/browse/FLINK-3565
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Aljoscha Krettek
>Assignee: Maximilian Michels
>Priority: Blocker
>
> Running a program built against Flink_2.11 dependencies fails on a Flink_2.11 
> cluster with this exception:
> java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
>   at kafka.utils.Pool.<init>(Pool.scala:28)
>   at 
> kafka.consumer.FetchRequestAndResponseStatsRegistry$.<init>(FetchRequestAndResponseStats.scala:60)
>   at 
> kafka.consumer.FetchRequestAndResponseStatsRegistry$.<clinit>(FetchRequestAndResponseStats.scala)
>   at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39)
>   at kafka.javaapi.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:34)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:518)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:218)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:193)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:160)
>   at com.dataartisans.querywindow.WindowJob.main(WindowJob.java:93)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>   at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>   at 
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: java.lang.ClassNotFoundException: 
> scala.collection.GenTraversableOnce$class
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   ... 21 more



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3498) Implement TRIM, SUBSTRING as reference design for Table API

2016-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15176050#comment-15176050
 ] 

ASF GitHub Bot commented on FLINK-3498:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1747#issuecomment-191341765
  
OK, I see. Sounds good to me.
+1 to merge. 


> Implement TRIM, SUBSTRING as reference design for Table API
> ---
>
> Key: FLINK-3498
> URL: https://issues.apache.org/jira/browse/FLINK-3498
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> As described in FLINK-3497 TRIM and SUBSTRING are the first scalar functions 
> implemented in Calcite and used in Table API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-03-02 Thread Gabor Horvath (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Horvath reassigned FLINK-3322:


Assignee: Gabor Horvath

> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Assignee: Gabor Horvath
>Priority: Critical
> Fix For: 1.0.0
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (It will generate some lookup tables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)
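
To illustrate the pooling idea (a hypothetical sketch, not Flink's MemoryManager code):

{code}
import java.util.concurrent.ConcurrentLinkedQueue

// Keep released segments in a queue for reuse, so iterative jobs that
// reallocate all managed memory every superstep do not churn the GC.
class SegmentPool(segmentSize: Int) {
  private val pool = new ConcurrentLinkedQueue[Array[Byte]]()

  /** Reuse a pooled segment if one is available, else allocate fresh. */
  def allocate(): Array[Byte] = {
    val reused = pool.poll()
    if (reused != null) reused else new Array[Byte](segmentSize)
  }

  /** Return a segment to the pool instead of leaving it to the GC. */
  def release(segment: Array[Byte]): Unit = pool.offer(segment)
}
{code}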



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3565] add module to force execution of ...

2016-03-02 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/1755

[FLINK-3565] add module to force execution of Shade plugin

This ensures that all properties of the root pom are properly
resolved by running the Shade plugin. Thus, our root pom does not have
to depend on a Scala version just because it holds the Scala version
properties.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink FLINK-3565

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1755.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1755


commit 47de08a9503da99b11fc45a7f299671ecab0b21e
Author: Maximilian Michels 
Date:   2016-03-02T16:52:05Z

[maven] add module to force execution of Shade plugin

This ensures that all properties of the root pom are properly
resolved by running the Shade plugin. Thus, our root pom does not have
to depend on a Scala version just because it holds the Scala version
properties.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3565) FlinkKafkaConsumer does not work with Scala 2.11

2016-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15176012#comment-15176012
 ] 

ASF GitHub Bot commented on FLINK-3565:
---

GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/1755

[FLINK-3565] add module to force execution of Shade plugin

This ensures that all properties of the root pom are properly
resolved by running the Shade plugin. Thus, our root pom does not have
to depend on a Scala version just because it holds the Scala version
properties.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink FLINK-3565

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1755.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1755


commit 47de08a9503da99b11fc45a7f299671ecab0b21e
Author: Maximilian Michels 
Date:   2016-03-02T16:52:05Z

[maven] add module to force execution of Shade plugin

This ensures that all properties of the root pom are properly
resolved by running the Shade plugin. Thus, our root pom does not have
to depend on a Scala version just because it holds the Scala version
properties.




> FlinkKafkaConsumer does not work with Scala 2.11 
> -
>
> Key: FLINK-3565
> URL: https://issues.apache.org/jira/browse/FLINK-3565
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Aljoscha Krettek
>Assignee: Maximilian Michels
>Priority: Blocker
>
> Running a program built against Flink_2.11 dependencies fails on a Flink_2.11 
> cluster with this exception:
> java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
>   at kafka.utils.Pool.<init>(Pool.scala:28)
>   at 
> kafka.consumer.FetchRequestAndResponseStatsRegistry$.<init>(FetchRequestAndResponseStats.scala:60)
>   at 
> kafka.consumer.FetchRequestAndResponseStatsRegistry$.<clinit>(FetchRequestAndResponseStats.scala)
>   at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39)
>   at kafka.javaapi.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:34)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:518)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:218)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:193)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:160)
>   at com.dataartisans.querywindow.WindowJob.main(WindowJob.java:93)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>   at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>   at 
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: java.lang.ClassNotFoundException: 
> scala.collection.GenTraversableOnce$class
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   ... 21 more



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3565) FlinkKafkaConsumer does not work with Scala 2.11

2016-03-02 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15176007#comment-15176007
 ] 

Maximilian Michels commented on FLINK-3565:
---

Thanks to [~till.rohrmann], I've come up with another approach, which is also 
used in other projects. The idea is to add a dependency to the root pom which 
is then explicitly shaded out of all modules. That way, the Shade plugin always 
generates the so-called "effective" poms, which have all their properties 
resolved. Thus, we can configure the Scala version in the root pom as before. 
When Maven packages the jars, the Shade plugin will ensure that all poms are 
made "effective".

I don't know whether we'll stick to this approach; it is possible to move to 
another solution later on. Most importantly, this change is transparent to the 
user.

In contrast to Spark's solution 
(https://issues.apache.org/jira/browse/SPARK-3812), I've created an independent 
module in our source tree, so everything is self-contained and we don't rely on 
an external Maven module.

> FlinkKafkaConsumer does not work with Scala 2.11 
> -
>
> Key: FLINK-3565
> URL: https://issues.apache.org/jira/browse/FLINK-3565
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Aljoscha Krettek
>Assignee: Maximilian Michels
>Priority: Blocker
>
> Running a program built against Flink_2.11 dependencies fails on a Flink_2.11 
> cluster with this exception:
> java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
>   at kafka.utils.Pool.<init>(Pool.scala:28)
>   at 
> kafka.consumer.FetchRequestAndResponseStatsRegistry$.<init>(FetchRequestAndResponseStats.scala:60)
>   at 
> kafka.consumer.FetchRequestAndResponseStatsRegistry$.<clinit>(FetchRequestAndResponseStats.scala)
>   at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39)
>   at kafka.javaapi.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:34)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:518)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:218)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:193)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:160)
>   at com.dataartisans.querywindow.WindowJob.main(WindowJob.java:93)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>   at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>   at 
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: java.lang.ClassNotFoundException: 
> scala.collection.GenTraversableOnce$class
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   ... 21 more



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-3422) Scramble HashPartitioner hashes

2016-03-02 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/FLINK-3422?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Márton Balassi resolved FLINK-3422.
---
Resolution: Fixed

Fixed via 0ff286d and f0f93c2.

> Scramble HashPartitioner hashes
> ---
>
> Key: FLINK-3422
> URL: https://issues.apache.org/jira/browse/FLINK-3422
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10.2
>Reporter: Stephan Ewen
>Assignee: Gabor Horvath
>Priority: Critical
> Fix For: 1.0.0
>
>
> The {{HashPartitioner}} used by the streaming API does not apply any hash 
> scrambling against bad user hash functions.
> We should apply a murmur or Jenkins hash on top of the hash code, similar to 
> what is done in the {{DataSet}} API.
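
A minimal sketch of the scrambling idea (assuming Flink's MathUtils.murmurHash; the surrounding partitioner logic is invented for illustration):

{code}
import org.apache.flink.util.MathUtils

// Run the user's hashCode through a murmur finalizer before selecting a
// channel, so poor user hash functions still spread across partitions.
def selectChannel(key: AnyRef, numChannels: Int): Int = {
  val scrambled = MathUtils.murmurHash(key.hashCode())
  // Defensive modulo in case the scrambled value is negative.
  ((scrambled % numChannels) + numChannels) % numChannels
}
{code}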



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3422][streaming][api-breaking] Scramble...

2016-03-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1685


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3422) Scramble HashPartitioner hashes

2016-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175883#comment-15175883
 ] 

ASF GitHub Bot commented on FLINK-3422:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1685


> Scramble HashPartitioner hashes
> ---
>
> Key: FLINK-3422
> URL: https://issues.apache.org/jira/browse/FLINK-3422
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10.2
>Reporter: Stephan Ewen
>Assignee: Gabor Horvath
>Priority: Critical
> Fix For: 1.0.0
>
>
> The {{HashPartitioner}} used by the streaming API does not apply any hash 
> scrambling against bad user hash functions.
> We should apply a murmur or Jenkins hash on top of the hash code, similar to 
> what is done in the {{DataSet}} API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3566) Input type validation often fails on custom TypeInfo implementations

2016-03-02 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175770#comment-15175770
 ] 

Gyula Fora commented on FLINK-3566:
---

Another example with KeySelectors and interfaces:

interface Event {
    String getKey();
}

public static class MyEvent implements Event {
    @Override
    public String getKey() {
        return "";
    }
}

public static void main(String[] args) {
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<MyEvent> events = env.fromElements(new MyEvent());
    applyKeySelector(events);
}

private static <T extends Event> void applyKeySelector(DataStream<T> events) {
    events.keyBy(e -> e.getKey());
}

This gives the following exception: Input mismatch: Generic object type 
'package.TypeTest.MyEvent' expected but was 'package.TypeTest.Event'.

> Input type validation often fails on custom TypeInfo implementations
> 
>
> Key: FLINK-3566
> URL: https://issues.apache.org/jira/browse/FLINK-3566
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Reporter: Gyula Fora
>
> Input type validation often fails when used with custom type infos. One 
> example of this behaviour can be reproduced by creating a custom type info 
> with our own field type:
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.generateSequence(1, 10)
>   .map(new MapFunction<Long, Tuple1<Optional<Long>>>() {
>     @Override
>     public Tuple1<Optional<Long>> map(Long value) throws Exception {
>       return Tuple1.of(Optional.of(value));
>     }
>   }).returns(new TupleTypeInfo<>(new 
> OptionTypeInfo(BasicTypeInfo.LONG_TYPE_INFO)))
>   .keyBy(new KeySelector<Tuple1<Optional<Long>>, Optional<Long>>() {
>     @Override
>     public Optional<Long> getKey(Tuple1<Optional<Long>> value) throws Exception {
>       return value.f0;
>     }
>   });
> This will fail on input type validation at the KeySelector (or any other 
> function, for example a mapper) with the following exception:
> Input mismatch: Basic type expected.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3566) Input type validation often fails on custom TypeInfo implementations

2016-03-02 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-3566:
-

 Summary: Input type validation often fails on custom TypeInfo 
implementations
 Key: FLINK-3566
 URL: https://issues.apache.org/jira/browse/FLINK-3566
 Project: Flink
  Issue Type: Bug
  Components: Type Serialization System
Reporter: Gyula Fora


Input type validation often fails when used with custom type infos. One example 
of this behaviour can be reproduced by creating a custom type info with our own 
field type:

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

env.generateSequence(1, 10)
    .map(new MapFunction<Long, Tuple1<Optional<Long>>>() {
        @Override
        public Tuple1<Optional<Long>> map(Long value) throws Exception {
            return Tuple1.of(Optional.of(value));
        }
    }).returns(new TupleTypeInfo<>(new 
OptionTypeInfo(BasicTypeInfo.LONG_TYPE_INFO)))
    .keyBy(new KeySelector<Tuple1<Optional<Long>>, Optional<Long>>() {

        @Override
        public Optional<Long> getKey(Tuple1<Optional<Long>> value) throws Exception {
            return value.f0;
        }
    });

This will fail on input type validation at the KeySelector (or any other 
function, for example a mapper) with the following exception:

Input mismatch: Basic type expected.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3554] [streaming] Emit a MAX Watermark ...

2016-03-02 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1750#issuecomment-191260012
  
Hi,
what are you referring to? The watermark does not interact with count 
windows.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3554) Bounded sources should emit a Max Watermark when they are done

2016-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175663#comment-15175663
 ] 

ASF GitHub Bot commented on FLINK-3554:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1750#issuecomment-191260012
  
Hi,
what are you referring to? The watermark does not interact with count 
windows.


> Bounded sources should emit a Max Watermark when they are done
> --
>
> Key: FLINK-3554
> URL: https://issues.apache.org/jira/browse/FLINK-3554
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.0.0
>
>
> For proper event time support in bounded sources, these sources should emit a 
> final watermark before shutting down.
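
A hedged sketch of the intended behaviour (element type and data are invented; this is not the committed fix):

{code}
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.watermark.Watermark

// A bounded source that emits a final Long.MaxValue watermark before
// run() returns, so downstream event-time windows can still fire.
class BoundedSource(records: Seq[(Long, Long)]) extends SourceFunction[Long] {

  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    for ((value, timestamp) <- records if running) {
      ctx.collectWithTimestamp(value, timestamp)
    }
    // The behaviour this issue asks for: signal the end of event time.
    ctx.emitWatermark(new Watermark(Long.MaxValue))
  }

  override def cancel(): Unit = running = false
}
{code}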



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3152) Support all comparisons for Date type

2016-03-02 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175657#comment-15175657
 ] 

Fabian Hueske commented on FLINK-3152:
--

Yes, a subtask of FLINK-3497 sounds good. I will comment on FLINK-3498 soon.

> Support all comparisons for Date type
> -
>
> Key: FLINK-3152
> URL: https://issues.apache.org/jira/browse/FLINK-3152
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Reporter: Timo Walther
>
> Currently, the Table API does not support comparisons like "DATE < DATE", 
> "DATE >= DATE". The ExpressionCodeGenerator needs to be adapted for that.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-3563) .returns() doesn't compile when using .map() with a custom MapFunction

2016-03-02 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175641#comment-15175641
 ] 

Timo Walther edited comment on FLINK-3563 at 3/2/16 2:11 PM:
-

As far as I know, mixing of Scala and Java API is not supported. So it is 
possible that the type extraction fails in this example. 

I tested this which runs fine:
{code}
text.map(new MapFunction() {
@Override
public Map map(String value) throws Exception {
  return new HashMap(2);
}
}).print();
{code}

Explicit casts are ok in this case.

[~aljoscha] I think we can close this issue, what do you think?


was (Author: twalthr):
As far as I know, mixing of Scala and Java API is not supported. So it is 
possible that the type extraction fails in this example. 

I tested this which runs fine:
{code}
text.map(new MapFunction() {
@Override
public Map map(String value) throws Exception {
  return new HashMap(2);
}
}).print();
{code}

Explicit casts is ok in this case.

[~aljoscha] I think we can close this issue, what do you think?

> .returns() doesn't compile when using .map() with a custom MapFunction
> --
>
> Key: FLINK-3563
> URL: https://issues.apache.org/jira/browse/FLINK-3563
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 0.10.1
>Reporter: Simone Robutti
>Priority: Minor
>
> Defined a DummyMapFunction that goes from a Java Map to another Java Map like 
> this:
> {code:title=DummyMapFunction.scala|borderStyle=solid}
> class DummyMapFunction() extends MapFunction[java.util.Map[String, Any], 
> java.util.Map[FieldName, Any]] {
>   override def map(input: java.util.Map[String, Any]): 
> java.util.Map[FieldName, Any] = {
>     val result: java.util.Map[FieldName, Any] = new 
> java.util.HashMap[FieldName, Any]()
>     result
>   }
> }
> {code}
> and trying to use it with a map:
> {code:title=Main.java}
> DummyMapFunction operator = new DummyMapFunction();
> DataSource<java.util.Map<String, Object>> dataset = env.fromCollection(input);
> List<java.util.Map<FieldName, Object>> collectedResult = 
> dataset.map(operator).returns(java.util.Map.class).collect();
> {code}
> the returns() call doesn't compile because it can't resolve the returns 
> method with that parameter.
> But if instead of creating a variable of type DummyMapFunction I create a
> {code}
> MapFunction operator = new DummyMapFunction();
> {code}
> or I explicitly cast the variable to a MapFunction, it compiles and works 
> flawlessly.
> This trick works, but I think this is unexpected behaviour. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3563) .returns() doesn't compile when using .map() with a custom MapFunction

2016-03-02 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175641#comment-15175641
 ] 

Timo Walther commented on FLINK-3563:
-

As far as I know, mixing of Scala and Java API is not supported. So it is 
possible that the type extraction fails in this example. 

I tested this which runs fine:
{code}
text.map(new MapFunction() {
@Override
public Map map(String value) throws Exception {
  return new HashMap(2);
}
}).print();
{code}

Explicit casts are ok in this case.

[~aljoscha] I think we can close this issue, what do you think?

> .returns() doesn't compile when using .map() with a custom MapFunction
> --
>
> Key: FLINK-3563
> URL: https://issues.apache.org/jira/browse/FLINK-3563
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 0.10.1
>Reporter: Simone Robutti
>Priority: Minor
>
> Defined a DummyMapFunction that goes from a Java Map to another Java Map like 
> this:
> {code:title=DummyMapFunction.scala|borderStyle=solid}
> class DummyMapFunction() extends MapFunction[java.util.Map[String, Any], 
> java.util.Map[FieldName, Any]] {
>   override def map(input: java.util.Map[String, Any]): 
> java.util.Map[FieldName, Any] = {
>     val result: java.util.Map[FieldName, Any] = new 
> java.util.HashMap[FieldName, Any]()
>     result
>   }
> }
> {code}
> and trying to use it with a map:
> {code:title=Main.java}
> DummyMapFunction operator = new DummyMapFunction();
> DataSource<java.util.Map<String, Object>> dataset = env.fromCollection(input);
> List<java.util.Map<FieldName, Object>> collectedResult = 
> dataset.map(operator).returns(java.util.Map.class).collect();
> {code}
> the returns() call doesn't compile because it can't resolve the returns 
> method with that parameter.
> But if instead of creating a variable of type DummyMapFunction I create a
> {code}
> MapFunction operator = new DummyMapFunction();
> {code}
> or I explicitly cast the variable to a MapFunction, it compiles and works 
> flawlessly.
> This trick works, but I think this is unexpected behaviour. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3565) FlinkKafkaConsumer does not work with Scala 2.11

2016-03-02 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175616#comment-15175616
 ] 

Aljoscha Krettek commented on FLINK-3565:
-

I think 2) is the way to go, yes.

> FlinkKafkaConsumer does not work with Scala 2.11 
> -
>
> Key: FLINK-3565
> URL: https://issues.apache.org/jira/browse/FLINK-3565
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Aljoscha Krettek
>Assignee: Maximilian Michels
>Priority: Blocker
>
> Running a program built against Flink_2.11 dependencies fails on a Flink_2.11 
> cluster with this exception:
> java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
>   at kafka.utils.Pool.<init>(Pool.scala:28)
>   at 
> kafka.consumer.FetchRequestAndResponseStatsRegistry$.<init>(FetchRequestAndResponseStats.scala:60)
>   at 
> kafka.consumer.FetchRequestAndResponseStatsRegistry$.<clinit>(FetchRequestAndResponseStats.scala)
>   at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39)
>   at kafka.javaapi.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:34)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:518)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:218)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:193)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:160)
>   at com.dataartisans.querywindow.WindowJob.main(WindowJob.java:93)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>   at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>   at 
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: java.lang.ClassNotFoundException: 
> scala.collection.GenTraversableOnce$class
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   ... 21 more



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3561) ExecutionConfig's timestampsEnabled is unused

2016-03-02 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175613#comment-15175613
 ] 

Aljoscha Krettek commented on FLINK-3561:
-

I think so, yes. [~StephanEwen] must have forgotten them when he removed the 
other related stuff.

> ExecutionConfig's timestampsEnabled is unused
> -
>
> Key: FLINK-3561
> URL: https://issues.apache.org/jira/browse/FLINK-3561
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.0.0, 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
> Fix For: 1.1.0
>
>
> Seems like the flag can be removed. What do you think [~aljoscha]?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-3565) FlinkKafkaConsumer does not work with Scala 2.11

2016-03-02 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels reassigned FLINK-3565:
-

Assignee: Maximilian Michels

> FlinkKafkaConsumer does not work with Scala 2.11 
> -
>
> Key: FLINK-3565
> URL: https://issues.apache.org/jira/browse/FLINK-3565
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Aljoscha Krettek
>Assignee: Maximilian Michels
>Priority: Blocker
>
> Running a program built against Flink_2.11 dependencies fails on a Flink_2.11 
> cluster with this exception:
> java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
>   at kafka.utils.Pool.<init>(Pool.scala:28)
>   at 
> kafka.consumer.FetchRequestAndResponseStatsRegistry$.<init>(FetchRequestAndResponseStats.scala:60)
>   at 
> kafka.consumer.FetchRequestAndResponseStatsRegistry$.<clinit>(FetchRequestAndResponseStats.scala)
>   at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39)
>   at kafka.javaapi.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:34)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:518)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:218)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:193)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:160)
>   at com.dataartisans.querywindow.WindowJob.main(WindowJob.java:93)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>   at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>   at 
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: java.lang.ClassNotFoundException: 
> scala.collection.GenTraversableOnce$class
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   ... 21 more



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3565) FlinkKafkaConsumer does not work with Scala 2.11

2016-03-02 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-3565:
---

 Summary: FlinkKafkaConsumer does not work with Scala 2.11 
 Key: FLINK-3565
 URL: https://issues.apache.org/jira/browse/FLINK-3565
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.0.0
Reporter: Aljoscha Krettek
Priority: Blocker


Running a program built against Flink_2.11 dependencies fails on a Flink_2.11 
cluster with this exception:

java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
at kafka.utils.Pool.<init>(Pool.scala:28)
at 
kafka.consumer.FetchRequestAndResponseStatsRegistry$.<init>(FetchRequestAndResponseStats.scala:60)
at 
kafka.consumer.FetchRequestAndResponseStatsRegistry$.<clinit>(FetchRequestAndResponseStats.scala)
at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39)
at kafka.javaapi.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:34)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:518)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:218)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:193)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:160)
at com.dataartisans.querywindow.WindowJob.main(WindowJob.java:93)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
at 
org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: java.lang.ClassNotFoundException: 
scala.collection.GenTraversableOnce$class
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 21 more



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3152) Support all comparisons for Date type

2016-03-02 Thread Dawid Wysakowicz (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175590#comment-15175590
 ] 

Dawid Wysakowicz commented on FLINK-3152:
-

Thanks Timo,
Sure, I will happily work on it, as well as on any other Table API issue that 
is more suitable for a beginner like me or has a higher priority.

> Support all comparisons for Date type
> -
>
> Key: FLINK-3152
> URL: https://issues.apache.org/jira/browse/FLINK-3152
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Reporter: Timo Walther
>
> Currently, the Table API does not support comparisons like "DATE < DATE", 
> "DATE >= DATE". The ExpressionCodeGenerator needs to be adapted for that.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-3152) Support all comparisons for Date type

2016-03-02 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175570#comment-15175570
 ] 

Timo Walther edited comment on FLINK-3152 at 3/2/16 1:21 PM:
-

Hi Dawid,
thanks for your interest. We are currently moving the Table API on top of 
Apache Calcite (see [FLINK-3221]). The code generation needed to be rewritten 
for that. This issue is now more complicated than it was before (with the "old" 
Table API), because the new CodeGenerator does not support the Date type at all 
yet. We need to sync it with Calcite's time/date types first. If you'd like to 
work on this issue anyway, I can help you integrate the Date type into the new 
Table API on Calcite.

Regards,
Timo


was (Author: twalthr):
Hi Dawid,
thanks for your interest. We are currently moving the Table API on top of 
Apache Calcite (see [FLINK-3221]). The code generation needed to be rewritten 
for that. This issue is now more complicated than it was before (with the "old" 
Table API), because the new CodeGenerator does not support the Date type at 
all. We need to sync it with Calcite's time/date types first. If you'd like to 
work on this issue anyway, I can help you integrate the Date type into the new 
Table API on Calcite.

Regards,
Timo

> Support all comparisons for Date type
> -
>
> Key: FLINK-3152
> URL: https://issues.apache.org/jira/browse/FLINK-3152
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Reporter: Timo Walther
>
> Currently, the Table API does not support comparisons like "DATE < DATE", 
> "DATE >= DATE". The ExpressionCodeGenerator needs to be adapted for that.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3152) Support all comparisons for Date type

2016-03-02 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175570#comment-15175570
 ] 

Timo Walther commented on FLINK-3152:
-

Hi Dawid,
thanks for your interest. We are currently moving the Table API on top of 
Apache Calcite (see [FLINK-3221]). The code generation needed to be rewritten 
for that. This issue is now more complicated than it was before (with the "old" 
Table API), because the new CodeGenerator does not support the Date type at 
all. We need to sync it with Calcite's time/date types first. If you'd like to 
work on this issue anyway, I can help you integrate the Date type into the new 
Table API on Calcite.

Regards,
Timo

> Support all comparisons for Date type
> -
>
> Key: FLINK-3152
> URL: https://issues.apache.org/jira/browse/FLINK-3152
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Reporter: Timo Walther
>
> Currently, the Table API does not support comparisons like "DATE < DATE", 
> "DATE >= DATE". The ExpressionCodeGenerator needs to be adapted for that.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3152) Support all comparisons for Date type

2016-03-02 Thread Dawid Wysakowicz (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175564#comment-15175564
 ] 

Dawid Wysakowicz commented on FLINK-3152:
-

Hi,
I would happily try to work on this issue if nobody else is already working on 
it.
I would also appreciate any advice on how I should proceed with it.

Thanks

> Support all comparisons for Date type
> -
>
> Key: FLINK-3152
> URL: https://issues.apache.org/jira/browse/FLINK-3152
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Reporter: Timo Walther
>
> Currently, the Table API does not support comparisons like "DATE < DATE", 
> "DATE >= DATE". The ExpressionCodeGenerator needs to be adapted for that.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3564] [table] Implement distinct() for ...

2016-03-02 Thread twalthr
GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/1754

[FLINK-3564] [table] Implement distinct() for Table API

Implements some syntactic sugar. Another step towards a SQL-like API.
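To make the sugar concrete, a hedged sketch of the equivalence (the example table 
and field names are made up; `distinct()` is the call this PR adds, and the 
`groupBy`/`select` form is what it desugars to):

{code}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._

val env = ExecutionEnvironment.getExecutionEnvironment

// Hypothetical two-field table with one duplicate row.
val t = env.fromElements((1, "a"), (1, "a"), (2, "b")).as('i, 's)

val viaDistinct = t.distinct()                      // new syntactic sugar
val viaGroupBy  = t.groupBy('i, 's).select('i, 's)  // equivalent grouping form
{code}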

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink distinctTableApi

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1754.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1754


commit 5986e9e284bf64dbc5a382c829b8a5843929b028
Author: twalthr 
Date:   2016-03-02T12:19:38Z

[FLINK-3564] [table] Implement distinct() for Table API




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3564) Implement distinct() for Table API

2016-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175522#comment-15175522
 ] 

ASF GitHub Bot commented on FLINK-3564:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/1754

[FLINK-3564] [table] Implement distinct() for Table API

Implements some syntactic sugar. Another step towards a SQL-like API.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink distinctTableApi

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1754.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1754


commit 5986e9e284bf64dbc5a382c829b8a5843929b028
Author: twalthr 
Date:   2016-03-02T12:19:38Z

[FLINK-3564] [table] Implement distinct() for Table API




> Implement distinct() for Table API
> --
>
> Key: FLINK-3564
> URL: https://issues.apache.org/jira/browse/FLINK-3564
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Minor
>
> This is only syntactic sugar for grouping of all fields.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3564) Implement distinct() for Table API

2016-03-02 Thread Timo Walther (JIRA)
Timo Walther created FLINK-3564:
---

 Summary: Implement distinct() for Table API
 Key: FLINK-3564
 URL: https://issues.apache.org/jira/browse/FLINK-3564
 Project: Flink
  Issue Type: New Feature
  Components: Table API
Reporter: Timo Walther
Assignee: Timo Walther
Priority: Minor


This is only syntactic sugar for grouping of all fields.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2941) Implement a neo4j - Flink/Gelly connector

2016-03-02 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175453#comment-15175453
 ] 

Chesnay Schepler commented on FLINK-2941:
-

Does neo4j have something akin to our flink-contrib module? Adding a connector 
without tests is bound to cause issues in the future.

> Implement a neo4j - Flink/Gelly connector
> -
>
> Key: FLINK-2941
> URL: https://issues.apache.org/jira/browse/FLINK-2941
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Reporter: Vasia Kalavri
>Assignee: Martin Junghanns
>  Labels: requires-design-doc
>
> By connecting Flink/Gelly with a graph database like neo4j we can facilitate 
> interesting use-cases, like:
> - use neo4j as an input source, i.e. read the complete graph or a subgraph 
> from a neo4j database, import it into Flink and run a graph analysis task 
> with Gelly.
> - use neo4j as a sink, i.e. perform ETL on some data in Flink to create a 
> graph and insert the graph in a neo4j database for further querying.
> We have started a discussion on possible implementations and have looked into 
> similar projects, e.g. connecting neo4j to Spark. Some initial thoughts and 
> experiences can be found in [this 
> document|https://docs.google.com/document/d/13qT_e-y8aTNWQnD43jRBq1074Y1LggPNDsic_Obwc28/edit?usp=sharing].
>  Please, feel free to comment and add ideas! I will also start a discussion 
> in the mailing list with more concrete problems that we would like to get 
> feedback on.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3563) .returns() doesn't compile when using .map() with a custom MapFunction

2016-03-02 Thread Simone Robutti (JIRA)
Simone Robutti created FLINK-3563:
-

 Summary: .returns() doesn't compile when using .map() with a 
custom MapFunction
 Key: FLINK-3563
 URL: https://issues.apache.org/jira/browse/FLINK-3563
 Project: Flink
  Issue Type: Bug
  Components: Type Serialization System
Affects Versions: 0.10.1
Reporter: Simone Robutti
Priority: Minor


I defined a DummyMapFunction that maps one Java Map to another, like this:

{code:title=DummyMapFunction.scala|borderStyle=solid}
class DummyMapFunction() extends MapFunction[java.util.Map[String, Any], java.util.Map[FieldName, Any]] {

  override def map(input: java.util.Map[String, Any]): java.util.Map[FieldName, Any] = {
    val result: java.util.Map[FieldName, Any] = new java.util.HashMap[FieldName, Any]()
    result
  }
}
{code}

and tried to use it in a map:

{code:title=Main.java}
DummyMapFunction operator = new DummyMapFunction();

DataSource<java.util.Map<String, Object>> dataset = env.fromCollection(input);
List<java.util.Map<FieldName, Object>> collectedResult =
    dataset.map(operator).returns(java.util.Map.class).collect();
{code}

The returns() call doesn't compile, because the compiler cannot resolve the 
returns method with that parameter.

But if, instead of declaring the variable with type DummyMapFunction, I declare 
it as a raw

{code}
MapFunction operator = new DummyMapFunction();
{code}

or explicitly cast the variable to a MapFunction, it compiles and works 
flawlessly.

This is a trick that works, but I think it is unexpected behaviour.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2941) Implement a neo4j - Flink/Gelly connector

2016-03-02 Thread Vasia Kalavri (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175434#comment-15175434
 ] 

Vasia Kalavri commented on FLINK-2941:
--

Hey [~mju],
thanks for working on this! I'm very happy we have working input and output 
formats for Neo4j :D
Regarding the licenses, afaik we cannot have a GPL dependency in Flink. So, I 
see two solutions here:
(1) we add the Neo4j connector to flink-batch-connectors without tests. 
Examples don't depend on anything GPL licensed, right?
(2) we keep this as an external flink package.

I'm personally in favor of (1), but I would like to hear other people's 
opinions. It seems that there are other batch connectors without tests btw, 
e.g. hcatalog and hbase.

> Implement a neo4j - Flink/Gelly connector
> -
>
> Key: FLINK-2941
> URL: https://issues.apache.org/jira/browse/FLINK-2941
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Reporter: Vasia Kalavri
>Assignee: Martin Junghanns
>  Labels: requires-design-doc
>
> By connecting Flink/Gelly with a graph database like neo4j we can facilitate 
> interesting use-cases, like:
> - use neo4j as an input source, i.e. read the complete graph or a subgraph 
> from a neo4j database, import it into Flink and run a graph analysis task 
> with Gelly.
> - use neo4j as a sink, i.e. perform ETL on some data in Flink to create a 
> graph and insert the graph in a neo4j database for further querying.
> We have started a discussion on possible implementations and have looked into 
> similar projects, e.g. connecting neo4j to Spark. Some initial thoughts and 
> experiences can be found in [this 
> document|https://docs.google.com/document/d/13qT_e-y8aTNWQnD43jRBq1074Y1LggPNDsic_Obwc28/edit?usp=sharing].
>  Please, feel free to comment and add ideas! I will also start a discussion 
> in the mailing list with more concrete problems that we would like to get 
> feedback on.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3474) Partial aggregate interface design and sort-based implementation

2016-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175430#comment-15175430
 ] 

ASF GitHub Bot commented on FLINK-3474:
---

Github user ChengXiangLi commented on the pull request:

https://github.com/apache/flink/pull/1746#issuecomment-191186632
  
Hi @fhueske, I've updated the PR based on your comments, except for object 
reuse in `AggregateGroupCombineFunction`. I found that `GroupReduceOperator` 
transforms it into `CombineToGroupCombineWrapper`, which does not implement a 
rich user function, so we cannot initialize the reused object in `open()`. I 
think we need to add a `RichCombineToGroupCombineWrapper` to wrap rich, 
combinable GroupReduceFunctions like `AggregateGroupCombineFunction`. What do 
you think?
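
A hedged sketch of what such a wrapper could look like. The class name is taken 
from the comment above, but the generics and the set of forwarded methods are 
assumptions for illustration, not the code of this PR:

{code}
import org.apache.flink.api.common.functions._
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Forwards the rich-function lifecycle to the wrapped combinable
// GroupReduceFunction, so it can set up reused objects in open().
class RichCombineToGroupCombineWrapper[IN, OUT](
    wrapped: RichGroupReduceFunction[IN, OUT] with GroupCombineFunction[IN, IN])
  extends AbstractRichFunction with GroupCombineFunction[IN, IN] {

  override def setRuntimeContext(ctx: RuntimeContext): Unit = {
    super.setRuntimeContext(ctx)
    wrapped.setRuntimeContext(ctx)
  }

  override def open(parameters: Configuration): Unit = wrapped.open(parameters)

  override def close(): Unit = wrapped.close()

  override def combine(values: java.lang.Iterable[IN], out: Collector[IN]): Unit =
    wrapped.combine(values, out)
}
{code}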


> Partial aggregate interface design and sort-based implementation
> 
>
> Key: FLINK-3474
> URL: https://issues.apache.org/jira/browse/FLINK-3474
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Chengxiang Li
>Assignee: Chengxiang Li
>
> The scope of this sub task includes:
> # Partial aggregate interface.
> # Simple aggregate function implementation, such as SUM/AVG/COUNT/MIN/MAX.
> # DataSetAggregateRule which translate logical calcite aggregate node to 
> Flink user functions. As hash-based combiner is not available yet(see PR 
> #1517), we would use sort-based combine as default.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3474] support partial aggregate

2016-03-02 Thread ChengXiangLi
Github user ChengXiangLi commented on the pull request:

https://github.com/apache/flink/pull/1746#issuecomment-191186632
  
Hi @fhueske, I've updated the PR based on your comments, except for object 
reuse in `AggregateGroupCombineFunction`. I found that `GroupReduceOperator` 
transforms it into `CombineToGroupCombineWrapper`, which does not implement a 
rich user function, so we cannot initialize the reused object in `open()`. I 
think we need to add a `RichCombineToGroupCombineWrapper` to wrap rich, 
combinable GroupReduceFunctions like `AggregateGroupCombineFunction`. What do 
you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3474) Partial aggregate interface design and sort-based implementation

2016-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175412#comment-15175412
 ] 

ASF GitHub Bot commented on FLINK-3474:
---

Github user ChengXiangLi commented on a diff in the pull request:

https://github.com/apache/flink/pull/1746#discussion_r54706319
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala ---
@@ -17,26 +17,77 @@
  */
 package org.apache.flink.api.table.runtime.aggregate
 
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.api.table.Row
+
 /**
- * Represents a SQL aggregate function. The user should first initialize the aggregate, then feed it
- * with grouped aggregate field values, and finally get the aggregated value.
- * @tparam T the output type
+ * The interface for all Flink aggregate functions, which expressed in terms of initiate(),
+ * prepare(), merge() and evaluate(). The aggregate functions would be executed in 2 phases:
+ * -- In Map phase, use prepare() to transform aggregate field value into intermediate
+ * aggregate value.
+ * -- In GroupReduce phase, use merge() to merge grouped intermediate aggregate values
+ * into aggregate buffer. Then use evaluate() to calculate the final aggregated value.
+ * For associative decomposable aggregate functions, they support partial aggregate. To optimize
+ * the performance, a Combine phase would be added between Map phase and GroupReduce phase,
+ * -- In Combine phase, use merge() to merge sub-grouped intermediate aggregate values
+ * into aggregate buffer.
+ *
+ * The intermediate aggregate value is stored inside Row, aggOffsetInRow is used as the start
+ * field index in Row, so different aggregate functions could share the same Row as intermediate
+ * aggregate value/aggregate buffer, as their aggregate values could be stored in distinct fields
+ * of Row with no conflict. The intermediate aggregate value is required to be a sequence of JVM
+ * primitives, and Flink use intermediateDataType() to get its data types in SQL side.
+ *
+ * @tparam T Aggregated value type.
  */
 trait Aggregate[T] extends Serializable {
+
+  protected var aggOffsetInRow: Int = _
+
   /**
-   * Initialize the aggregate state.
+   * Initiate the intermediate aggregate value in Row.
+   * @param intermediate
    */
-  def initiateAggregate
+  def initiate(intermediate: Row): Unit
 
   /**
-   * Feed the aggregate field value.
+   * Transform the aggregate field value into intermediate aggregate data.
    * @param value
+   * @param intermediate
    */
-  def aggregate(value: Any)
+  def prepare(value: Any, intermediate: Row): Unit
--- End diff --

Aggregate functions may handle `null` in different ways: some ignore it, some 
treat it as a specified value. I'm not sure whether it's a good idea to handle 
it with `initiate()` in all cases.
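
To make the phase model in the diff above concrete, a hedged sketch of a SUM 
aggregate against the proposed interface. `initiate()` and `prepare()` follow 
the diff; the `merge()`/`evaluate()` signatures and the `Row` accessors are 
assumptions based on the surrounding discussion, not the code of this PR:

{code}
import org.apache.flink.api.table.Row

// Sketch only: intermediateDataType() and other trait members from the PR
// are omitted for brevity, so this is not a complete implementation.
class IntSumAggregate extends Aggregate[Int] {

  // Write the neutral element into the shared intermediate Row.
  override def initiate(intermediate: Row): Unit =
    intermediate.setField(aggOffsetInRow, 0)

  // Map phase: copy the input field into the intermediate Row.
  override def prepare(value: Any, intermediate: Row): Unit =
    intermediate.setField(aggOffsetInRow, value.asInstanceOf[Int])

  // Combine/GroupReduce phase: fold one partial value into the buffer.
  def merge(intermediate: Row, buffer: Row): Unit = {
    val partial = intermediate.productElement(aggOffsetInRow).asInstanceOf[Int]
    val current = buffer.productElement(aggOffsetInRow).asInstanceOf[Int]
    buffer.setField(aggOffsetInRow, partial + current)
  }

  // Final phase: read the aggregated value out of the buffer.
  def evaluate(buffer: Row): Int =
    buffer.productElement(aggOffsetInRow).asInstanceOf[Int]
}
{code}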


> Partial aggregate interface design and sort-based implementation
> 
>
> Key: FLINK-3474
> URL: https://issues.apache.org/jira/browse/FLINK-3474
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Chengxiang Li
>Assignee: Chengxiang Li
>
> The scope of this sub task includes:
> # Partial aggregate interface.
> # Simple aggregate function implementation, such as SUM/AVG/COUNT/MIN/MAX.
> # DataSetAggregateRule which translate logical calcite aggregate node to 
> Flink user functions. As hash-based combiner is not available yet(see PR 
> #1517), we would use sort-based combine as default.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3474] support partial aggregate

2016-03-02 Thread ChengXiangLi
Github user ChengXiangLi commented on a diff in the pull request:

https://github.com/apache/flink/pull/1746#discussion_r54706319
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala ---
@@ -17,26 +17,77 @@
  */
 package org.apache.flink.api.table.runtime.aggregate
 
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.api.table.Row
+
 /**
- * Represents a SQL aggregate function. The user should first initialize the aggregate, then feed it
- * with grouped aggregate field values, and finally get the aggregated value.
- * @tparam T the output type
+ * The interface for all Flink aggregate functions, which expressed in terms of initiate(),
+ * prepare(), merge() and evaluate(). The aggregate functions would be executed in 2 phases:
+ * -- In Map phase, use prepare() to transform aggregate field value into intermediate
+ * aggregate value.
+ * -- In GroupReduce phase, use merge() to merge grouped intermediate aggregate values
+ * into aggregate buffer. Then use evaluate() to calculate the final aggregated value.
+ * For associative decomposable aggregate functions, they support partial aggregate. To optimize
+ * the performance, a Combine phase would be added between Map phase and GroupReduce phase,
+ * -- In Combine phase, use merge() to merge sub-grouped intermediate aggregate values
+ * into aggregate buffer.
+ *
+ * The intermediate aggregate value is stored inside Row, aggOffsetInRow is used as the start
+ * field index in Row, so different aggregate functions could share the same Row as intermediate
+ * aggregate value/aggregate buffer, as their aggregate values could be stored in distinct fields
+ * of Row with no conflict. The intermediate aggregate value is required to be a sequence of JVM
+ * primitives, and Flink use intermediateDataType() to get its data types in SQL side.
+ *
+ * @tparam T Aggregated value type.
  */
 trait Aggregate[T] extends Serializable {
+
+  protected var aggOffsetInRow: Int = _
+
   /**
-   * Initialize the aggregate state.
+   * Initiate the intermediate aggregate value in Row.
+   * @param intermediate
    */
-  def initiateAggregate
+  def initiate(intermediate: Row): Unit
 
   /**
-   * Feed the aggregate field value.
+   * Transform the aggregate field value into intermediate aggregate data.
    * @param value
+   * @param intermediate
    */
-  def aggregate(value: Any)
+  def prepare(value: Any, intermediate: Row): Unit
--- End diff --

Aggregate functions may handle `null` in different ways: some ignore it, some 
treat it as a specified value. I'm not sure whether it's a good idea to handle 
it with `initiate()` in all cases.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [build] Consolidate scala checkstyle usage and...

2016-03-02 Thread mbalassi
GitHub user mbalassi opened a pull request:

https://github.com/apache/flink/pull/1753

[build] Consolidate scala checkstyle usage and update version to 0.8.0

Consolidation is done via plugin management, so that configuration is done 
in one central location.

The version update is needed because the latest release of the plugin, 0.8.0, 
contains a fix for a bug that I recently ran into. [1]

[1] https://github.com/scalastyle/scalastyle/pull/124

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mbalassi/flink scalastyle-bump

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1753.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1753


commit 2abfb63bbab0e511bcdd680593b310ccc9f98da0
Author: Márton Balassi 
Date:   2016-03-01T20:42:40Z

[build] Consolidate scala checkstyle usage and update version to 0.8.0

Consolidation is done via plugin management.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-3562) Update docs in the course of EventTimeSourceFunction removal

2016-03-02 Thread Maximilian Michels (JIRA)
Maximilian Michels created FLINK-3562:
-

 Summary: Update docs in the course of EventTimeSourceFunction 
removal
 Key: FLINK-3562
 URL: https://issues.apache.org/jira/browse/FLINK-3562
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.0.0
Reporter: Maximilian Michels
Assignee: Maximilian Michels
Priority: Minor
 Fix For: 1.0.0, 1.1.0


EventTimeSourceFunction has been removed. Documentation and JavaDocs haven't 
been updated.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3560] [examples] Remove unchecked outpu...

2016-03-02 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1752#issuecomment-191171356
  
Added the missing `ProgramDescriptions`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3560) Examples shouldn't always print usage statement

2016-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175353#comment-15175353
 ] 

ASF GitHub Bot commented on FLINK-3560:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1752#issuecomment-191159732
  
That is a good point @vasia. Will add the missing `ProgramDescriptions`.


> Examples shouldn't always print usage statement
> ---
>
> Key: FLINK-3560
> URL: https://issues.apache.org/jira/browse/FLINK-3560
> Project: Flink
>  Issue Type: Improvement
>  Components: Examples
>Affects Versions: 1.0.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> At the moment all our examples print a usage statement no matter whether the 
> parameters have been provided or not. This can be confusing for people 
> because a usage statement is usually only printed if one has specified a 
> wrong parameter or if a parameter is missing.
> I propose to remove the unchecked printing of the usage statement.
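
A hedged sketch of the proposed behaviour (the example program and usage string 
are made up for illustration; `ParameterTool` is Flink's existing argument 
parser):

{code}
import org.apache.flink.api.java.utils.ParameterTool

object ExampleArgs {
  def main(args: Array[String]): Unit = {
    val params = ParameterTool.fromArgs(args)

    // Print the usage statement only when a required parameter is missing,
    // instead of unconditionally on every run.
    if (!params.has("input")) {
      System.err.println("Usage: Example --input <path> [--output <path>]")
      return
    }

    println(s"Reading input from ${params.get("input")}")
  }
}
{code}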



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)