[jira] [Commented] (FLINK-3792) RowTypeInfo equality should not depend on field names

2016-06-29 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15356552#comment-15356552
 ] 

Jark Wu commented on FLINK-3792:


Do we need this before the 1.1.0 release? If so, I can take up this issue.

We can change the {{RowTypeInfo}} constructor to take only a fieldTypes 
parameter and generate default names ("f0", "f1", ...). Since we only need 
to compare types for a Row, there is no need to pass a fieldNames parameter 
to RowTypeInfo. 
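
As a minimal sketch of the idea (this is not Flink's actual RowTypeInfo; the class name SketchRowType and its methods are made up for illustration), a row type could take only the field types, derive default names, and compare by types alone:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a row type descriptor; this is not Flink's RowTypeInfo.
final class SketchRowType {
    private final List<Class<?>> fieldTypes;
    private final List<String> fieldNames;

    // Takes only the field types and derives the default names "f0", "f1", ...
    SketchRowType(Class<?>... fieldTypes) {
        this.fieldTypes =
            Collections.unmodifiableList(new ArrayList<>(Arrays.asList(fieldTypes)));
        List<String> names = new ArrayList<>();
        for (int i = 0; i < fieldTypes.length; i++) {
            names.add("f" + i);
        }
        this.fieldNames = Collections.unmodifiableList(names);
    }

    List<String> getFieldNames() {
        return fieldNames;
    }

    // Equality looks at the field types only, so names can never break it.
    @Override
    public boolean equals(Object other) {
        return other instanceof SketchRowType
            && fieldTypes.equals(((SketchRowType) other).fieldTypes);
    }

    @Override
    public int hashCode() {
        return fieldTypes.hashCode();
    }
}
```

With this shape, two instances built from (Integer, Long, String) compare equal no matter what the columns were called in the query, which is the property the union example in the issue needs.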

> RowTypeInfo equality should not depend on field names
> -
>
> Key: FLINK-3792
> URL: https://issues.apache.org/jira/browse/FLINK-3792
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Vasia Kalavri
>
> Currently, two Rows with the same field types but different field names are 
> not considered equal by the Table API and SQL. This behavior might create 
> problems, e.g. it makes the following union query fail:
> {code}
> SELECT STREAM a, b, c FROM T1 UNION ALL 
> (SELECT STREAM d, e, f FROM T2 WHERE d < 3)
> {code}
> where a, b, c and d, e, f are fields of corresponding types.
> {code}
> Cannot union streams of different types: org.apache.flink.api.table.Row(a: 
> Integer, b: Long, c: String) and org.apache.flink.api.table.Row(d: Integer, 
> e: Long, f: String)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4130) CallGenerator could generate illegal code when taking no operands

2016-06-29 Thread ramkrishna.s.vasudevan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15356540#comment-15356540
 ] 

ramkrishna.s.vasudevan commented on FLINK-4130:
---

Oh I see. No problem.

> CallGenerator could generate illegal code when taking no operands
> -
>
> Key: FLINK-4130
> URL: https://issues.apache.org/jira/browse/FLINK-4130
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Cody
>Priority: Minor
>
> In CallGenerator, when a call takes no operands, and null check is enabled, 
> it will generate code like:
> boolean isNull$17 = ;
> which will fail to compile at runtime.
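
One way to picture the failure, as a sketch only: assume the null-check expression is built by joining the operands' null terms with ||. The helper below is hypothetical (NullCheckSketch, naive and guarded are illustrative names, not CallGenerator's real code). It shows how an empty operand list produces the illegal statement and how falling back to false avoids it:

```java
import java.util.List;

// Hypothetical helper; names are illustrative and not taken from CallGenerator.
final class NullCheckSketch {
    // Naive version: an empty operand list leaves the right-hand side empty.
    static String naive(String resultNullTerm, List<String> operandNullTerms) {
        return "boolean " + resultNullTerm + " = "
            + String.join(" || ", operandNullTerms) + ";";
    }

    // Guarded version: with no operands there is nothing to check, so this
    // sketch assumes the result is not null and emits the literal "false".
    static String guarded(String resultNullTerm, List<String> operandNullTerms) {
        String expr = operandNullTerms.isEmpty()
            ? "false"
            : String.join(" || ", operandNullTerms);
        return "boolean " + resultNullTerm + " = " + expr + ";";
    }
}
```

The guarded variant still declares the null term, so later generated code that reads it keeps compiling.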





[jira] [Commented] (FLINK-4130) CallGenerator could generate illegal code when taking no operands

2016-06-29 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15356537#comment-15356537
 ] 

Jark Wu commented on FLINK-4130:


It seems that Cody has opened a pull request for this issue:  
https://github.com/apache/flink/pull/2182



[jira] [Commented] (FLINK-4130) CallGenerator could generate illegal code when taking no operands

2016-06-29 Thread ramkrishna.s.vasudevan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15356535#comment-15356535
 ] 

ramkrishna.s.vasudevan commented on FLINK-4130:
---

I can take this up if the fix is not super urgent.



[jira] [Commented] (FLINK-3674) Add an interface for EventTime aware User Function

2016-06-29 Thread ramkrishna.s.vasudevan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15356513#comment-15356513
 ] 

ramkrishna.s.vasudevan commented on FLINK-3674:
---

[~aljoscha]
Thanks for the feedback. Let me have a look at the Timers in WindowOperator to 
see how things can be implemented and get back here for queries/clarifications.

> Add an interface for EventTime aware User Function
> --
>
> Key: FLINK-3674
> URL: https://issues.apache.org/jira/browse/FLINK-3674
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>
> I suggest adding an interface that UDFs can implement, which lets them be 
> notified upon watermark updates.
> Example usage:
> {code}
> public interface EventTimeFunction {
>     void onWatermark(Watermark watermark);
> }
> public class MyMapper implements MapFunction<String, String>, EventTimeFunction {
>     // Timestamp of the most recent watermark seen by this function.
>     private long currentEventTime = Long.MIN_VALUE;
>     public String map(String value) {
>         return value + " @ " + currentEventTime;
>     }
>     public void onWatermark(Watermark watermark) {
>         currentEventTime = watermark.getTimestamp();
>     }
> }
> {code}
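
To make the proposal concrete, here is a self-contained sketch of how an operator might forward watermarks to a function that opts in. Everything prefixed with Sketch is a hypothetical stand-in so the example compiles without Flink, and java.util.function.Function is used in place of MapFunction; none of this is the real operator code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for Flink's Watermark so the sketch compiles on its own.
final class SketchWatermark {
    private final long timestamp;

    SketchWatermark(long timestamp) {
        this.timestamp = timestamp;
    }

    long getTimestamp() {
        return timestamp;
    }
}

// The proposed callback interface, renamed to mark it as a sketch.
interface SketchEventTimeFunction {
    void onWatermark(SketchWatermark watermark);
}

// Same idea as MyMapper in the issue: remember the latest watermark timestamp.
final class SketchMapper implements Function<String, String>, SketchEventTimeFunction {
    private long currentEventTime = Long.MIN_VALUE;

    @Override
    public String apply(String value) {
        return value + " @ " + currentEventTime;
    }

    @Override
    public void onWatermark(SketchWatermark watermark) {
        currentEventTime = watermark.getTimestamp();
    }
}

// Hypothetical operator: notifies the user function only if it opted in.
final class SketchOperator {
    private final Function<String, String> userFunction;
    private final List<String> output = new ArrayList<>();

    SketchOperator(Function<String, String> userFunction) {
        this.userFunction = userFunction;
    }

    void processElement(String value) {
        output.add(userFunction.apply(value));
    }

    void processWatermark(long timestamp) {
        if (userFunction instanceof SketchEventTimeFunction) {
            ((SketchEventTimeFunction) userFunction)
                .onWatermark(new SketchWatermark(timestamp));
        }
    }

    List<String> getOutput() {
        return output;
    }
}
```

The instanceof check keeps the callback optional: functions that do not implement the interface are simply never notified.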





[GitHub] flink issue #2182: [Flink-4130] CallGenerator could generate illegal code wh...

2016-06-29 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/2182
  
Yeah, `isNull$17` is not declared if we do what I suggested above, so the 
generated code will of course fail to compile. 

The code looks good to me now, although it looks a little odd that 
`$nullTerm` is always `false`. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3943) Add support for EXCEPT (set minus)

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15356385#comment-15356385
 ] 

ASF GitHub Bot commented on FLINK-3943:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r69065352
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -236,6 +236,32 @@ case class Aggregate(
   }
 }
 
+case class SetMinus(left: LogicalNode, right: LogicalNode, all: Boolean) 
extends BinaryNode {
+  override def output: Seq[Attribute] = left.output
+
+  override protected[logical] def construct(relBuilder: RelBuilder): 
RelBuilder = {
+left.construct(relBuilder)
+right.construct(relBuilder)
+relBuilder.minus(all)
+  }
+
+  override def validate(tableEnv: TableEnvironment): LogicalNode = {
+val resolvedMinus = super.validate(tableEnv).asInstanceOf[SetMinus]
+if (left.output.length != right.output.length) {
+  failValidation(s"Set minus two table of different column sizes:" +
+s" ${left.output.size} and ${right.output.size}")
+}
+val sameSchema = left.output.zip(right.output).forall { case (l, r) =>
+  l.resultType == r.resultType && l.name == r.name }
--- End diff --

Yes, I was referring to the last case. I agree with @fhueske: we can 
remove the field-name check in `EXCEPT` and `INTERSECT` now, and lift the 
restriction in `UNION` in the future. 
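
For illustration only, a types-only version of the schema check could look like the sketch below. SketchAttribute and SchemaCheckSketch are hypothetical names, and Class objects stand in for the real result types; this is not the code from the pull request:

```java
import java.util.List;

// Hypothetical attribute: a field name plus a result type.
final class SketchAttribute {
    final String name;
    final Class<?> resultType;

    SketchAttribute(String name, Class<?> resultType) {
        this.name = name;
        this.resultType = resultType;
    }
}

final class SchemaCheckSketch {
    // True when both sides have the same arity and pairwise equal types.
    // Field names are deliberately ignored.
    static boolean sameTypes(List<SketchAttribute> left, List<SketchAttribute> right) {
        if (left.size() != right.size()) {
            return false;
        }
        for (int i = 0; i < left.size(); i++) {
            if (!left.get(i).resultType.equals(right.get(i).resultType)) {
                return false;
            }
        }
        return true;
    }
}
```

Under this check, inputs named (a, b, c) and (d, e, f) are compatible as long as their types line up position by position.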


> Add support for EXCEPT (set minus)
> --
>
> Key: FLINK-3943
> URL: https://issues.apache.org/jira/browse/FLINK-3943
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> Currently, the Table API and SQL do not support EXCEPT.
> EXCEPT can be executed as a coGroup on all fields that forwards records of 
> the first input if the second input is empty.
> In order to add support for EXCEPT to the Table API and SQL we need to:
> - Implement a {{DataSetMinus}} class that translates an EXCEPT into a DataSet 
> API program using a coGroup on all fields.
> - Implement a {{DataSetMinusRule}} that translates a Calcite {{LogicalMinus}} 
> into a {{DataSetMinus}}.
> - Extend the Table API (and validation phase) to provide an except() method.
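
The coGroup plan described above can be pictured with plain Java collections. This is only a sketch under the assumption that a record is a list of field values and that the whole record is the grouping key; ExceptSketch and except are hypothetical names, and no Flink API is used:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ExceptSketch {
    // Groups both inputs by the whole record (all fields form the key), then
    // forwards one copy of each left record whose right-hand group is empty.
    static List<List<Object>> except(List<List<Object>> left, List<List<Object>> right) {
        // counts[0] = occurrences in the left input, counts[1] = in the right input
        Map<List<Object>, int[]> groups = new LinkedHashMap<>();
        for (List<Object> row : left) {
            groups.computeIfAbsent(row, k -> new int[2])[0]++;
        }
        for (List<Object> row : right) {
            groups.computeIfAbsent(row, k -> new int[2])[1]++;
        }
        List<List<Object>> result = new ArrayList<>();
        for (Map.Entry<List<Object>, int[]> group : groups.entrySet()) {
            int[] counts = group.getValue();
            if (counts[0] > 0 && counts[1] == 0) {
                result.add(group.getKey());
            }
        }
        return result;
    }
}
```

Each map entry plays the role of one coGroup invocation: it sees all left and right records that share a key and decides whether to forward the left record.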





[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

2016-06-29 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r69065352
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -236,6 +236,32 @@ case class Aggregate(
   }
 }
 
+case class SetMinus(left: LogicalNode, right: LogicalNode, all: Boolean) 
extends BinaryNode {
+  override def output: Seq[Attribute] = left.output
+
+  override protected[logical] def construct(relBuilder: RelBuilder): 
RelBuilder = {
+left.construct(relBuilder)
+right.construct(relBuilder)
+relBuilder.minus(all)
+  }
+
+  override def validate(tableEnv: TableEnvironment): LogicalNode = {
+val resolvedMinus = super.validate(tableEnv).asInstanceOf[SetMinus]
+if (left.output.length != right.output.length) {
+  failValidation(s"Set minus two table of different column sizes:" +
+s" ${left.output.size} and ${right.output.size}")
+}
+val sameSchema = left.output.zip(right.output).forall { case (l, r) =>
+  l.resultType == r.resultType && l.name == r.name }
--- End diff --

Yes, I was referring to the last case. I agree with @fhueske: we can 
remove the field-name check in `EXCEPT` and `INTERSECT` now, and lift the 
restriction in `UNION` in the future. 




[GitHub] flink issue #2179: [FLINK-4128] compile error about git-commit-id-plugin

2016-06-29 Thread gallenvara
Github user gallenvara commented on the issue:

https://github.com/apache/flink/pull/2179
  
This problem appears occasionally when I build the source code, but not always. 
The modified line points the plugin at the correct `.git` directory. In cases 
where Maven uses the plugin, this fix is necessary, IMO.




[jira] [Commented] (FLINK-4128) compile error about git-commit-id-plugin

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15356383#comment-15356383
 ] 

ASF GitHub Bot commented on FLINK-4128:
---

Github user gallenvara commented on the issue:

https://github.com/apache/flink/pull/2179
  
This problem appears occasionally when I build the source code, but not always. 
The modified line points the plugin at the correct `.git` directory. In cases 
where Maven uses the plugin, this fix is necessary, IMO.


> compile error about git-commit-id-plugin
> 
>
> Key: FLINK-4128
> URL: https://issues.apache.org/jira/browse/FLINK-4128
> Project: Flink
>  Issue Type: Bug
>Reporter: Mao, Wei
>
> When I build the latest Flink code, I get the following error:
> {quote}
> [INFO] 
> 
> [INFO] BUILD FAILURE
> [INFO] 
> 
> [INFO] Total time: 01:06 h
> [INFO] Finished at: 2016-06-28T22:11:58+08:00
> [INFO] Final Memory: 104M/3186M
> [INFO] 
> 
> [ERROR] Failed to execute goal 
> pl.project13.maven:git-commit-id-plugin:2.1.5:revision (default) on project 
> flink-runtime_2.11: Execution default of goal 
> pl.project13.maven:git-commit-id-plugin:2.1.5:revision failed. 
> NullPointerException -> [Help 1]
> [ERROR]
> [ERROR] To see the full stack trace of the errors, re-run Maven with the -e 
> switch.
> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
> [ERROR]
> [ERROR] For more information about the errors and possible solutions, please 
> read the following articles:
> [ERROR] [Help 1] 
> http://cwiki.apache.org/confluence/display/MAVEN/PluginExecutionException
> [ERROR]
> [ERROR] After correcting the problems, you can resume the build with the 
> command
> [ERROR]   mvn  -rf :flink-runtime_2.11
> {quote}
> I think it's because a wrong `dotGitDirectory` value is provided.
> Another question is whether we should upgrade this plugin, so that we get 
> a more meaningful error message instead of an NPE, e.g.:
> {quote}
> Could not get HEAD Ref, are you sure you have some commits in the 
> dotGitDirectory?
> {quote}
> The current stable version is 2.2.1, but the disadvantage is that Java 1.6 
> is no longer supported by the new version.





[GitHub] flink issue #2182: [Flink-4130] CallGenerator could generate illegal code wh...

2016-06-29 Thread unsleepy22
Github user unsleepy22 commented on the issue:

https://github.com/apache/flink/pull/2182
  
@wuchong Although I agree this bug could be fixed by adding 
`operands.nonEmpty`, that leaves a problem in the generated code: an 
undeclared variable is referenced, like this:
```
long result$16 = 
((org.apache.flink.api.java.table.TumblingWindowUDf)
udfDict.get("tumblingWin")).
  eval()
   ;

if (isNull$17) {
  out.setField(1, null);
}
else {
  out.setField(1, result$16);
}
```
I guess it's generated in `GeneratedExpression`; for simplicity, I reverted 
my change.




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15356225#comment-15356225
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on the issue:

https://github.com/apache/flink/pull/1813
  
@rmetzger What about this failing test?
```
JMXReporterTest.testJMXAvailability:148 » Runtime Could not start JMX 
server o...
```
There seems to be no JIRA for it. Is this a known issue, or not an issue at all?


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033





[GitHub] flink issue #1813: [FLINK-3034] Redis Sink Connector

2016-06-29 Thread mjsax
Github user mjsax commented on the issue:

https://github.com/apache/flink/pull/1813
  
@rmetzger What about this failing test?
```
JMXReporterTest.testJMXAvailability:148 » Runtime Could not start JMX 
server o...
```
There seems to be no JIRA for it. Is this a known issue, or not an issue at all?




[jira] [Commented] (FLINK-3943) Add support for EXCEPT (set minus)

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15356209#comment-15356209
 ] 

ASF GitHub Bot commented on FLINK-3943:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2169
  
Hi @mushketyk, thanks for the PR! I added a few comments inline.

Best, Fabian
@wuchong, thanks for reviewing!




[GitHub] flink issue #2169: [FLINK-3943] Add support for EXCEPT operator

2016-06-29 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2169
  
Hi @mushketyk, thanks for the PR! I added a few comments inline.

Best, Fabian
@wuchong, thanks for reviewing!




[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

2016-06-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r69048813
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperationsITCase.scala
 ---
@@ -139,4 +154,105 @@ class UnionITCase(
 // Must fail. Tables are bound to different TableEnvironments.
 ds1.unionAll(ds2).select('c)
   }
+
+  @Test
+  def testSetMinusAll(): Unit = {
+val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 
'a, 'b, 'c)
+val ds2 = 
CollectionDataSets.getOneElement3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+val minusDs = ds1.minusAll(ds2).select('c)
+
+val results = minusDs.toDataSet[Row].collect()
+val expected = "Hello\n" + "Hello world\n"
+TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testSetMinusAllWithDuplicates(): Unit = {
+val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 
'a, 'b, 'c)
+val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 
'a, 'b, 'c)
+val ds3 = 
CollectionDataSets.getOneElement3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+val minusDs = ds1.unionAll(ds2).minusAll(ds3).select('c)
+
+val results = minusDs.toDataSet[Row].collect()
+val expected = "Hello\n" + "Hello world\n" +
+  "Hello\n" + "Hello world\n"
+TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testSetMinus(): Unit = {
--- End diff --

Can you combine this test and the next one by using test data that covers 
both cases, i.e., with records that are duplicated in the first, the second, 
neither, and both data sets?
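
One reading of this request is that the combined test data should contain a record for each of four duplicate situations. The sketch below checks that property for two plain string lists; DuplicateCoverageSketch and its methods are hypothetical helpers, not part of the test suite, and strings stand in for the real tuples:

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class DuplicateCoverageSketch {
    enum Case { FIRST_ONLY, SECOND_ONLY, NEITHER, BOTH }

    // Classifies one record by the input(s) in which it occurs more than once.
    static Case classify(String record, List<String> first, List<String> second) {
        boolean dupInFirst = Collections.frequency(first, record) > 1;
        boolean dupInSecond = Collections.frequency(second, record) > 1;
        if (dupInFirst && dupInSecond) {
            return Case.BOTH;
        }
        if (dupInFirst) {
            return Case.FIRST_ONLY;
        }
        if (dupInSecond) {
            return Case.SECOND_ONLY;
        }
        return Case.NEITHER;
    }

    // Returns the set of duplicate cases that the two inputs exercise together.
    static Set<Case> coveredCases(List<String> first, List<String> second) {
        Set<String> distinct = new HashSet<>(first);
        distinct.addAll(second);
        Set<Case> covered = EnumSet.noneOf(Case.class);
        for (String record : distinct) {
            covered.add(classify(record, first, second));
        }
        return covered;
    }
}
```

For example, first = [a, a, b, c, d, d] and second = [b, b, c, d, d] exercise all four cases with a single pair of inputs, so one integration test could replace two.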




[jira] [Commented] (FLINK-3943) Add support for EXCEPT (set minus)

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15356206#comment-15356206
 ] 

ASF GitHub Bot commented on FLINK-3943:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r69048813
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperationsITCase.scala
 ---
@@ -139,4 +154,105 @@ class UnionITCase(
 // Must fail. Tables are bound to different TableEnvironments.
 ds1.unionAll(ds2).select('c)
   }
+
+  @Test
+  def testSetMinusAll(): Unit = {
+val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 
'a, 'b, 'c)
+val ds2 = 
CollectionDataSets.getOneElement3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+val minusDs = ds1.minusAll(ds2).select('c)
+
+val results = minusDs.toDataSet[Row].collect()
+val expected = "Hello\n" + "Hello world\n"
+TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testSetMinusAllWithDuplicates(): Unit = {
+val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 
'a, 'b, 'c)
+val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 
'a, 'b, 'c)
+val ds3 = 
CollectionDataSets.getOneElement3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+val minusDs = ds1.unionAll(ds2).minusAll(ds3).select('c)
+
+val results = minusDs.toDataSet[Row].collect()
+val expected = "Hello\n" + "Hello world\n" +
+  "Hello\n" + "Hello world\n"
+TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testSetMinus(): Unit = {
--- End diff --

Can you combine this test and the next one by using test data that covers 
both cases, i.e., with records that are duplicated in the first, the second, 
neither, and both data sets?




[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

2016-06-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r69048653
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperationsITCase.scala
 ---
@@ -61,14 +62,28 @@ class UnionITCase(
 val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 
'a, 'b, 'c)
 val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 
'a, 'b, 'c)
 
-val unionDs = ds1.union(ds2).select('c)
+val minusDs = ds1.union(ds2).select('c)
 
-val results = unionDs.toDataSet[Row].collect()
+val results = minusDs.toDataSet[Row].collect()
 val expected = "Hi\n" + "Hello\n" + "Hello world\n"
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
+  def testSetMinusAllSameSets(): Unit = {
--- End diff --

Please remove this test.




[jira] [Commented] (FLINK-3943) Add support for EXCEPT (set minus)

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15356202#comment-15356202
 ] 

ASF GitHub Bot commented on FLINK-3943:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r69048653
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperationsITCase.scala
 ---
@@ -61,14 +62,28 @@ class UnionITCase(
 val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 
'a, 'b, 'c)
 val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 
'a, 'b, 'c)
 
-val unionDs = ds1.union(ds2).select('c)
+val minusDs = ds1.union(ds2).select('c)
 
-val results = unionDs.toDataSet[Row].collect()
+val results = minusDs.toDataSet[Row].collect()
 val expected = "Hi\n" + "Hello\n" + "Hello world\n"
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
+  def testSetMinusAllSameSets(): Unit = {
--- End diff --

Please remove this test.




[jira] [Commented] (FLINK-3943) Add support for EXCEPT (set minus)

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15356199#comment-15356199
 ] 

ASF GitHub Bot commented on FLINK-3943:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r69048442
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperationsITCase.scala
 ---
@@ -61,14 +62,28 @@ class UnionITCase(
 val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 
'a, 'b, 'c)
 val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 
'a, 'b, 'c)
 
-val unionDs = ds1.union(ds2).select('c)
+val minusDs = ds1.union(ds2).select('c)
--- End diff --

Why this change?




[jira] [Commented] (FLINK-3943) Add support for EXCEPT (set minus)

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15356197#comment-15356197
 ] 

ASF GitHub Bot commented on FLINK-3943:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r69048368
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperationsITCase.scala
 ---
@@ -121,4 +121,66 @@ class UnionITCase(
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  @Test
+  def testExcept(): Unit = {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val sqlQuery = "SELECT c FROM t1 EXCEPT (SELECT c FROM t2)"
+
+val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+val ds2 = CollectionDataSets.getOneElement3TupleDataSet(env)
+tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
+
+val result = tEnv.sql(sqlQuery)
+
+val expected = "Hello\n" + "Hello world\n"
+val results = result.toDataSet[Row].collect()
+TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testExceptWithFilter(): Unit = {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val sqlQuery = "SELECT c FROM (" +
+  "SELECT * FROM t1 EXCEPT (SELECT a, b, c FROM t2))" +
+  "WHERE b < 2"
+
+val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+val ds2 = CollectionDataSets.get5TupleDataSet(env)
+tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e)
+
+val result = tEnv.sql(sqlQuery)
+
+val expected = "Hi\n"
+val results = result.toDataSet[Row].collect()
+TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testExceptWithAggregation(): Unit = {
--- End diff --

Please remove this test.




[jira] [Commented] (FLINK-3943) Add support for EXCEPT (set minus)

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15356196#comment-15356196
 ] 

ASF GitHub Bot commented on FLINK-3943:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r69048325
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperationsITCase.scala
 ---
@@ -121,4 +121,66 @@ class UnionITCase(
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  @Test
+  def testExcept(): Unit = {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val sqlQuery = "SELECT c FROM t1 EXCEPT (SELECT c FROM t2)"
+
+val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+val ds2 = CollectionDataSets.getOneElement3TupleDataSet(env)
+tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
+
+val result = tEnv.sql(sqlQuery)
+
+val expected = "Hello\n" + "Hello world\n"
+val results = result.toDataSet[Row].collect()
+TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testExceptWithFilter(): Unit = {
--- End diff --

Please remove this test. We need to be very careful about the build time of 
the project (we are very close to hitting the 2h build timeout of Travis), and 
integration tests are quite time-consuming. So we try to avoid integration 
tests that do not add to the test coverage. 




[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

2016-06-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r69048442
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperationsITCase.scala
 ---
@@ -61,14 +62,28 @@ class UnionITCase(
 val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 
'a, 'b, 'c)
 val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 
'a, 'b, 'c)
 
-val unionDs = ds1.union(ds2).select('c)
+val minusDs = ds1.union(ds2).select('c)
--- End diff --

Why this change?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

2016-06-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r69048368
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperationsITCase.scala
 ---
@@ -121,4 +121,66 @@ class UnionITCase(
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  @Test
+  def testExcept(): Unit = {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val sqlQuery = "SELECT c FROM t1 EXCEPT (SELECT c FROM t2)"
+
+val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+val ds2 = CollectionDataSets.getOneElement3TupleDataSet(env)
+tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
+
+val result = tEnv.sql(sqlQuery)
+
+val expected = "Hello\n" + "Hello world\n"
+val results = result.toDataSet[Row].collect()
+TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testExceptWithFilter(): Unit = {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val sqlQuery = "SELECT c FROM (" +
+  "SELECT * FROM t1 EXCEPT (SELECT a, b, c FROM t2))" +
+  "WHERE b < 2"
+
+val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+val ds2 = CollectionDataSets.get5TupleDataSet(env)
+tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e)
+
+val result = tEnv.sql(sqlQuery)
+
+val expected = "Hi\n"
+val results = result.toDataSet[Row].collect()
+TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testExceptWithAggregation(): Unit = {
--- End diff --

Please remove this test.




[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

2016-06-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r69048325
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperationsITCase.scala
 ---
@@ -121,4 +121,66 @@ class UnionITCase(
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  @Test
+  def testExcept(): Unit = {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val sqlQuery = "SELECT c FROM t1 EXCEPT (SELECT c FROM t2)"
+
+val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+val ds2 = CollectionDataSets.getOneElement3TupleDataSet(env)
+tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
+
+val result = tEnv.sql(sqlQuery)
+
+val expected = "Hello\n" + "Hello world\n"
+val results = result.toDataSet[Row].collect()
+TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testExceptWithFilter(): Unit = {
--- End diff --

Please remove this test. We need to be very careful about the build time of 
the project (we are very close to hitting the 2h build timeout of Travis), and 
integration tests are quite time-consuming. So we try to avoid integration 
tests that do not add to the test coverage. 




[jira] [Commented] (FLINK-3943) Add support for EXCEPT (set minus)

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15356190#comment-15356190
 ] 

ASF GitHub Bot commented on FLINK-3943:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r69048019
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---
@@ -1353,6 +1353,16 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
 getCallLocationName()))
 
   // 

+  //  Minus
+  // 

+
+  /**
+* Creates a new DataSet containing the elements from `this` DataSet 
minus elements from `other`
+* DataSet.
+*/
+  def minus(other: DataSet[T]): DataSet[T] = 
wrap(javaSet.minus(other.javaSet))
--- End diff --

Please remove these changes as well. Thanks




[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

2016-06-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r69048019
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---
@@ -1353,6 +1353,16 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
 getCallLocationName()))
 
   // 

+  //  Minus
+  // 

+
+  /**
+* Creates a new DataSet containing the elements from `this` DataSet 
minus elements from `other`
+* DataSet.
+*/
+  def minus(other: DataSet[T]): DataSet[T] = 
wrap(javaSet.minus(other.javaSet))
--- End diff --

Please remove these changes as well. Thanks




[jira] [Commented] (FLINK-3943) Add support for EXCEPT (set minus)

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15356188#comment-15356188
 ] 

ASF GitHub Bot commented on FLINK-3943:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r69047875
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 ---
@@ -426,6 +426,49 @@ class Table(
   }
 
   /**
+* Set minus between two [[Table]]s. Similar to an SQL EXCEPT ALL.
+* The fields of the two minus operands must fully overlap.
+*
+* Note: Both tables must be bound to the same [[TableEnvironment]].
+*
+* Example:
+*
+* {{{
+*   left.minusAll(right)
+* }}}
+*/
+  def minusAll(right: Table): Table = {
+// check that right table belongs to the same TableEnvironment
+if (right.tableEnv != this.tableEnv) {
+  throw new ValidationException("Only tables from the same 
TableEnvironment can be unioned.")
+}
+new Table(tableEnv, SetMinus(logicalPlan, right.logicalPlan, 
true).validate(tableEnv))
+  }
+
+  /**
+* Perform set minus between [[Table]]s with duplicate records removed.
--- End diff --

Please describe the semantics of minus all in more detail:
- records of the first table are returned
- how many identical records are returned




[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

2016-06-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r69047875
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 ---
@@ -426,6 +426,49 @@ class Table(
   }
 
   /**
+* Set minus between two [[Table]]s. Similar to an SQL EXCEPT ALL.
+* The fields of the two minus operands must fully overlap.
+*
+* Note: Both tables must be bound to the same [[TableEnvironment]].
+*
+* Example:
+*
+* {{{
+*   left.minusAll(right)
+* }}}
+*/
+  def minusAll(right: Table): Table = {
+// check that right table belongs to the same TableEnvironment
+if (right.tableEnv != this.tableEnv) {
+  throw new ValidationException("Only tables from the same 
TableEnvironment can be unioned.")
+}
+new Table(tableEnv, SetMinus(logicalPlan, right.logicalPlan, 
true).validate(tableEnv))
+  }
+
+  /**
+* Perform set minus between [[Table]]s with duplicate records removed.
--- End diff --

Please describe the semantics of minus all in more detail:
- records of the first table are returned
- how many identical records are returned




[jira] [Commented] (FLINK-3943) Add support for EXCEPT (set minus)

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15356182#comment-15356182
 ] 

ASF GitHub Bot commented on FLINK-3943:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r69047691
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 ---
@@ -426,6 +426,49 @@ class Table(
   }
 
   /**
+* Set minus between two [[Table]]s. Similar to an SQL EXCEPT ALL.
--- End diff --

Please describe the semantics of minus all in more detail.




[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

2016-06-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r69047691
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 ---
@@ -426,6 +426,49 @@ class Table(
   }
 
   /**
+* Set minus between two [[Table]]s. Similar to an SQL EXCEPT ALL.
--- End diff --

Please describe the semantics of minus all in more detail.




[jira] [Commented] (FLINK-3943) Add support for EXCEPT (set minus)

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15356180#comment-15356180
 ] 

ASF GitHub Bot commented on FLINK-3943:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r69047560
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetMinusRule.scala
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, 
RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalMinus
+import org.apache.calcite.rel.rules.UnionToDistinctRule
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetMinus}
+
+class DataSetMinusRule
+  extends ConverterRule(
+classOf[LogicalMinus],
+Convention.NONE,
+DataSetConvention.INSTANCE,
+"DataSetMinusRule")
+{
+
+  /**
+* Translate EXCEPT and EXCEPT ALL.
+*/
+  override def matches(call: RelOptRuleCall): Boolean = {
--- End diff --

This method can be removed. It always returns `true`, which is exactly what 
the default implementation that it overrides already returns.




[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

2016-06-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r69047560
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetMinusRule.scala
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, 
RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalMinus
+import org.apache.calcite.rel.rules.UnionToDistinctRule
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetMinus}
+
+class DataSetMinusRule
+  extends ConverterRule(
+classOf[LogicalMinus],
+Convention.NONE,
+DataSetConvention.INSTANCE,
+"DataSetMinusRule")
+{
+
+  /**
+* Translate EXCEPT and EXCEPT ALL.
+*/
+  override def matches(call: RelOptRuleCall): Boolean = {
--- End diff --

This method can be removed. It always returns `true`, which is exactly what 
the default implementation that it overrides already returns.




[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

2016-06-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r69047378
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, 
RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.BatchTableEnvironment
+
+import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
+
+/**
+  * Flink RelNode which matches along with DataSetOperator.
+  *
+  */
+class DataSetMinus(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+left: RelNode,
+right: RelNode,
+rowType: RelDataType,
+all: Boolean)
+  extends BiRel(cluster, traitSet, left, right)
+with DataSetRel {
+
+  override def deriveRowType() = rowType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataSetMinus(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  rowType,
+  all
+)
+  }
+
+  override def toString: String = {
+s"SetMinus(setMinus: 
(${rowType.getFieldNames.asScala.toList.mkString(", ")}))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw).item("setMinus", setMinusSelectionToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+
+val children = this.getInputs
+val rowCnt = children.foldLeft(0D) { (rows, child) =>
+  rows + metadata.getRowCount(child)
+}
+
+planner.getCostFactory.makeCost(rowCnt, 0, 0)
+  }
+
+  override def translateToPlan(
+  tableEnv: BatchTableEnvironment,
+  expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+var leftDataSet: DataSet[Any] = null
+var rightDataSet: DataSet[Any] = null
+
+expectedType match {
+  case None =>
+leftDataSet = 
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+rightDataSet =
+  right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, 
Some(leftDataSet.getType))
+  case _ =>
+leftDataSet = 
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
+rightDataSet = 
right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
+}
+
+val minusRes = leftDataSet.minus(rightDataSet)
+if (!all) {
+  minusRes.distinct()
--- End diff --

I would like to move the code of `DataSet.minus()` here. However, I think 
the semantics of `EXCEPT ALL` are a bit different from your implementation. 
It does not simply check whether there is a match in the second input and 
forward everything if there is none. For each match in the second input, it 
removes one matching record from the first input (see also the 
[PostgreSQL 
docs](https://www.postgresql.org/docs/9.4/static/sql-select.html#SQL-EXCEPT)).

I would be in favor of an implementation that is similar to @wuchong's 
implementation of `INTERSECT` / `INTERSECT ALL` in PR #2159.
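
To make the `EXCEPT ALL` semantics from the comment above concrete, here is a minimal sketch in plain Java, assuming records can be compared with `equals`/`hashCode`. The class `ExceptAllSketch` and its method are hypothetical and not taken from the PR or from Flink. Each occurrence in the second input cancels at most one equal record of the first input.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; not part of Flink or the PR.
final class ExceptAllSketch {

    // EXCEPT ALL on bags: a record that occurs m times in the first input
    // and n times in the second input appears max(m - n, 0) times in the result.
    static <T> List<T> exceptAll(List<T> first, List<T> second) {
        // Count how often each record occurs in the second input.
        Map<T, Integer> remaining = new HashMap<>();
        for (T record : second) {
            remaining.merge(record, 1, Integer::sum);
        }

        List<T> result = new ArrayList<>();
        for (T record : first) {
            Integer count = remaining.get(record);
            if (count != null && count > 0) {
                // One match in the second input removes one record of the first.
                remaining.put(record, count - 1);
            } else {
                result.add(record);
            }
        }
        return result;
    }
}
```

For the first input `[a, a, a, b]` and the second input `[a, c]`, the sketch yields `[a, a, b]`: only one of the three `a` records is removed, which is where it differs from forwarding a group only when the other side has no match at all.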




[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

2016-06-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r69046225
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java 
---
@@ -1193,21 +1196,46 @@ public long count() throws Exception {
operation.setInput(this);
return operation.createResult();
}
-   
+
// 

//  Union
// 

 
/**
 * Creates a union of this DataSet with an other DataSet. The other 
DataSet must be of the same data type.
-* 
+*
 * @param other The other DataSet which is unioned with the current 
DataSet.
 * @return The resulting DataSet.
 */
public UnionOperator union(DataSet other){
return new UnionOperator<>(this, other, 
Utils.getCallLocationName());
}
 
+   /**
+   * Creates a set minus of this DataSet with an other DataSet. The other 
DataSet must be of the same data type.
+   *
+   * @param other The other DataSet which is set minus with the current 
DataSet.
+   * @return The resulting DataSet.
+   */
+   public CoGroupOperator minus(DataSet other){
+   return coGroup(other)
+   .where("*")
+   .equalTo("*")
+   .with(new RichCoGroupFunction() {
--- End diff --

A non-rich `CoGroupFunction` is sufficient.




[jira] [Commented] (FLINK-3943) Add support for EXCEPT (set minus)

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355986#comment-15355986
 ] 

ASF GitHub Bot commented on FLINK-3943:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r69045985
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -236,6 +236,32 @@ case class Aggregate(
   }
 }
 
+case class SetMinus(left: LogicalNode, right: LogicalNode, all: Boolean) 
extends BinaryNode {
+  override def output: Seq[Attribute] = left.output
+
+  override protected[logical] def construct(relBuilder: RelBuilder): 
RelBuilder = {
+left.construct(relBuilder)
+right.construct(relBuilder)
+relBuilder.minus(all)
+  }
+
+  override def validate(tableEnv: TableEnvironment): LogicalNode = {
+val resolvedMinus = super.validate(tableEnv).asInstanceOf[SetMinus]
+if (left.output.length != right.output.length) {
+  failValidation(s"Set minus two table of different column sizes:" +
+s" ${left.output.size} and ${right.output.size}")
+}
+val sameSchema = left.output.zip(right.output).forall { case (l, r) =>
+  l.resultType == r.resultType && l.name == r.name }
--- End diff --

I think @wuchong refers exactly to the last case. If the number of fields and 
the field types are identical, it should be possible to do an `EXCEPT` even if 
the field names are not the same.
The situation for `UNION` is a bit special due to Flink's internals and the 
way the `RowTypeInfo` is implemented, but I think we can remove that restriction 
in the future. 

So, we must keep the checks for number of fields and field types but can 
remove the check of the field names, IMO.
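
The relaxed validation proposed above could look roughly like the following plain-Java sketch. It is an assumption-laden illustration, not the Table API code: `SetOperationSchemaCheck`, `Field`, and `compatible` are hypothetical names, and field types are modelled as `Class<?>` values instead of Flink type information.

```java
import java.util.List;

// Hypothetical illustration class; not part of the Flink Table API.
final class SetOperationSchemaCheck {

    // A simplified stand-in for an output attribute: a name plus a type.
    static final class Field {
        final String name;
        final Class<?> type;

        Field(String name, Class<?> type) {
            this.name = name;
            this.type = type;
        }
    }

    // Two schemas are compatible for a set operation if they have the same
    // number of fields and pairwise equal field types. Names are ignored.
    static boolean compatible(List<Field> left, List<Field> right) {
        if (left.size() != right.size()) {
            return false;
        }
        for (int i = 0; i < left.size(); i++) {
            if (!left.get(i).type.equals(right.get(i).type)) {
                return false;
            }
        }
        return true;
    }
}
```

Under this check, a schema `(a: Integer, b: Long, c: String)` is compatible with `(d: Integer, e: Long, f: String)`, while `(a: Integer, b: Long)` is not compatible with `(a: Integer, b: String)`.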




[jira] [Commented] (FLINK-3943) Add support for EXCEPT (set minus)

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355947#comment-15355947
 ] 

ASF GitHub Bot commented on FLINK-3943:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r69044111
  
--- Diff: docs/apis/table.md ---
@@ -536,6 +536,29 @@ Table result = left.unionAll(right);
 
 
 
+  Minus
+  
+Similar to a SQL EXCEPT clause. Returns elements from the first 
table that do not exist in the second table. Both tables must have identical 
schema, i.e., field names and types.
--- End diff --

Please rephrase to "Except returns records from the first table that do not 
exist in the second table. Records that exist multiple times in the first table 
are returned exactly once, i.e., duplicates are removed.".
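
The semantics described above, together with the coGroup strategy from the issue description, can be sketched in plain Java as follows. This is only an illustration under stated assumptions: `ExceptSketch` and `except` are hypothetical names, and in-memory lists stand in for Flink DataSets. It is not the code from the pull request.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of EXCEPT (distinct) semantics; not Flink code.
final class ExceptSketch {

    /** Returns each distinct record of left that has no equal record in right. */
    static <T> List<T> except(List<T> left, List<T> right) {
        // Group both inputs by the full record, mimicking a coGroup on all fields.
        // counts[0] = occurrences in left, counts[1] = occurrences in right.
        Map<T, int[]> groups = new LinkedHashMap<>();
        for (T record : left) {
            groups.computeIfAbsent(record, k -> new int[2])[0]++;
        }
        for (T record : right) {
            groups.computeIfAbsent(record, k -> new int[2])[1]++;
        }

        List<T> result = new ArrayList<>();
        for (Map.Entry<T, int[]> group : groups.entrySet()) {
            int[] counts = group.getValue();
            // Forward the left record exactly once if the right side of the group is empty.
            if (counts[0] > 0 && counts[1] == 0) {
                result.add(group.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(except(Arrays.asList(1, 1, 2, 3), Arrays.asList(3, 4)));
    }
}
```

In the `main` example the record `1` occurs twice on the left but is emitted once, and `3` is dropped because it also occurs on the right.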




[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

2016-06-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r69045985
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -236,6 +236,32 @@ case class Aggregate(
   }
 }
 
+case class SetMinus(left: LogicalNode, right: LogicalNode, all: Boolean) 
extends BinaryNode {
+  override def output: Seq[Attribute] = left.output
+
+  override protected[logical] def construct(relBuilder: RelBuilder): 
RelBuilder = {
+left.construct(relBuilder)
+right.construct(relBuilder)
+relBuilder.minus(all)
+  }
+
+  override def validate(tableEnv: TableEnvironment): LogicalNode = {
+val resolvedMinus = super.validate(tableEnv).asInstanceOf[SetMinus]
+if (left.output.length != right.output.length) {
+  failValidation(s"Set minus two table of different column sizes:" +
+s" ${left.output.size} and ${right.output.size}")
+}
+val sameSchema = left.output.zip(right.output).forall { case (l, r) =>
+  l.resultType == r.resultType && l.name == r.name }
--- End diff --

I think @wuchong refers exactly to the last case. If the number of fields and 
the field types are identical, it should be possible to do an `EXCEPT` even if 
the field names are not the same.
The situation for `UNION` is a bit special due to Flink's internals and the 
way the `RowTypeInfo` is implemented but I think we can remove that restriction 
in the future. 

So, we must keep the checks for number of fields and field types but can 
remove the check of the field names, IMO.
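
A minimal sketch of the relaxed check proposed here: compare the number of fields and the field types position by position, and ignore the field names. `SchemaCheckSketch`, `Field`, and `compatible` are hypothetical names used for illustration only, and types are modelled as plain strings rather than Flink's type information classes.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of a name-insensitive schema check; not Flink code.
final class SchemaCheckSketch {

    /** Stand-in for a table attribute: a field name plus a type name. */
    static final class Field {
        final String name;
        final String type;

        Field(String name, String type) {
            this.name = name;
            this.type = type;
        }
    }

    /** True if both schemas have the same arity and the same type at every position. */
    static boolean compatible(List<Field> left, List<Field> right) {
        if (left.size() != right.size()) {
            return false; // different number of fields
        }
        for (int i = 0; i < left.size(); i++) {
            // Only the types are compared; the field names are deliberately ignored.
            if (!left.get(i).type.equals(right.get(i).type)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Field> t1 = Arrays.asList(new Field("a", "Integer"), new Field("b", "Long"));
        List<Field> t2 = Arrays.asList(new Field("d", "Integer"), new Field("e", "Long"));
        System.out.println(compatible(t1, t2));
    }
}
```

Under this check, tables `(a: Integer, b: Long)` and `(d: Integer, e: Long)` are accepted, while a type or arity mismatch is still rejected.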


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

2016-06-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r69045241
  
--- Diff: docs/apis/table.md ---
@@ -873,7 +920,7 @@ val result = tableEnv.sql(
 
  Limitations
 
-The current version of streaming SQL only supports `SELECT`, `FROM`, 
`WHERE`, and `UNION` clauses. Aggregations or joins are not supported yet.
+The current version of streaming SQL only supports `SELECT`, `FROM`, 
`WHERE` and `UNION` clauses. Aggregations or joins are not supported yet.
--- End diff --

We try to consistently use [serial 
commas](https://en.wikipedia.org/wiki/Serial_comma) in this document.




[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

2016-06-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r69044780
  
--- Diff: docs/apis/table.md ---
@@ -536,6 +536,29 @@ Table result = left.unionAll(right);
 
 
 
+  Minus
+  
+Similar to a SQL EXCEPT clause. Returns elements from the first 
table that do not exist in the second table. Both tables must have identical 
schema, i.e., field names and types.
+{% highlight java %}
+Table left = tableEnv.fromDataSet(ds1, "a, b, c");
+Table right = tableEnv.fromDataSet(ds2, "a, b, c");
+Table result = left.minus(right);
+{% endhighlight %}
+  
+
+
+
+  MinusAll
+  
+Similar to a SQL EXCEPT ALL clause. Returns elements from the 
first table that do not exist in the second table without removing duplicates. 
Both tables must have identical schema, i.e., field names and types.
--- End diff --

Please rephrase to "Except All returns the records that do not exist in the 
second table. A record that is present n times in the first table and m times 
in the second table is returned (n - m) times, i.e., as many duplicates as are 
present in the second table are removed.".
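
The counting rule in this wording can be sketched in plain Java. The sketch assumes the count is clamped at zero, so a record with m >= n is not returned at all; `ExceptAllSketch` and `exceptAll` are hypothetical names, and lists stand in for tables.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of EXCEPT ALL semantics; not Flink code.
final class ExceptAllSketch {

    /** Returns every left record, minus one occurrence per equal right record. */
    static <T> List<T> exceptAll(List<T> left, List<T> right) {
        // Count how often each record occurs in the second input.
        Map<T, Integer> rightCounts = new HashMap<>();
        for (T record : right) {
            rightCounts.merge(record, 1, Integer::sum);
        }

        List<T> result = new ArrayList<>();
        for (T record : left) {
            Integer remaining = rightCounts.get(record);
            if (remaining != null && remaining > 0) {
                // One occurrence on the right cancels one occurrence on the left.
                rightCounts.put(record, remaining - 1);
            } else {
                result.add(record);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(exceptAll(
            Arrays.asList("a", "a", "a", "b"),
            Arrays.asList("a", "b", "b")));
    }
}
```

In the `main` example, `a` has n = 3 and m = 1 and is returned twice, while `b` has n = 1 and m = 2 and is not returned.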




[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

2016-06-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r69044111
  
--- Diff: docs/apis/table.md ---
@@ -536,6 +536,29 @@ Table result = left.unionAll(right);
 
 
 
+  Minus
+  
+Similar to a SQL EXCEPT clause. Returns elements from the first 
table that do not exist in the second table. Both tables must have identical 
schema, i.e., field names and types.
--- End diff --

Please rephrase to "Except returns records from the first table that do not 
exist in the second table. Records that exist multiple times in the first table 
are returned exactly once, i.e., duplicates are removed.".




[jira] [Commented] (FLINK-3943) Add support for EXCEPT (set minus)

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355944#comment-15355944
 ] 

ASF GitHub Bot commented on FLINK-3943:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r69043528
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java 
---
@@ -1193,21 +1196,46 @@ public long count() throws Exception {
operation.setInput(this);
return operation.createResult();
}
-   
+
// 

//  Union
// 

 
/**
 * Creates a union of this DataSet with an other DataSet. The other 
DataSet must be of the same data type.
-* 
+*
 * @param other The other DataSet which is unioned with the current 
DataSet.
 * @return The resulting DataSet.
 */
public UnionOperator union(DataSet other){
return new UnionOperator<>(this, other, 
Utils.getCallLocationName());
}
 
+   /**
+   * Creates a set minus of this DataSet with an other DataSet. The other 
DataSet must be of the same data type.
+   *
+   * @param other The other DataSet which is set minus with the current 
DataSet.
+   * @return The resulting DataSet.
+   */
+   public CoGroupOperator minus(DataSet other){
--- End diff --

This issue is about adding `EXCEPT` to the Table API. The DataSet API which is 
touched here is a rather low-level API and we are quite careful about adding 
new operators. Therefore, changes to the DataSet API should go through a 
separate JIRA issue. Please move this code to the `DataSetMinus` class and 
revert the changes to this file. You can open a JIRA issue to discuss adding a 
`minus` operator to the DataSet API. Thank you.




[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

2016-06-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r69043528
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java 
---
@@ -1193,21 +1196,46 @@ public long count() throws Exception {
operation.setInput(this);
return operation.createResult();
}
-   
+
// 

//  Union
// 

 
/**
 * Creates a union of this DataSet with an other DataSet. The other 
DataSet must be of the same data type.
-* 
+*
 * @param other The other DataSet which is unioned with the current 
DataSet.
 * @return The resulting DataSet.
 */
public UnionOperator union(DataSet other){
return new UnionOperator<>(this, other, 
Utils.getCallLocationName());
}
 
+   /**
+   * Creates a set minus of this DataSet with an other DataSet. The other 
DataSet must be of the same data type.
+   *
+   * @param other The other DataSet which is set minus with the current 
DataSet.
+   * @return The resulting DataSet.
+   */
+   public CoGroupOperator minus(DataSet other){
--- End diff --

This issue is about adding `EXCEPT` to the Table API. The DataSet API which is 
touched here is a rather low-level API and we are quite careful about adding 
new operators. Therefore, changes to the DataSet API should go through a 
separate JIRA issue. Please move this code to the `DataSetMinus` class and 
revert the changes to this file. You can open a JIRA issue to discuss adding a 
`minus` operator to the DataSet API. Thank you.




[jira] [Commented] (FLINK-4103) Modify CsvTableSource to implement StreamTableSource

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355942#comment-15355942
 ] 

ASF GitHub Bot commented on FLINK-4103:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2162
  
@fhueske Yes, I can shepherd this PR soon.


> Modify CsvTableSource to implement StreamTableSource
> 
>
> Key: FLINK-4103
> URL: https://issues.apache.org/jira/browse/FLINK-4103
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.0.3
>Reporter: Suneel Marthi
>Assignee: Suneel Marthi
>Priority: Minor
> Fix For: 1.1.0
>
>
> CsvTableSource presently extends BatchTableSource. It can be modified to also 
> implement StreamTableSource like CsvTableSink. 





[GitHub] flink issue #2162: FLINK-4103: Modify CsvTableSource to implement StreamTabl...

2016-06-29 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2162
  
@fhueske Yes, I can shepherd this PR soon.




[GitHub] flink issue #2162: FLINK-4103: Modify CsvTableSource to implement StreamTabl...

2016-06-29 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2162
  
Thanks for the PR @smarthi. 
Looks good to me. I'm traveling right now and do not have my dev machine 
with me. Can somebody else check this PR as well and merge it?
Thanks, Fabian




[jira] [Commented] (FLINK-4103) Modify CsvTableSource to implement StreamTableSource

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355932#comment-15355932
 ] 

ASF GitHub Bot commented on FLINK-4103:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2162
  
Thanks for the PR @smarthi. 
Looks good to me. I'm traveling right now and do not have my dev machine 
with me. Can somebody else check this PR as well and merge it?
Thanks, Fabian




[jira] [Commented] (FLINK-3942) Add support for INTERSECT

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355920#comment-15355920
 ] 

ASF GitHub Bot commented on FLINK-3942:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2159
  
Hi @wuchong, thanks for the update! The PR looks mostly good. I left a few 
minor comments.

Thanks, Fabian


> Add support for INTERSECT
> -
>
> Key: FLINK-3942
> URL: https://issues.apache.org/jira/browse/FLINK-3942
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: Jark Wu
>Priority: Minor
>
> Currently, the Table API and SQL do not support INTERSECT.
> INTERSECT can be executed as join on all fields.
> In order to add support for INTERSECT to the Table API and SQL we need to:
> - Implement a {{DataSetIntersect}} class that translates an INTERSECT into a 
> DataSet API program using a join on all fields.
> - Implement a {{DataSetIntersectRule}} that translates a Calcite 
> {{LogicalIntersect}} into a {{DataSetIntersect}}.
> - Extend the Table API (and validation phase) to provide an intersect() 
> method.





[GitHub] flink issue #2159: [FLINK-3942] [tableAPI] Add support for INTERSECT

2016-06-29 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2159
  
Hi @wuchong, thanks for the update! The PR looks mostly good. I left a few 
minor comments.

Thanks, Fabian




[jira] [Commented] (FLINK-3942) Add support for INTERSECT

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355911#comment-15355911
 ] 

ASF GitHub Bot commented on FLINK-3942:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2159#discussion_r69039259
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 ---
@@ -471,6 +471,32 @@ class Table(
   }
 
   /**
+* Intersect two [[Table]]s. Intersect All returns records that exist 
in both tables, but not
--- End diff --

I would rephrase this to "Intersect All returns records that exist in both 
tables. If a record is present in both tables more than once, it is returned as 
many times as it is present in both tables, i.e., the resulting table might 
have duplicate records."
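
One way to read this wording is the usual SQL rule for INTERSECT ALL: a record that occurs n times in the first table and m times in the second is returned min(n, m) times. The sketch below assumes that reading; `IntersectAllSketch` and `intersectAll` are hypothetical names, and lists stand in for tables.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of INTERSECT ALL semantics; not Flink code.
final class IntersectAllSketch {

    /** Returns each record min(n, m) times, where n and m are its counts in left and right. */
    static <T> List<T> intersectAll(List<T> left, List<T> right) {
        // Count how often each record occurs in the second input.
        Map<T, Integer> rightCounts = new HashMap<>();
        for (T record : right) {
            rightCounts.merge(record, 1, Integer::sum);
        }

        List<T> result = new ArrayList<>();
        for (T record : left) {
            Integer remaining = rightCounts.get(record);
            if (remaining != null && remaining > 0) {
                // Pair this left occurrence with one unused right occurrence.
                result.add(record);
                rightCounts.put(record, remaining - 1);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(intersectAll(
            Arrays.asList("a", "a", "a", "b", "c"),
            Arrays.asList("a", "a", "b", "b")));
    }
}
```

In the `main` example, `a` is returned twice (min(3, 2)), `b` once (min(1, 2)), and `c` not at all.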




[GitHub] flink pull request #2159: [FLINK-3942] [tableAPI] Add support for INTERSECT

2016-06-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2159#discussion_r69039259
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 ---
@@ -471,6 +471,32 @@ class Table(
   }
 
   /**
+* Intersect two [[Table]]s. Intersect All returns records that exist 
in both tables, but not
--- End diff --

I would rephrase this to "Intersect All returns records that exist in both 
tables. If a record is present in both tables more than once, it is returned as 
many times as it is present in both tables, i.e., the resulting table might 
have duplicate records."




[jira] [Commented] (FLINK-3942) Add support for INTERSECT

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355907#comment-15355907
 ] 

ASF GitHub Bot commented on FLINK-3942:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2159#discussion_r69039071
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 ---
@@ -446,9 +446,9 @@ class Table(
   }
 
   /**
-* Intersect two [[Table]]s with duplicate records removed. Intersect 
returns rows only
-* from the left table that are identical to a row in the right table.
-* Similar to an SQL INTERSECT. The fields of the two union operations 
must fully overlap.
+* Intersect two [[Table]]s with duplicate records removed. Intersect 
returns records that
+* exist in both tables, and emit exactly once. Similar to an SQL 
INTERSECT. The fields of
--- End diff --

I would rephrase this to "Intersect returns records that exist in both 
tables. If a record is present in one or both tables more than once, it is 
returned just once, i.e., the resulting table has no duplicate records."
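
For contrast with Intersect All, the duplicate-free variant can be sketched with plain Java sets. `IntersectSketch` and `intersect` are hypothetical names, and lists stand in for tables.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of INTERSECT (distinct) semantics; not Flink code.
final class IntersectSketch {

    /** Returns each record that occurs in both inputs, exactly once. */
    static <T> List<T> intersect(List<T> left, List<T> right) {
        Set<T> rightRecords = new HashSet<>(right);
        // A LinkedHashSet removes duplicates while keeping the order of first appearance.
        Set<T> result = new LinkedHashSet<>();
        for (T record : left) {
            if (rightRecords.contains(record)) {
                result.add(record);
            }
        }
        return new ArrayList<>(result);
    }

    public static void main(String[] args) {
        System.out.println(intersect(
            Arrays.asList("a", "a", "b", "c"),
            Arrays.asList("a", "b", "b")));
    }
}
```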




[GitHub] flink pull request #2159: [FLINK-3942] [tableAPI] Add support for INTERSECT

2016-06-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2159#discussion_r69039071
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 ---
@@ -446,9 +446,9 @@ class Table(
   }
 
   /**
-* Intersect two [[Table]]s with duplicate records removed. Intersect 
returns rows only
-* from the left table that are identical to a row in the right table.
-* Similar to an SQL INTERSECT. The fields of the two union operations 
must fully overlap.
+* Intersect two [[Table]]s with duplicate records removed. Intersect 
returns records that
+* exist in both tables, and emit exactly once. Similar to an SQL 
INTERSECT. The fields of
--- End diff --

I would rephrase this to "Intersect returns records that exist in both 
tables. If a record is present one or both tables more than once, it is 
returned just once, i.e., the resulting table has no duplicate records."




[jira] [Commented] (FLINK-3942) Add support for INTERSECT

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355889#comment-15355889
 ] 

ASF GitHub Bot commented on FLINK-3942:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2159#discussion_r69037753
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala
 ---
@@ -83,66 +82,68 @@ class DataSetIntersect(
   tableEnv: BatchTableEnvironment,
   expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
 
-var leftDataSet: DataSet[Any] = null
-var rightDataSet: DataSet[Any] = null
+val leftDataSet: DataSet[Any] = 
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+val rightDataSet: DataSet[Any] = 
right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
-expectedType match {
-  case None =>
-leftDataSet = 
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-rightDataSet =
-  right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, 
Some(leftDataSet.getType))
-  case _ =>
-leftDataSet = 
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
-rightDataSet = 
right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
-}
-
-val config = tableEnv.getConfig
-
-val returnType = determineReturnType(
-  getRowType,
-  expectedType,
-  config.getNullCheck,
-  config.getEfficientTypeUsage)
-
-val generator = new CodeGenerator(
-  config,
-  false,
-  leftDataSet.getType,
-  Some(rightDataSet.getType))
-
-val conversion = generator.generateConverterResultExpression(
-  returnType,
-  left.getRowType.getFieldNames)
+val coGroupedDs = leftDataSet.coGroup(rightDataSet)
 
+val leftType = leftDataSet.getType
+val rightType = rightDataSet.getType
 
-val body = s"""
-|${conversion.code}
-|${generator.collectorTerm}.collect(${conversion.resultTerm});
-|""".stripMargin
+// If it is atomic type, the field expression need to be "*".
+// Otherwise, we use int-based field position keys
+val coGroupedPredicateDs =
+  if (leftType.isTupleType || 
leftType.isInstanceOf[CompositeType[Any]]) {
+  coGroupedDs.where(0 until left.getRowType.getFieldCount: _*)
+} else {
+  coGroupedDs.where("*")
+}
 
-val genFunction = generator.generateFunction(
-  ruleDescription,
-  classOf[FlatJoinFunction[Any, Any, Any]],
-  body,
-  returnType)
+val coGroupedWithoutFunctionDs =
--- End diff --

Same here.




[jira] [Commented] (FLINK-3942) Add support for INTERSECT

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355890#comment-15355890
 ] 

ASF GitHub Bot commented on FLINK-3942:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2159#discussion_r69037698
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala
 ---
@@ -83,66 +82,68 @@ class DataSetIntersect(
   tableEnv: BatchTableEnvironment,
   expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
 
-var leftDataSet: DataSet[Any] = null
-var rightDataSet: DataSet[Any] = null
+val leftDataSet: DataSet[Any] = 
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+val rightDataSet: DataSet[Any] = 
right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
-expectedType match {
-  case None =>
-leftDataSet = 
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-rightDataSet =
-  right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, 
Some(leftDataSet.getType))
-  case _ =>
-leftDataSet = 
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
-rightDataSet = 
right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
-}
-
-val config = tableEnv.getConfig
-
-val returnType = determineReturnType(
-  getRowType,
-  expectedType,
-  config.getNullCheck,
-  config.getEfficientTypeUsage)
-
-val generator = new CodeGenerator(
-  config,
-  false,
-  leftDataSet.getType,
-  Some(rightDataSet.getType))
-
-val conversion = generator.generateConverterResultExpression(
-  returnType,
-  left.getRowType.getFieldNames)
+val coGroupedDs = leftDataSet.coGroup(rightDataSet)
 
+val leftType = leftDataSet.getType
+val rightType = rightDataSet.getType
 
-val body = s"""
-|${conversion.code}
-|${generator.collectorTerm}.collect(${conversion.resultTerm});
-|""".stripMargin
+// If it is atomic type, the field expression need to be "*".
--- End diff --

I think you can use `*` for all cases. Composite types should be 
(recursively) expanded to all atomic member types.
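
The idea of recursive expansion can be illustrated with a toy type model. This is not Flink's `TypeInformation` or key-expression code: `KeyExpansionSketch`, `TypeNode`, and `expandAll` are hypothetical names, and the sketch only shows how a wildcard over a nested composite type could resolve to the list of its atomic member fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical toy model of wildcard key expansion; not Flink code.
final class KeyExpansionSketch {

    /** A type is atomic if it has no member fields, composite otherwise. */
    static final class TypeNode {
        final String name;
        final List<TypeNode> fields;

        TypeNode(String name, TypeNode... fields) {
            this.name = name;
            this.fields = Arrays.asList(fields);
        }

        boolean isAtomic() {
            return fields.isEmpty();
        }
    }

    /** Expands a wildcard over the given type into the paths of all atomic members. */
    static List<String> expandAll(TypeNode type) {
        List<String> paths = new ArrayList<>();
        collect(type, "", paths);
        return paths;
    }

    private static void collect(TypeNode type, String prefix, List<String> paths) {
        if (type.isAtomic()) {
            // An atomic root type is its own key; the sketch reports it as "*".
            paths.add(prefix.isEmpty() ? "*" : prefix);
            return;
        }
        for (TypeNode field : type.fields) {
            String path = prefix.isEmpty() ? field.name : prefix + "." + field.name;
            collect(field, path, paths); // recurse into nested composite types
        }
    }

    public static void main(String[] args) {
        TypeNode row = new TypeNode("row",
            new TypeNode("a"),
            new TypeNode("b", new TypeNode("x"), new TypeNode("y")));
        System.out.println(expandAll(row));
    }
}
```

In this model a single wildcard covers both cases discussed in the diff: an atomic type yields one key, and a composite type yields one key per atomic member, including nested ones.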




[GitHub] flink pull request #2159: [FLINK-3942] [tableAPI] Add support for INTERSECT

2016-06-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2159#discussion_r69037753
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala
 ---
@@ -83,66 +82,68 @@ class DataSetIntersect(
   tableEnv: BatchTableEnvironment,
   expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
 
-var leftDataSet: DataSet[Any] = null
-var rightDataSet: DataSet[Any] = null
+val leftDataSet: DataSet[Any] = 
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+val rightDataSet: DataSet[Any] = 
right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
-expectedType match {
-  case None =>
-leftDataSet = 
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-rightDataSet =
-  right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, 
Some(leftDataSet.getType))
-  case _ =>
-leftDataSet = 
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
-rightDataSet = 
right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
-}
-
-val config = tableEnv.getConfig
-
-val returnType = determineReturnType(
-  getRowType,
-  expectedType,
-  config.getNullCheck,
-  config.getEfficientTypeUsage)
-
-val generator = new CodeGenerator(
-  config,
-  false,
-  leftDataSet.getType,
-  Some(rightDataSet.getType))
-
-val conversion = generator.generateConverterResultExpression(
-  returnType,
-  left.getRowType.getFieldNames)
+val coGroupedDs = leftDataSet.coGroup(rightDataSet)
 
+val leftType = leftDataSet.getType
+val rightType = rightDataSet.getType
 
-val body = s"""
-|${conversion.code}
-|${generator.collectorTerm}.collect(${conversion.resultTerm});
-|""".stripMargin
+// If it is atomic type, the field expression need to be "*".
+// Otherwise, we use int-based field position keys
+val coGroupedPredicateDs =
+  if (leftType.isTupleType || 
leftType.isInstanceOf[CompositeType[Any]]) {
+  coGroupedDs.where(0 until left.getRowType.getFieldCount: _*)
+} else {
+  coGroupedDs.where("*")
+}
 
-val genFunction = generator.generateFunction(
-  ruleDescription,
-  classOf[FlatJoinFunction[Any, Any, Any]],
-  body,
-  returnType)
+val coGroupedWithoutFunctionDs =
--- End diff --

Same here.




[GitHub] flink pull request #2159: [FLINK-3942] [tableAPI] Add support for INTERSECT

2016-06-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2159#discussion_r69037698
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala
 ---
@@ -83,66 +82,68 @@ class DataSetIntersect(
   tableEnv: BatchTableEnvironment,
   expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
 
-var leftDataSet: DataSet[Any] = null
-var rightDataSet: DataSet[Any] = null
+val leftDataSet: DataSet[Any] = 
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+val rightDataSet: DataSet[Any] = 
right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
-expectedType match {
-  case None =>
-leftDataSet = 
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-rightDataSet =
-  right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, 
Some(leftDataSet.getType))
-  case _ =>
-leftDataSet = 
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
-rightDataSet = 
right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
-}
-
-val config = tableEnv.getConfig
-
-val returnType = determineReturnType(
-  getRowType,
-  expectedType,
-  config.getNullCheck,
-  config.getEfficientTypeUsage)
-
-val generator = new CodeGenerator(
-  config,
-  false,
-  leftDataSet.getType,
-  Some(rightDataSet.getType))
-
-val conversion = generator.generateConverterResultExpression(
-  returnType,
-  left.getRowType.getFieldNames)
+val coGroupedDs = leftDataSet.coGroup(rightDataSet)
 
+val leftType = leftDataSet.getType
+val rightType = rightDataSet.getType
 
-val body = s"""
-|${conversion.code}
-|${generator.collectorTerm}.collect(${conversion.resultTerm});
-|""".stripMargin
+// If it is atomic type, the field expression need to be "*".
--- End diff --

I think you can use `*` for all cases. Composite types should be 
(recursively) expanded to all atomic member types.
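
To illustrate the expansion described above, here is a minimal plain-Java sketch. It is not Flink code: `TypeNode` and `WildcardExpansionSketch` are hypothetical names introduced only for this illustration, and the dotted-path naming is an assumption. The sketch only shows the idea that a wildcard over a composite type resolves to its atomic leaves, while an atomic type resolves to the value itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a type descriptor; this is not Flink's TypeInformation. */
final class TypeNode {
    final String name;
    final List<TypeNode> fields; // empty for atomic types

    TypeNode(String name, TypeNode... fields) {
        this.name = name;
        this.fields = Arrays.asList(fields);
    }

    boolean isAtomic() {
        return fields.isEmpty();
    }
}

final class WildcardExpansionSketch {

    /** Returns the dotted paths of all atomic leaves reachable from the given type. */
    static List<String> expand(TypeNode type) {
        List<String> out = new ArrayList<>();
        collect(type, "", out);
        return out;
    }

    private static void collect(TypeNode type, String prefix, List<String> out) {
        if (type.isAtomic()) {
            // An atomic root has no field name, so the wildcard stands for the value itself.
            out.add(prefix.isEmpty() ? "*" : prefix);
            return;
        }
        for (TypeNode field : type.fields) {
            // Recurse into nested composites, building up the dotted path.
            collect(field, prefix.isEmpty() ? field.name : prefix + "." + field.name, out);
        }
    }

    public static void main(String[] args) {
        TypeNode row = new TypeNode("row",
            new TypeNode("a"),
            new TypeNode("p", new TypeNode("x"), new TypeNode("y")));
        System.out.println(expand(row));
        System.out.println(expand(new TypeNode("int")));
    }
}
```

Under this model both branches of the `if` in the diff would select the same set of key fields, which is the point of the suggestion.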




[GitHub] flink pull request #2159: [FLINK-3942] [tableAPI] Add support for INTERSECT

2016-06-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2159#discussion_r69036989
  
--- Diff: docs/apis/table.md ---
@@ -535,6 +535,30 @@ Table result = left.unionAll(right);
   
 
 
+   
+  Intersect
+  
+Similar to a SQL INTERSECT clause. Intersects two tables with 
duplicate records removed. Both tables must have identical field types.
+{% highlight scala %}
--- End diff --

This should be Java code + `highlight java`




[jira] [Commented] (FLINK-3942) Add support for INTERSECT

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355875#comment-15355875
 ] 

ASF GitHub Bot commented on FLINK-3942:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2159#discussion_r69036989
  
--- Diff: docs/apis/table.md ---
@@ -535,6 +535,30 @@ Table result = left.unionAll(right);
   
 
 
+   
+  Intersect
+  
+Similar to a SQL INTERSECT clause. Intersects two tables with 
duplicate records removed. Both tables must have identical field types.
+{% highlight scala %}
--- End diff --

This should be Java code + `highlight java`


> Add support for INTERSECT
> -
>
> Key: FLINK-3942
> URL: https://issues.apache.org/jira/browse/FLINK-3942
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: Jark Wu
>Priority: Minor
>
> Currently, the Table API and SQL do not support INTERSECT.
> INTERSECT can be executed as join on all fields.
> In order to add support for INTERSECT to the Table API and SQL we need to:
> - Implement a {{DataSetIntersect}} class that translates an INTERSECT into a 
> DataSet API program using a join on all fields.
> - Implement a {{DataSetIntersectRule}} that translates a Calcite 
> {{LogicalIntersect}} into a {{DataSetIntersect}}.
> - Extend the Table API (and validation phase) to provide an intersect() 
> method.





[jira] [Commented] (FLINK-3942) Add support for INTERSECT

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355876#comment-15355876
 ] 

ASF GitHub Bot commented on FLINK-3942:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2159#discussion_r69037003
  
--- Diff: docs/apis/table.md ---
@@ -535,6 +535,30 @@ Table result = left.unionAll(right);
   
 
 
+   
+  Intersect
+  
+Similar to a SQL INTERSECT clause. Intersects two tables with 
duplicate records removed. Both tables must have identical field types.
+{% highlight scala %}
+val left = ds1.toTable(tableEnv, "a, b, c");
+val right = ds2.toTable(tableEnv, "d, e, f");
+val result = left.intersect(right);
+{% endhighlight %}
+  
+
+
+   
+  IntersectAll
+  
+Similar to a SQL INTERSECT ALL clause. Intersects two tables. 
It returns as many identical records as are in both tables. Both tables must 
have identical field types.
+{% highlight scala %}
+val left = ds1.toTable(tableEnv, "a, b, c");
--- End diff --

This should be Java code + `highlight java`
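
For readers comparing the two doc entries in this diff, the difference between INTERSECT and INTERSECT ALL can be sketched in plain Java over lists. This is not the Table API and not the PR's implementation; `IntersectSketch` and its method names are hypothetical and only model the semantics stated in the doc text (duplicates removed versus as many identical records as are in both inputs).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class IntersectSketch {

    /** INTERSECT: rows present in both inputs, with duplicates removed. */
    static <T> List<T> intersect(List<T> left, List<T> right) {
        Set<T> result = new LinkedHashSet<>(left); // keeps first-seen order, drops duplicates
        result.retainAll(new HashSet<>(right));
        return new ArrayList<>(result);
    }

    /** INTERSECT ALL: each row appears min(count in left, count in right) times. */
    static <T> List<T> intersectAll(List<T> left, List<T> right) {
        Map<T, Integer> remaining = new HashMap<>();
        for (T row : right) {
            remaining.merge(row, 1, Integer::sum);
        }
        List<T> result = new ArrayList<>();
        for (T row : left) {
            Integer n = remaining.get(row);
            if (n != null && n > 0) {
                result.add(row);
                remaining.put(row, n - 1); // consume one matching row from the right side
            }
        }
        return result;
    }
}
```

With left = [1, 1, 2, 3] and right = [1, 1, 1, 3, 4], `intersect` yields [1, 3] and `intersectAll` yields [1, 1, 3].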


> Add support for INTERSECT
> -
>
> Key: FLINK-3942
> URL: https://issues.apache.org/jira/browse/FLINK-3942
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: Jark Wu
>Priority: Minor
>
> Currently, the Table API and SQL do not support INTERSECT.
> INTERSECT can be executed as join on all fields.
> In order to add support for INTERSECT to the Table API and SQL we need to:
> - Implement a {{DataSetIntersect}} class that translates an INTERSECT into a 
> DataSet API program using a join on all fields.
> - Implement a {{DataSetIntersectRule}} that translates a Calcite 
> {{LogicalIntersect}} into a {{DataSetIntersect}}.
> - Extend the Table API (and validation phase) to provide an intersect() 
> method.





[GitHub] flink pull request #2159: [FLINK-3942] [tableAPI] Add support for INTERSECT

2016-06-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2159#discussion_r69037003
  
--- Diff: docs/apis/table.md ---
@@ -535,6 +535,30 @@ Table result = left.unionAll(right);
   
 
 
+   
+  Intersect
+  
+Similar to a SQL INTERSECT clause. Intersects two tables with 
duplicate records removed. Both tables must have identical field types.
+{% highlight scala %}
+val left = ds1.toTable(tableEnv, "a, b, c");
+val right = ds2.toTable(tableEnv, "d, e, f");
+val result = left.intersect(right);
+{% endhighlight %}
+  
+
+
+   
+  IntersectAll
+  
+Similar to a SQL INTERSECT ALL clause. Intersects two tables. 
It returns as many identical records as are in both tables. Both tables must 
have identical field types.
+{% highlight scala %}
+val left = ds1.toTable(tableEnv, "a, b, c");
--- End diff --

This should be Java code + `highlight java`




[jira] [Assigned] (FLINK-3873) Add a Kafka TableSink with Avro serialization

2016-06-29 Thread Ivan Mushketyk (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Mushketyk reassigned FLINK-3873:
-

Assignee: Ivan Mushketyk

> Add a Kafka TableSink with Avro serialization
> -
>
> Key: FLINK-3873
> URL: https://issues.apache.org/jira/browse/FLINK-3873
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> Add a TableSink that writes Avro serialized data to Kafka.





[jira] [Commented] (FLINK-3873) Add a Kafka TableSink with Avro serialization

2016-06-29 Thread Ivan Mushketyk (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355835#comment-15355835
 ] 

Ivan Mushketyk commented on FLINK-3873:
---

Thank you for your reply. I'll work on it.

> Add a Kafka TableSink with Avro serialization
> -
>
> Key: FLINK-3873
> URL: https://issues.apache.org/jira/browse/FLINK-3873
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Priority: Minor
>
> Add a TableSink that writes Avro serialized data to Kafka.





[jira] [Commented] (FLINK-4029) Multi-field "sum" function just like "keyBy"

2016-06-29 Thread Ivan Mushketyk (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355830#comment-15355830
 ] 

Ivan Mushketyk commented on FLINK-4029:
---

Could you please provide an example of what this would look like, just to make 
sure that we are on the same page?

> Multi-field "sum" function just like "keyBy"
> 
>
> Key: FLINK-4029
> URL: https://issues.apache.org/jira/browse/FLINK-4029
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Rami
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> I can use keyBy as follows:
> stream.keyBy("pojo.field1", "pojo.field2", …)
> It would make sense to be able to use sum, for example, on more than one 
> field in the same way:
> stream.sum("pojo.field1", "pojo.field2", …)





[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-29 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69023616
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java
 ---
@@ -87,13 +87,15 @@ public void testRedisSentinelOperation() {
 
@After
public void tearDown() throws IOException {
-   if (jedisSentinelPool != null)
+   if (jedisSentinelPool != null){
--- End diff --

missing blank between `){`




[GitHub] flink issue #2169: [FLINK-3943] Add support for EXCEPT operator

2016-06-29 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2169
  
Updated documentation as suggested.




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-29 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69022373
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
 ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+/**
+ * All available commands for Redis. Each command belongs to a {@link 
RedisDataType} group.
+ */
+public enum RedisCommand {
+
+   /**
+* Insert the specified value at the head of the list stored at key.
+* If key does not exist, it is created as empty list before performing 
the push operations.
+*/
+   LPUSH(RedisDataType.LIST),
+   /**
+* Insert the specified value at the tail of the list stored at key.
+* If key does not exist, it is created as empty list before performing 
the push operation.
+*/
+   RPUSH(RedisDataType.LIST),
+
+   /**
+* Add the specified member to the set stored at key.
+* Specified member that is already a member of this set is ignored.
+*/
+   SADD(RedisDataType.SET),
+
+   /**
+* Set key to hold the string value. If key already holds a value,
+* it is overwritten, regardless of its type.
+*/
+   SET(RedisDataType.STRING),
+
+   /**
+* Adds the element to the HyperLogLog data structure stored at the 
variable name specified as first argument.
+*/
+   PFADD(RedisDataType.HYPER_LOG_LOG),
+
+   /**
+* Posts a message to the given channel.
+*/
+   PUBLISH(RedisDataType.PUBSUB),
+
+   /**
+* Adds the specified members with the specified score to the sorted 
set stored at key.
+*/
+   ZADD(RedisDataType.SORTED_SET),
+
+   /**
+* Sets field in the hash stored at key to value. If key does not exist,
+* a new key holding a hash is created. If field already exists in the 
hash, it is overwritten.
+*/
+   HSET(RedisDataType.HASH);
+
+   /**
+* The {@link RedisDataType} this command belongs to
+*/
+   private RedisDataType redisDataType;
+
+   RedisCommand(RedisDataType redisDataType) {
+   this.redisDataType = redisDataType;
--- End diff --

`null` check missing
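
A minimal sketch of what such a check could look like, using only the JDK. The enums below are trimmed, hypothetical stand-ins (`CommandSketch`, `DataTypeSketch`) rather than the classes from the PR, and `Objects.requireNonNull` is just one possible way to write the check.

```java
import java.util.Objects;

/** Hypothetical, trimmed stand-in for the data type enum in the PR. */
enum DataTypeSketch { LIST, SET, STRING }

/** Hypothetical, trimmed stand-in for the command enum in the PR. */
enum CommandSketch {
    LPUSH(DataTypeSketch.LIST),
    SADD(DataTypeSketch.SET),
    SET(DataTypeSketch.STRING);

    // final: the data type never changes after construction
    private final DataTypeSketch dataType;

    CommandSketch(DataTypeSketch dataType) {
        // Fail fast if a constant is ever declared with a null data type.
        this.dataType = Objects.requireNonNull(dataType, "dataType must not be null");
    }

    DataTypeSketch getDataType() {
        return dataType;
    }
}
```

Because enum constants are constructed when the class is initialized, a null argument would surface as an error at class-load time instead of later, when the command is first used.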




[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector

2016-06-29 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69022016
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
 ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+/**
+ * All available commands for Redis. Each command belongs to a {@link 
RedisDataType} group.
+ */
+public enum RedisCommand {
+
+   /**
+* Insert the specified value at the head of the list stored at key.
+* If key does not exist, it is created as empty list before performing 
the push operations.
+*/
+   LPUSH(RedisDataType.LIST),
+   /**
--- End diff --

nit: missing empty line.




[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

2016-06-29 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r69021580
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, 
RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.BatchTableEnvironment
+
+import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
+
+/**
+  * Flink RelNode which matches along with DataSetOperator.
+  *
+  */
+class DataSetMinus(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+left: RelNode,
+right: RelNode,
+rowType: RelDataType,
+all: Boolean)
+  extends BiRel(cluster, traitSet, left, right)
+with DataSetRel {
+
+  override def deriveRowType() = rowType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataSetMinus(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  rowType,
+  all
+)
+  }
+
+  override def toString: String = {
+s"SetMinus(setMinus: 
(${rowType.getFieldNames.asScala.toList.mkString(", ")}))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw).item("setMinus", setMinusSelectionToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+
+val children = this.getInputs
+val rowCnt = children.foldLeft(0D) { (rows, child) =>
+  rows + metadata.getRowCount(child)
+}
+
+planner.getCostFactory.makeCost(rowCnt, 0, 0)
+  }
+
+  override def translateToPlan(
+  tableEnv: BatchTableEnvironment,
+  expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+var leftDataSet: DataSet[Any] = null
+var rightDataSet: DataSet[Any] = null
+
+expectedType match {
+  case None =>
+leftDataSet = 
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+rightDataSet =
+  right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, 
Some(leftDataSet.getType))
+  case _ =>
+leftDataSet = 
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
+rightDataSet = 
right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
+}
+
+val minusRes = leftDataSet.minus(rightDataSet)
+if (!all) {
+  minusRes.distinct()
--- End diff --

On the other hand, the "union" method in DataSet.java preserves duplicates and 
is used to implement UNION ALL. UNION is implemented by applying a "distinct" 
operation on top of UNION ALL.
It seems that what you suggest would only add code duplication, since it would 
basically implement a specialized version of the "distinct" operator 
specifically for the "minus" method.

I have no strong preferences though.  @fhueske what do you think?
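
The composition argued for above can be sketched in plain Java over lists. This is not the DataSet API: `SetOpsSketch` and its methods are hypothetical, and in particular `minusAll` is assumed here to keep every left row that has no equal row on the right, which may differ from the semantics of the `minus` method in the PR.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class SetOpsSketch {

    /** UNION ALL: plain concatenation, duplicates preserved. */
    static <T> List<T> unionAll(List<T> left, List<T> right) {
        List<T> result = new ArrayList<>(left);
        result.addAll(right);
        return result;
    }

    /** Generic duplicate removal, reused by both set operators below. */
    static <T> List<T> distinct(List<T> rows) {
        return new ArrayList<>(new LinkedHashSet<>(rows));
    }

    /** UNION expressed as distinct on top of UNION ALL. */
    static <T> List<T> union(List<T> left, List<T> right) {
        return distinct(unionAll(left, right));
    }

    /** Assumed semantics: keep every left row that has no equal row on the right. */
    static <T> List<T> minusAll(List<T> left, List<T> right) {
        Set<T> rightRows = new HashSet<>(right);
        List<T> result = new ArrayList<>();
        for (T row : left) {
            if (!rightRows.contains(row)) {
                result.add(row);
            }
        }
        return result;
    }

    /** EXCEPT expressed the same way: distinct on top of the duplicate-preserving operator. */
    static <T> List<T> except(List<T> left, List<T> right) {
        return distinct(minusAll(left, right));
    }
}
```

The point of the sketch is only that `distinct` stays a single reusable step; whether a fused operator would be worth the duplication is the open question in this thread.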




[GitHub] flink pull request #2053: [FLINK-1707] Affinity Propagation

2016-06-29 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2053#discussion_r69015248
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/AffinityPropagation.java
 ---
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgesFunction;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+
+
+/**
+ * This is an implementation of the Binary Affinity Propagation algorithm 
using a scatter-gather iteration.
+ * Note that is not the original Affinity Propagation.
+ *
+ * The input is an undirected graph where the vertices are the points to 
be clustered and the edge weights are the
--- End diff --

The matrix can be asymmetric as in Frey and Dueck's example of city flight 
times. Their paper also discusses sparse graphs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3618) Rename abstract UDF classes in Scatter-Gather implementation

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355723#comment-15355723
 ] 

ASF GitHub Bot commented on FLINK-3618:
---

Github user vasia commented on the issue:

https://github.com/apache/flink/pull/2184
  
Thanks a lot @greghogan. Everything looks good. Can you please update the 
docs too?


> Rename abstract UDF classes in Scatter-Gather implementation
> 
>
> Key: FLINK-3618
> URL: https://issues.apache.org/jira/browse/FLINK-3618
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Affects Versions: 1.1.0, 1.0.1
>Reporter: Martin Junghanns
>Assignee: Greg Hogan
>Priority: Minor
> Fix For: 1.1.0
>
>
> We now offer three Vertex-centric computing abstractions:
> * Pregel
> * Gather-Sum-Apply
> * Scatter-Gather
> Each of these abstractions provides abstract classes that need to be 
> implemented by the user:
> * Pregel: {{ComputeFunction}}
> * GSA: {{GatherFunction}}, {{SumFunction}}, {{ApplyFunction}}
> * Scatter-Gather: {{MessagingFunction}}, {{VertexUpdateFunction}}
> In Pregel and GSA, the names of those functions follow the name of the 
> abstraction or the name suggested in the corresponding papers. For 
> consistency of the API, I propose to rename {{MessagingFunction}} to 
> {{ScatterFunction}} and {{VertexUpdateFunction}} to {{GatherFunction}}.
> Also for consistency, I would like to change the parameter order in 
> {{Graph.runScatterGatherIteration(VertexUpdateFunction f1, MessagingFunction 
> f2)}} to {{Graph.runScatterGatherIteration(ScatterFunction f1, GatherFunction 
> f2)}} (like in {{Graph.runGatherSumApplyFunction(...)}}).





[GitHub] flink issue #2184: [FLINK-3618] [gelly] Rename abstract UDF classes in Scatt...

2016-06-29 Thread vasia
Github user vasia commented on the issue:

https://github.com/apache/flink/pull/2184
  
Thanks a lot @greghogan. Everything looks good. Can you please update the 
docs too?




[jira] [Commented] (FLINK-1759) Execution statistics for vertex-centric iterations

2016-06-29 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355714#comment-15355714
 ] 

Greg Hogan commented on FLINK-1759:
---

Are we still looking to add per-iteration statistics to the web frontend?

> Execution statistics for vertex-centric iterations
> --
>
> Key: FLINK-1759
> URL: https://issues.apache.org/jira/browse/FLINK-1759
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Affects Versions: 0.9
>Reporter: Vasia Kalavri
>Priority: Minor
>
> It would be nice to add an option for gathering execution statistics from 
> VertexCentricIteration.
> In particular, the following metrics could be useful:
> - total number of supersteps
> - number of messages sent (total / per superstep)
> - bytes of messages exchanged (total / per superstep)
> - execution time (total / per superstep)





[jira] [Commented] (FLINK-1707) Add an Affinity Propagation Library Method

2016-06-29 Thread Vasia Kalavri (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355673#comment-15355673
 ] 

Vasia Kalavri commented on FLINK-1707:
--

Thanks [~joseprupi]. If you could document the example, it should make it clear 
whether we can find a scalable implementation or not.
For the original AP, it should be easy to port the Giraph implementation 
(described in the link of this JIRA description) to Gelly.

> Add an Affinity Propagation Library Method
> --
>
> Key: FLINK-1707
> URL: https://issues.apache.org/jira/browse/FLINK-1707
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Reporter: Vasia Kalavri
>Assignee: Josep Rubió
>Priority: Minor
>  Labels: requires-design-doc
> Attachments: Binary_Affinity_Propagation_in_Flink_design_doc.pdf
>
>
> This issue proposes adding an implementation of the Affinity Propagation 
> algorithm as a Gelly library method and a corresponding example.
> The algorithm is described in paper [1] and a description of a vertex-centric 
> implementation can be found in [2].
> [1]: http://www.psi.toronto.edu/affinitypropagation/FreyDueckScience07.pdf
> [2]: http://event.cwi.nl/grades2014/00-ching-slides.pdf
> Design doc:
> https://docs.google.com/document/d/1QULalzPqMVICi8jRVs3S0n39pell2ZVc7RNemz_SGA4/edit?usp=sharing





[jira] [Commented] (FLINK-3879) Native implementation of HITS algorithm

2016-06-29 Thread Vasia Kalavri (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355664#comment-15355664
 ] 

Vasia Kalavri commented on FLINK-3879:
--

You and I are the only Gelly component "shepherds", but that doesn't mean we 
are the only ones who can review Gelly PRs. Any committer can help :)
I like the idea of improving our process. I went through the JIRAs last week 
and pinged a few people, released some, closed some, but we can certainly clean 
up more.
Big +1 for roadmap => JIRA => implementation discussion => PR.
Do you think we should add this to the wiki / contribution guidelines, or start 
a discussion on the dev list?

> Native implementation of HITS algorithm
> ---
>
> Key: FLINK-3879
> URL: https://issues.apache.org/jira/browse/FLINK-3879
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> Hyperlink-Induced Topic Search (HITS, also "hubs and authorities") is 
> presented in [0] and described in [1].
> "[HITS] is a very popular and effective algorithm to rank documents based on 
> the link information among a set of documents. The algorithm presumes that a 
> good hub is a document that points to many others, and a good authority is a 
> document that many documents point to." 
> [https://pdfs.semanticscholar.org/a8d7/c7a4c53a9102c4239356f9072ec62ca5e62f.pdf]
> This implementation differs from FLINK-2044 by providing for convergence, 
> outputting both hub and authority scores, and completing in half the number 
> of iterations.
> [0] http://www.cs.cornell.edu/home/kleinber/auth.pdf
> [1] https://en.wikipedia.org/wiki/HITS_algorithm
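
As a reading aid for the description above, here is a textbook HITS iteration in plain Java. It is a sketch only and not the Gelly implementation proposed in this issue: `HitsSketch`, the edge-list representation, the L2 normalization, and the fixed iteration count are all assumptions made for the illustration (there is no convergence check and no halving of iterations).

```java
import java.util.Arrays;

final class HitsSketch {

    /**
     * Runs the basic hub/authority update for a fixed number of iterations.
     * Each edge is {source, target}. Returns {hubScores, authorityScores}.
     */
    static double[][] run(int vertexCount, int[][] edges, int iterations) {
        double[] hub = new double[vertexCount];
        double[] auth = new double[vertexCount];
        Arrays.fill(hub, 1.0);

        for (int it = 0; it < iterations; it++) {
            // Authority update: sum the hub scores of all vertices pointing to this vertex.
            double[] newAuth = new double[vertexCount];
            for (int[] e : edges) {
                newAuth[e[1]] += hub[e[0]];
            }
            normalize(newAuth);

            // Hub update: sum the (new) authority scores of all vertices this vertex points to.
            double[] newHub = new double[vertexCount];
            for (int[] e : edges) {
                newHub[e[0]] += newAuth[e[1]];
            }
            normalize(newHub);

            auth = newAuth;
            hub = newHub;
        }
        return new double[][] {hub, auth};
    }

    /** Scales the vector to unit L2 norm; leaves an all-zero vector unchanged. */
    private static void normalize(double[] v) {
        double sumOfSquares = 0.0;
        for (double x : v) {
            sumOfSquares += x * x;
        }
        double norm = Math.sqrt(sumOfSquares);
        if (norm == 0.0) {
            return;
        }
        for (int i = 0; i < v.length; i++) {
            v[i] /= norm;
        }
    }
}
```

For the edges 0->1, 0->2, 1->2, vertex 2 ends up with the highest authority score (most incoming links) and vertex 0 with the highest hub score (most outgoing links), matching the intuition in the quoted description.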





[jira] [Commented] (FLINK-3675) YARN ship folder incosistent behavior

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355580#comment-15355580
 ] 

ASF GitHub Bot commented on FLINK-3675:
---

GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/2187

[FLINK-3675][yarn] improvements to library shipping

- always ship the lib folder
- properly setup the classpath from the supplied ship files
- cleanup deploy() method of YarnClusterDescriptor
- add test case

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink FLINK-3675

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2187.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2187


commit 1dbaa5a91ae8ae3f50ed26d4a51ec57bb28bc77e
Author: Maximilian Michels 
Date:   2016-06-27T13:13:56Z

[FLINK-3675][yarn] improvements to library shipping

- always ship the lib folder
- properly setup the classpath from the supplied ship files
- cleanup deploy() method of YarnClusterDescriptor
- add test case




> YARN ship folder incosistent behavior
> -
>
> Key: FLINK-3675
> URL: https://issues.apache.org/jira/browse/FLINK-3675
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Affects Versions: 1.0.0
>Reporter: Stefano Baghino
>Assignee: Maximilian Michels
>Priority: Critical
> Fix For: 1.1.0
>
>
> After [some discussion on the user mailing 
> list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-and-YARN-ship-folder-td5458.html]
>  it came up that the {{flink/lib}} folder is always supposed to be shipped to 
> the YARN cluster so that all the nodes have access to its contents.
> Currently however, the Flink long-running YARN session actually ships the 
> folder because it's explicitly specified in the {{yarn-session.sh}} script, 
> while running a single job on YARN does not automatically ship it.






[jira] [Commented] (FLINK-1337) Create an Amazon EMR Bootstrap Action

2016-06-29 Thread Timur Fayruzov (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355546#comment-15355546
 ] 

Timur Fayruzov commented on FLINK-1337:
---

I have created a gist for the Flink setup that I used: 
https://gist.github.com/TimurFayruzov/c2ec9ddcd49eaef71f89f78ba650a72b. Unfortunately it has 
the Flink version and most settings hard-coded, but that is relatively 
straightforward to change.

Prerequisites:
1. Put all files in one directory and replace the placeholders for bucket 
name, AWS key and your application name. 
2. Upload the Flink tgz to s3:///flink/.
3. Upload your application jar to s3:///flink/.

$> ./start_cluster.sh
This will start an EMR cluster, start Flink runtime and schedule a step for 
your application, so that even when it completes you will still have access to 
the dashboard.

> Create an Amazon EMR Bootstrap Action
> -
>
> Key: FLINK-1337
> URL: https://issues.apache.org/jira/browse/FLINK-1337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Stephan Ewen
>Assignee: Timur Fayruzov
>Priority: Minor
>
> EMR offers bootstrap actions that prepare the cluster by installing 
> additional components, etc..
> We can offer a Flink bootstrap action that downloads, unpacks, and configures 
> Flink. It may optionally install libraries that we like to use (such as 
> Python, BLAS/JBLAS, ...)
> http://blogs.aws.amazon.com/bigdata/post/TxO6EHTHQALSIB/Getting-Started-with-Amazon-EMR-Bootstrap-Actions





[jira] [Commented] (FLINK-3397) Failed streaming jobs should fall back to the most recent checkpoint/savepoint

2016-06-29 Thread ramkrishna.s.vasudevan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355534#comment-15355534
 ] 

ramkrishna.s.vasudevan commented on FLINK-3397:
---

[~uce]
Any feedback here? Is this going to be a simple logic change in 
CheckpointCoordinator#restoreLatestCheckpointedState, such that we compare the 
checkpoint ID from the savepoint with the checkpoint ID from the checkpoint 
coordinator, see which one is the latest, and then go ahead with that one as the 
restoration point? Or do you see some larger design change in how savepoints 
and checkpoints are handled?
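
To make the question concrete, the comparison could be sketched roughly like this. It is only an illustration, not the actual CheckpointCoordinator code: `RestorePointSketch`, `RestorePoint` and `latest` are hypothetical names, and the sketch assumes that savepoints and checkpoints draw their IDs from the same increasing counter, so that a higher ID means a more recent snapshot.

```java
import java.util.Optional;

/** Hypothetical sketch of "restore from whichever is newer"; not Flink code. */
final class RestorePointSketch {

    /** Hypothetical stand-in for a completed checkpoint or savepoint. */
    static final class RestorePoint {
        final long checkpointId;
        final boolean savepoint;

        RestorePoint(long checkpointId, boolean savepoint) {
            this.checkpointId = checkpointId;
            this.savepoint = savepoint;
        }
    }

    /**
     * Returns the candidate with the higher checkpoint ID.
     * Either argument may be null if no such snapshot exists.
     * On equal IDs the regular checkpoint is preferred.
     */
    static Optional<RestorePoint> latest(RestorePoint latestCheckpoint,
                                         RestorePoint latestSavepoint) {
        if (latestCheckpoint == null) {
            return Optional.ofNullable(latestSavepoint);
        }
        if (latestSavepoint == null) {
            return Optional.of(latestCheckpoint);
        }
        return Optional.of(
                latestSavepoint.checkpointId > latestCheckpoint.checkpointId
                        ? latestSavepoint
                        : latestCheckpoint);
    }

    public static void main(String[] args) {
        RestorePoint checkpoint = new RestorePoint(5L, false);
        RestorePoint savepoint = new RestorePoint(7L, true);
        RestorePoint chosen = latest(checkpoint, savepoint).get();
        System.out.println("restore from id " + chosen.checkpointId
                + (chosen.savepoint ? " (savepoint)" : " (checkpoint)"));
    }
}
```

If the IDs are not comparable across savepoints and checkpoints, a timestamp would have to be compared instead, which is part of what the question above is asking.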

> Failed streaming jobs should fall back to the most recent checkpoint/savepoint
> --
>
> Key: FLINK-3397
> URL: https://issues.apache.org/jira/browse/FLINK-3397
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Gyula Fora
>Priority: Minor
>
> The current fallback behaviour in case of a streaming job failure is slightly 
> counterintuitive:
> If a job fails it will fall back to the most recent checkpoint (if any) even 
> if a more recent savepoint was taken. This means that the system does not 
> regard savepoints as checkpoints, only as points from which a job can be 
> manually restarted.
> I suggest changing this so that savepoints are also regarded as checkpoints 
> in case of a failure and are also used to automatically restore the 
> streaming job.





[GitHub] flink pull request #2158: [FLINK-4116] Metrics documentation

2016-06-29 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2158#discussion_r68987851
  
--- Diff: docs/apis/common/index.md ---
@@ -1350,3 +1350,397 @@ You may specify program arguments before the job is 
executed. The plan visualiza
 the execution plan before executing the Flink job.
 
 {% top %}
+
+Metrics
+---
+
+Flink exposes a metric system that allows gathering and exposing metrics 
to external systems.
+
+### Registering metrics
+
+You can access the metric system from any user function that extends 
[RichFunction]({{ site.baseurl }}/apis/common/index.html#rich-functions) by 
calling `getRuntimeContext().getMetricGroup()`.
+This method returns a `MetricGroup` object on which you can create and 
register new metrics.
+
+### Metric types
+
+Flink supports `Counters`, `Gauges` and `Histograms`.
+
+ Counter
+
+A `Counter` is used to count something. The current value can be in- or 
decremented using `inc()/inc(long n)` or `dec()/dec(long n)`.
+You can create and register a `Counter` by calling `counter(String name)` 
on a MetricGroup.
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction<String, Integer> {
+  private Counter counter;
+
+  @Override
+  public void open(Configuration config) {
+    // create and register a counter
+    this.counter = getRuntimeContext().getMetricGroup().counter("myCounter");
+    ...
+  }
+
+  @Override
+  public Integer map(String value) throws Exception {
+    // increment counter
+    this.counter.inc();
+    ...
+  }
+}
+
+{% endhighlight %}
+
+Alternatively you can also use your own `Counter` implementation:
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction {
+  ...
+
+  @Override
+  public void open(Configuration config) {
+    // register a custom counter
+    this.counter = getRuntimeContext().getMetricGroup().counter("myCustomCounter", new CustomCounter());
+    ...
+  }
+  ...
+}
+
+{% endhighlight %}
+
+ Gauge
+
+A `Gauge` provides a value of any type on demand. In order to use a 
`Gauge` you must first create a class that implements the 
`org.apache.flink.metrics.Gauge` interface.
+There is no restriction on the type of the returned value.
+You can register a gauge by calling `gauge(String name, Gauge gauge)` on a 
MetricGroup.
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction {
+  private int valueToExpose;
+
+  @Override
+  public void open(Configuration config) {
+    // register the gauge
+    getRuntimeContext().getMetricGroup().gauge("MyGauge", new Gauge<Integer>() {
+      @Override
+      public Integer getValue() {
+        return valueToExpose;
+      }});
+    ...
+  }
+  ...
+}
+
+{% endhighlight %}
+
+ Histogram
+
+A `Histogram` measures the distribution of long values.
+You can register one by calling `histogram(String name, Histogram 
histogram)` on a MetricGroup.
+
+{% highlight java %}
+public class MyMapper extends RichMapFunction<Long, Integer> {
+  private Histogram histogram;
+
+  @Override
+  public void open(Configuration config) {
+    // create and register a histogram
+    this.histogram = getRuntimeContext().getMetricGroup().histogram("myHistogram", new MyHistogram());
+    ...
+  }
+
+  @Override
+  public Integer map(Long value) throws Exception {
+    this.histogram.update(value);
+    ...
+  }
+}
+{% endhighlight %}
+
+Flink only provides an interface for Histograms, but offers a wrapper that 
allows usage of Codahale/DropWizard Histograms 
(`org.apache.flink.dropwizard.metrics.DropWizardHistogramWrapper`).
+This wrapper is contained in the `flink-metrics-dropwizard` module.
+
+### Scope
+
+Every registered metric has an automatically assigned scope which 
represents the entities it is tied to. By default a metric that is registered 
in a user function will be scoped to the operator in which the function runs, 
the task/job it belongs to and the taskManager/host it is executed on. This is 
referred to as the "system scope".
+
+You can define an additional "user scope" by calling either 
`MetricGroup#addGroup(String name)` or `MetricGroup#addGroup(int name)`.
+
+{% highlight java %}
+
+counter = 
getRuntimeContext().getMetricGroup().addGroup("MyMetrics").counter("myCounter");
+
+{% endhighlight %}
+
+The name under which a metric is exported is based on both scopes and the 
name passed in the `counter()` call. The order is always 
\<system-scope>\<user-scope>\<name>.
+
+The system scope allows the reported name 

[jira] [Commented] (FLINK-1550) Show JVM Metrics for JobManager

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355493#comment-15355493
 ] 

ASF GitHub Bot commented on FLINK-1550:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2146
  
That is known and intended behavior. It is the user's responsibility to 
make sure that no naming conflicts arise, either by providing unique 
job/task/operator names or by modifying the scope formats.

Regarding your suggestion, even if we ignore the disparity between the 
name a user expects and the name we might provide, there is still the issue 
that it (sadly) doesn't actually solve anything.

Assume 2 TaskManagers, both exporting a metric `MyJob.metric` and 
`MyJob1.metric`.

You cannot guarantee that both versions of `MyJob.metric` actually belong 
to the same job.
You can't even guarantee that all values a TM exposes as 
`MyJob.metric` belong to a single job.

This problem also exists right now; the only difference is that you only have 1 
set of inconsistent data, and not 2.
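
A small sketch of the conflict described above, under the assumption that the reported name is built only from the job name and the metric name. `MetricNameCollisionSketch` and its methods are hypothetical helpers written for this illustration; they are not part of the Flink metric system.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of metric name conflicts; not Flink code. */
final class MetricNameCollisionSketch {

    /** Builds a reported name such as "MyJob.metric" from job name and metric name. */
    static String reportedName(String jobName, String metricName) {
        return jobName + "." + metricName;
    }

    /**
     * Groups job IDs by the name their metric is reported under.
     * Each registration is {jobId, jobName, metricName}.
     */
    static Map<String, List<String>> jobsPerReportedName(String[][] registrations) {
        Map<String, List<String>> result = new HashMap<>();
        for (String[] r : registrations) {
            result.computeIfAbsent(reportedName(r[1], r[2]), k -> new ArrayList<>())
                    .add(r[0]);
        }
        return result;
    }

    public static void main(String[] args) {
        // Two different jobs that were both submitted under the name "MyJob".
        String[][] registrations = {
            {"job-A", "MyJob", "metric"},
            {"job-B", "MyJob", "metric"},
        };
        // Both job IDs end up behind the single reported name "MyJob.metric".
        System.out.println(jobsPerReportedName(registrations));
    }
}
```

Once two jobs share a name, nothing in the reported name tells their values apart, which is why unique job/task/operator names or a different scope format are needed.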


> Show JVM Metrics for JobManager
> ---
>
> Key: FLINK-1550
> URL: https://issues.apache.org/jira/browse/FLINK-1550
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, Metrics
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>






[GitHub] flink pull request #2146: [FLINK-1550/FLINK-4057] Add JobManager Metrics

2016-06-29 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2146#discussion_r68984285
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java
 ---
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.jobmanager;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.HeapStateStore;
+import org.apache.flink.runtime.checkpoint.SavepointStore;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.instance.InstanceManager;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingMessages;
+import org.apache.flink.runtime.testingUtils.TestingTaskManager;
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.Int;
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_SCOPE_NAMING_JM_JOB;
+import static org.junit.Assert.assertEquals;
+
+public class JobManagerMetricTest {
+
+   private static ActorSystem system;
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   @BeforeClass
+   public static void setup() {
+   system = AkkaUtils.createLocalActorSystem(new Configuration());
+   }
+
+   @AfterClass
+   public static void teardown() {
+   JavaTestKit.shutdownActorSystem(system);
+   }
+

[GitHub] flink pull request #2146: [FLINK-1550/FLINK-4057] Add JobManager Metrics

2016-06-29 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2146#discussion_r68983836
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java
 ---
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.jobmanager;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.HeapStateStore;
+import org.apache.flink.runtime.checkpoint.SavepointStore;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.instance.InstanceManager;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingMessages;
+import org.apache.flink.runtime.testingUtils.TestingTaskManager;
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.Int;
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_SCOPE_NAMING_JM_JOB;
+import static org.junit.Assert.assertEquals;
+
+public class JobManagerMetricTest {
+
+   private static ActorSystem system;
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   @BeforeClass
+   public static void setup() {
+   system = AkkaUtils.createLocalActorSystem(new Configuration());
+   }
+
+   @AfterClass
+   public static void teardown() {
+   JavaTestKit.shutdownActorSystem(system);
+   }
+

[jira] [Commented] (FLINK-4116) Document metrics

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355470#comment-15355470
 ] 

ASF GitHub Bot commented on FLINK-4116:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2158
  
The documentation has reached a really good state. I had only some minor 
comments left.


> Document metrics
> 
>
> Key: FLINK-4116
> URL: https://issues.apache.org/jira/browse/FLINK-4116
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Metrics
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.1.0
>
>
> The metric system is currently not documented, which should be fixed before 
> the 1.1 release.





[jira] [Commented] (FLINK-4116) Document metrics

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355468#comment-15355468
 ] 

ASF GitHub Bot commented on FLINK-4116:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2158#discussion_r68981861
  
--- Diff: docs/apis/common/index.md ---
@@ -1350,3 +1350,397 @@ You may specify program arguments before the job is 
executed. The plan visualiza
 the execution plan before executing the Flink job.
 
 {% top %}
+
+Metrics
+---
+
+Flink exposes a metric system that allows gathering and exposing metrics 
to external systems.
+
+### Registering metrics
+
+You can access the metric system from any user function that extends 
[RichFunction]({{ site.baseurl }}/apis/common/index.html#rich-functions) by 
calling `getRuntimeContext().getMetricGroup()`.
+This method returns a `MetricGroup` object on which you can create and 
register new metrics.
+
+### Metric types
+
+Flink supports `Counters`, `Gauges` and `Histograms`.
+
+#### Counter
+
+A `Counter` is used to count something. The current value can be in- or 
decremented using `inc()/inc(long n)` or `dec()/dec(long n)`.
+You can create and register a `Counter` by calling `counter(String name)` 
on a MetricGroup.
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction<String, Integer> {
+  private Counter counter;
+
+  @Override
+  public void open(Configuration config) {
+    // create and register a counter
+    this.counter = getRuntimeContext().getMetricGroup().counter("myCounter");
+    ...
+  }
+
+  @Override
+  public Integer map(String value) throws Exception {
+    // increment counter
+    this.counter.inc();
+    ...
+  }
+}
+
+{% endhighlight %}
+
+Alternatively you can also use your own `Counter` implementation:
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction<String, Integer> {
+  ...
+
+  @Override
+  public void open(Configuration config) {
+    // register a custom counter
+    this.counter = getRuntimeContext().getMetricGroup().counter("myCustomCounter", new CustomCounter());
+    ...
+  }
+  ...
+}
+
+{% endhighlight %}
+
+#### Gauge
+
+A `Gauge` provides a value of any type on demand. In order to use a `Gauge` you must first create a class that implements the `org.apache.flink.metrics.Gauge` interface.
+There is no restriction on the type of the returned value.
+You can register a gauge by calling `gauge(String name, Gauge gauge)` on a MetricGroup.
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction<String, Integer> {
+  private int valueToExpose;
+
+  @Override
+  public void open(Configuration config) {
+    // register the gauge
+    getRuntimeContext().getMetricGroup().gauge("MyGauge", new Gauge<Integer>() {
+      @Override
+      public Integer getValue() {
+        return valueToExpose;
+      }
+    });
+    ...
+  }
+  ...
+}
+
+{% endhighlight %}
+
+#### Histogram
+
+A `Histogram` measures the distribution of long values.
+You can register one by calling `histogram(String name, Histogram histogram)` on a MetricGroup.
+
+{% highlight java %}
+public class MyMapper extends RichMapFunction<Long, Integer> {
+  private Histogram histogram;
+
+  @Override
+  public void open(Configuration config) {
+    // create and register a histogram
+    this.histogram = getRuntimeContext().getMetricGroup().histogram("myHistogram", new MyHistogram());
+    ...
+  }
+
+  @Override
+  public Integer map(Long value) throws Exception {
+    // record the incoming value in the histogram
+    this.histogram.update(value);
+    ...
+  }
+}
+{% endhighlight %}
+
+Flink only provides an interface for histograms, but offers a wrapper that allows the use of Codahale/DropWizard histograms (`org.apache.flink.dropwizard.metrics.DropWizardHistogramWrapper`).
+This wrapper is contained in the `flink-metrics-dropwizard` module.
+
+### Scope
+
+Every registered metric has an automatically assigned scope which 
represents the entities it is tied to. By default a metric that is registered 
in a user function will be scoped to the operator in which the function runs, 
the task/job it belongs to and the taskManager/host it is executed on. This is 
referred to as the "system scope".
+
+You can define an additional "user scope" by calling either `MetricGroup#addGroup(String name)` or `MetricGroup#addGroup(int name)`.
+
+{% highlight java %}
+
+counter = 

[jira] [Commented] (FLINK-4116) Document metrics

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355469#comment-15355469
 ] 

ASF GitHub Bot commented on FLINK-4116:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2158#discussion_r68981623
  
--- Diff: docs/apis/common/index.md ---

[jira] [Commented] (FLINK-4116) Document metrics

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355467#comment-15355467
 ] 

ASF GitHub Bot commented on FLINK-4116:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2158#discussion_r68981838
  
--- Diff: docs/apis/common/index.md ---

[jira] [Commented] (FLINK-1550) Show JVM Metrics for JobManager

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355464#comment-15355464
 ] 

ASF GitHub Bot commented on FLINK-1550:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2146#discussion_r68973636
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java
 ---

[GitHub] flink issue #2158: [FLINK-4116] Metrics documentation

2016-06-29 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2158
  
The documentation has reached a really good state. I had only some minor 
comments left.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2146: [FLINK-1550/FLINK-4057] Add JobManager Metrics

2016-06-29 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2146
  
I've tested the job manager metrics and everything seems to work :-) 

I noticed, however, that it is not possible to submit two jobs with the 
same job name and obtain metrics for both. An `InstanceAlreadyExistsException` 
is thrown and the latter job is simply not reported. Would it be possible to 
append a running counter to the job name if we encounter such an exception? 
What do you think?
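
To make the suggestion concrete, here is a minimal sketch of the retry idea using only the standard `javax.management` API. It is not taken from the pull request: the class name `UniqueJobNameRegistration`, the `job` key, the `-N` suffix format and the attempt limit are all assumptions made for illustration.

```java
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

/**
 * Hypothetical helper (not part of Flink): registers an MBean under a job name
 * and appends a running counter when that name is already taken.
 */
final class UniqueJobNameRegistration {

    // Arbitrary bound chosen for this sketch so the loop always terminates.
    private static final int MAX_ATTEMPTS = 100;

    private UniqueJobNameRegistration() {}

    static ObjectName register(MBeanServer server, Object mbean, String domain, String jobName)
            throws JMException {
        for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
            // The first attempt uses the plain job name, later ones "name-1", "name-2", ...
            String candidate = attempt == 0 ? jobName : jobName + "-" + attempt;
            // Quote the value so job names with special characters stay valid.
            ObjectName name = new ObjectName(domain, "job", ObjectName.quote(candidate));
            try {
                server.registerMBean(mbean, name);
                return name;
            } catch (InstanceAlreadyExistsException e) {
                // Another job with the same name is already registered; try the next suffix.
            }
        }
        throw new InstanceAlreadyExistsException(
                "No free name for job " + jobName + " after " + MAX_ATTEMPTS + " attempts");
    }
}
```

With something along these lines, a second job named `wordcount` would be reported as `wordcount-1` instead of being dropped. Whether a suffix on the job name or an additional key (for example the job ID) is the better fit is an open design question.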




[GitHub] flink pull request #2158: [FLINK-4116] Metrics documentation

2016-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2158#discussion_r68981623
  
--- Diff: docs/apis/common/index.md ---
@@ -1350,3 +1350,397 @@ You may specify program arguments before the job is 
executed. The plan visualiza
 the execution plan before executing the Flink job.
 
 {% top %}
+
+Metrics
+---
+
+Flink exposes a metric system that allows gathering and exposing metrics 
to external systems.
+
+### Registering metrics
+
+You can access the metric system from any user function that extends 
[RichFunction]({{ site.baseurl }}/apis/common/index.html#rich-functions) by 
calling `getRuntimeContext().getMetricGroup()`.
+This method returns a `MetricGroup` object on which you can create and 
register new metrics.
+
+### Metric types
+
+Flink supports `Counters`, `Gauges` and `Histograms`.
+
+#### Counter
+
+A `Counter` is used to count something. The current value can be in- or 
decremented using `inc()/inc(long n)` or `dec()/dec(long n)`.
+You can create and register a `Counter` by calling `counter(String name)` 
on a MetricGroup.
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction<String, Integer> {
+  private Counter counter;
+
+  @Override
+  public void open(Configuration config) {
+    // create and register a counter
+    this.counter = getRuntimeContext().getMetricGroup().counter("myCounter");
+    ...
+  }
+
+  @Override
+  public Integer map(String value) throws Exception {
+    // increment counter
+    this.counter.inc();
+    ...
+  }
+}
+
+{% endhighlight %}
+
+Alternatively you can also use your own `Counter` implementation:
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction<String, Integer> {
+  ...
+
+  @Override
+  public void open(Configuration config) {
+    // register a custom counter
+    this.counter = getRuntimeContext().getMetricGroup().counter("myCustomCounter", new CustomCounter());
+    ...
+  }
+  ...
+}
+
+{% endhighlight %}
+
+#### Gauge
+
+A `Gauge` provides a value of any type on demand. In order to use a `Gauge` you must first create a class that implements the `org.apache.flink.metrics.Gauge` interface.
+There is no restriction on the type of the returned value.
+You can register a gauge by calling `gauge(String name, Gauge gauge)` on a MetricGroup.
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction<String, Integer> {
+  private int valueToExpose;
+
+  @Override
+  public void open(Configuration config) {
+    // register the gauge
+    getRuntimeContext().getMetricGroup().gauge("MyGauge", new Gauge<Integer>() {
+      @Override
+      public Integer getValue() {
+        return valueToExpose;
+      }
+    });
+    ...
+  }
+  ...
+}
+
+{% endhighlight %}
+
+#### Histogram
+
+A `Histogram` measures the distribution of long values.
+You can register one by calling `histogram(String name, Histogram histogram)` on a MetricGroup.
+
+{% highlight java %}
+public class MyMapper extends RichMapFunction<Long, Integer> {
+  private Histogram histogram;
+
+  @Override
+  public void open(Configuration config) {
+    // create and register a histogram
+    this.histogram = getRuntimeContext().getMetricGroup().histogram("myHistogram", new MyHistogram());
+    ...
+  }
+
+  @Override
+  public Integer map(Long value) throws Exception {
+    // record the incoming value in the histogram
+    this.histogram.update(value);
+    ...
+  }
+}
+{% endhighlight %}
+
+Flink only provides an interface for histograms, but offers a wrapper that allows the use of Codahale/DropWizard histograms (`org.apache.flink.dropwizard.metrics.DropWizardHistogramWrapper`).
+This wrapper is contained in the `flink-metrics-dropwizard` module.
+
+### Scope
+
+Every registered metric has an automatically assigned scope which 
represents the entities it is tied to. By default a metric that is registered 
in a user function will be scoped to the operator in which the function runs, 
the task/job it belongs to and the taskManager/host it is executed on. This is 
referred to as the "system scope".
+
+You can define an additional "user scope" by calling either `MetricGroup#addGroup(String name)` or `MetricGroup#addGroup(int name)`.
+
+{% highlight java %}
+
+counter = 
getRuntimeContext().getMetricGroup().addGroup("MyMetrics").counter("myCounter");
+
+{% endhighlight %}
+
+The name under which a metric is exported is based on both scopes and the name passed in the `counter()` call. The order is always \<system-scope\>\<user-scope\>\<name\>.
+
+The system scope allows the 

[GitHub] flink pull request #2158: [FLINK-4116] Metrics documentation

2016-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2158#discussion_r68981861
  
--- Diff: docs/apis/common/index.md ---
@@ -1350,3 +1350,397 @@ You may specify program arguments before the job is 
executed. The plan visualiza
 the execution plan before executing the Flink job.
 
 {% top %}
+
+Metrics
+---
+
+Flink exposes a metric system that allows gathering and exposing metrics 
to external systems.
+
+### Registering metrics
+
+You can access the metric system from any user function that extends 
[RichFunction]({{ site.baseurl }}/apis/common/index.html#rich-functions) by 
calling `getRuntimeContext().getMetricGroup()`.
+This method returns a `MetricGroup` object on which you can create and 
register new metrics.
+
+### Metric types
+
+Flink supports `Counters`, `Gauges` and `Histograms`.
+
+ Counter
+
+A `Counter` is used to count something. The current value can be in- or 
decremented using `inc()/inc(long n)` or `dec()/dec(long n)`.
+You can create and register a `Counter` by calling `counter(String name)` 
on a MetricGroup.
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction {
+  private Counter counter;
+
+  @Override
+  public void open(Configuration config) {
+// create and register a counter
+this.counter = 
getRuntimeContext().getMetricGroup().counter("myCounter");
+...
+  }
+
+  @public Integer map(String value) throws Exception {
+// increment counter
+this.counter.inc();
+...
+  }
+}
+
+{% endhighlight %}
+
+Alternatively you can also use your own `Counter` implementation:
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction {
+  ...
+
+  @Override
+  public void open(Configuration config) {
+// register a custom counter
+this.counter = 
getRuntimeContext().getmetricGroup().counter("myCustomCounter", new 
CustomCounter());
+...
+  }
+  ...
+}
+
+{% endhighlight %}
+
+ Gauge
+
+A `Gauge` provides a value of any type on demand. In order to use a 
`Gauge` you must first create a class that implements the 
`org.apache.flink.metrics.Gauge` interface.
+There is not restriction for the type of the returned value.
+You can register a gauge by calling `gauge(String name, Gauge gauge)` on a 
MetricGroup.
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction {
+  private int valueToExpose;
+
+  @Override
+  public void open(Configuration config) {
+    // register the gauge
+    getRuntimeContext().getMetricGroup().gauge("MyGauge", new Gauge<Integer>() {
+      @Override
+      public Integer getValue() {
+        return valueToExpose;
+      }
+    });
+    ...
+  }
+  ...
+}
+
+{% endhighlight %}
+
+#### Histogram
+
+A `Histogram` measures the distribution of long values.
+You can register one by calling `histogram(String name, Histogram 
histogram)` on a `MetricGroup`.
+
+{% highlight java %}
+public class MyMapper extends RichMapFunction<Long, Integer> {
+  private Histogram histogram;
+
+  @Override
+  public void open(Configuration config) {
+    // create and register a histogram
+    this.histogram = getRuntimeContext().getMetricGroup()
+      .histogram("myHistogram", new MyHistogram());
+    ...
+  }
+
+  @Override
+  public Integer map(Long value) throws Exception {
+    this.histogram.update(value);
+    ...
+  }
+}
+{% endhighlight %}
+
+Flink only provides an interface for histograms, but offers a wrapper 
(`org.apache.flink.dropwizard.metrics.DropWizardHistogramWrapper`) that allows 
the use of Codahale/DropWizard histograms.
+This wrapper is contained in the `flink-metrics-dropwizard` module.
+
+### Scope
+
+Every registered metric has an automatically assigned scope which 
represents the entities it is tied to. By default a metric that is registered 
in a user function will be scoped to the operator in which the function runs, 
the task/job it belongs to and the taskManager/host it is executed on. This is 
referred to as the "system scope".
+
+You can define an additional "user scope" by calling either 
`MetricGroup#addGroup(String name)` or `MetricGroup#addGroup(int name)`.
+
+{% highlight java %}
+
+counter = 
getRuntimeContext().getMetricGroup().addGroup("MyMetrics").counter("myCounter");
+
+{% endhighlight %}
+
+The name under which a metric is exported is based on both scopes and the 
name passed in the `counter()` call. The order is always 
\\\.
+
+The system scope allows the 

[GitHub] flink pull request #2158: [FLINK-4116] Metrics documentation

2016-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2158#discussion_r68981838
  
--- Diff: docs/apis/common/index.md ---
@@ -1350,3 +1350,397 @@ You may specify program arguments before the job is 
executed. The plan visualiza
 the execution plan before executing the Flink job.
 
 {% top %}
+
+Metrics
+---
+
+Flink exposes a metric system that allows gathering and exposing metrics 
to external systems.
+
+### Registering metrics
+
+You can access the metric system from any user function that extends 
[RichFunction]({{ site.baseurl }}/apis/common/index.html#rich-functions) by 
calling `getRuntimeContext().getMetricGroup()`.
+This method returns a `MetricGroup` object on which you can create and 
register new metrics.
+
+### Metric types
+
+Flink supports `Counters`, `Gauges` and `Histograms`.
+
+#### Counter
+
+A `Counter` is used to count something. The current value can be incremented 
or decremented using `inc()`/`inc(long n)` or `dec()`/`dec(long n)`.
+You can create and register a `Counter` by calling `counter(String name)` 
on a `MetricGroup`.
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction<String, Integer> {
+  private Counter counter;
+
+  @Override
+  public void open(Configuration config) {
+    // create and register a counter
+    this.counter = getRuntimeContext().getMetricGroup().counter("myCounter");
+    ...
+  }
+
+  @Override
+  public Integer map(String value) throws Exception {
+    // increment counter
+    this.counter.inc();
+    ...
+  }
+}
+
+{% endhighlight %}
+
+Alternatively you can also use your own `Counter` implementation:
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction {
+  ...
+
+  @Override
+  public void open(Configuration config) {
+    // register a custom counter
+    this.counter = getRuntimeContext().getMetricGroup()
+      .counter("myCustomCounter", new CustomCounter());
+    ...
+  }
+  ...
+}
+
+{% endhighlight %}
+
+#### Gauge
+
+A `Gauge` provides a value of any type on demand. In order to use a 
`Gauge` you must first create a class that implements the 
`org.apache.flink.metrics.Gauge` interface.
+There is no restriction on the type of the returned value.
+You can register a gauge by calling `gauge(String name, Gauge gauge)` on a 
`MetricGroup`.
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction {
+  private int valueToExpose;
+
+  @Override
+  public void open(Configuration config) {
+    // register the gauge
+    getRuntimeContext().getMetricGroup().gauge("MyGauge", new Gauge<Integer>() {
+      @Override
+      public Integer getValue() {
+        return valueToExpose;
+      }
+    });
+    ...
+  }
+  ...
+}
+
+{% endhighlight %}
+
+#### Histogram
+
+A `Histogram` measures the distribution of long values.
+You can register one by calling `histogram(String name, Histogram 
histogram)` on a `MetricGroup`.
+
+{% highlight java %}
+public class MyMapper extends RichMapFunction<Long, Integer> {
+  private Histogram histogram;
+
+  @Override
+  public void open(Configuration config) {
+    // create and register a histogram
+    this.histogram = getRuntimeContext().getMetricGroup()
+      .histogram("myHistogram", new MyHistogram());
+    ...
+  }
+
+  @Override
+  public Integer map(Long value) throws Exception {
+    this.histogram.update(value);
+    ...
+  }
+}
+{% endhighlight %}
+
+Flink only provides an interface for histograms, but offers a wrapper 
(`org.apache.flink.dropwizard.metrics.DropWizardHistogramWrapper`) that allows 
the use of Codahale/DropWizard histograms.
+This wrapper is contained in the `flink-metrics-dropwizard` module.
+
+### Scope
+
+Every registered metric has an automatically assigned scope which 
represents the entities it is tied to. By default a metric that is registered 
in a user function will be scoped to the operator in which the function runs, 
the task/job it belongs to and the taskManager/host it is executed on. This is 
referred to as the "system scope".
+
+You can define an additional "user scope" by calling either 
`MetricGroup#addGroup(String name)` or `MetricGroup#addGroup(int name)`.
+
+{% highlight java %}
+
+counter = 
getRuntimeContext().getMetricGroup().addGroup("MyMetrics").counter("myCounter");
+
+{% endhighlight %}
+
+The name under which a metric is exported is based on both scopes and the 
name passed in the `counter()` call. The order is always 
\\\.
+
+The system scope allows the 

[GitHub] flink pull request #2146: [FLINK-1550/FLINK-4057] Add JobManager Metrics

2016-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2146#discussion_r68974303
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java
 ---
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.jobmanager;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.HeapStateStore;
+import org.apache.flink.runtime.checkpoint.SavepointStore;
+import 
org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.instance.InstanceManager;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import 
org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import 
org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingMessages;
+import org.apache.flink.runtime.testingUtils.TestingTaskManager;
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.Int;
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.metrics.MetricRegistry.KEY_METRICS_SCOPE_NAMING_JM_JOB;
+import static org.junit.Assert.assertEquals;
+
+public class JobManagerMetricTest {
+
+   private static ActorSystem system;
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   @BeforeClass
+   public static void setup() {
+   system = AkkaUtils.createLocalActorSystem(new Configuration());
+   }
+
+   @AfterClass
+   public static void teardown() {
+   JavaTestKit.shutdownActorSystem(system);
+   }

[GitHub] flink pull request #2146: [FLINK-1550/FLINK-4057] Add JobManager Metrics

2016-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2146#discussion_r68973636
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java
 ---
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.jobmanager;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.HeapStateStore;
+import org.apache.flink.runtime.checkpoint.SavepointStore;
+import 
org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.instance.InstanceManager;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import 
org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import 
org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingMessages;
+import org.apache.flink.runtime.testingUtils.TestingTaskManager;
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.Int;
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.metrics.MetricRegistry.KEY_METRICS_SCOPE_NAMING_JM_JOB;
+import static org.junit.Assert.assertEquals;
+
+public class JobManagerMetricTest {
+
+   private static ActorSystem system;
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   @BeforeClass
+   public static void setup() {
+   system = AkkaUtils.createLocalActorSystem(new Configuration());
+   }
+
+   @AfterClass
+   public static void teardown() {
+   JavaTestKit.shutdownActorSystem(system);
+   }

[GitHub] flink pull request #2182: [Flink-4130] CallGenerator could generate illegal ...

2016-06-29 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2182#discussion_r68972966
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CallGenerator.scala
 ---
@@ -43,11 +43,16 @@ object CallGenerator {
 val nullTerm = newName("isNull")
 val resultTypeTerm = primitiveTypeTermForTypeInfo(returnType)
 val defaultValue = primitiveDefaultValue(returnType)
+val nullCheckTerms = if(operands.size > 0) {
+  operands.map(_.nullTerm).mkString(" || ")
+} else {
+  nullCheck + ""
+}
 
 val resultCode = if (nullCheck) {
--- End diff --

oh yes, it was a mis-thought, thanks~
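
For readers following the thread, here is a minimal Java sketch (not taken from the pull request) of why the empty-operand case in the diff above matters. It assumes the joined null terms are spliced into a generated assignment; the class name `NullCheckTermsSketch`, the helper `nullCheckTerms`, and the sample term names are hypothetical and only mirror the Scala expression quoted in the diff.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class NullCheckTermsSketch {

    // Hypothetical helper mirroring the Scala expression in the diff:
    // join the operands' null terms with " || ", or fall back to the
    // textual value of nullCheck when there are no operands.
    static String nullCheckTerms(List<String> operandNullTerms, boolean nullCheck) {
        if (!operandNullTerms.isEmpty()) {
            return String.join(" || ", operandNullTerms);
        }
        return String.valueOf(nullCheck);
    }

    public static void main(String[] args) {
        // Joining zero operands yields an empty string. Spliced into an
        // assignment (assumed shape: "boolean isNull = <terms>;"), that
        // would leave an expression-less statement that does not compile.
        String unguarded = String.join(" || ", Collections.<String>emptyList());
        System.out.println("unguarded: [" + unguarded + "]");

        // With the guard, both cases produce a usable boolean expression.
        System.out.println(nullCheckTerms(Arrays.asList("isNull$1", "isNull$2"), true));
        System.out.println(nullCheckTerms(Collections.<String>emptyList(), true));
    }
}
```

The fallback keeps the generated expression non-empty without changing the behavior when operands are present.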



