[jira] [Comment Edited] (FLINK-4848) keystoreFilePath should be checked against null in SSLUtils#createSSLServerContext

2016-11-04 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582930#comment-15582930
 ] 

Ted Yu edited comment on FLINK-4848 at 11/5/16 4:41 AM:


There is a similar issue with trustStoreFilePath:
{code}
trustStoreFile = new FileInputStream(new File(trustStoreFilePath));
{code}


was (Author: yuzhih...@gmail.com):
There is similar issue with trustStoreFilePath.

> keystoreFilePath should be checked against null in 
> SSLUtils#createSSLServerContext
> --
>
> Key: FLINK-4848
> URL: https://issues.apache.org/jira/browse/FLINK-4848
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   String keystoreFilePath = sslConfig.getString(
> ConfigConstants.SECURITY_SSL_KEYSTORE,
> null);
> ...
>   try {
> keyStoreFile = new FileInputStream(new File(keystoreFilePath));
> {code}
> If keystoreFilePath is null, the File constructor throws a NullPointerException.
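
The guard requested here could look roughly like the following. This is a minimal sketch, not the actual SSLUtils code: the class name StorePathCheck, the method openStore and the config key passed in by the caller are hypothetical and only serve the illustration. The same check would apply to trustStoreFilePath.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;

/** Hypothetical helper for illustration; not part of Flink's SSLUtils. */
final class StorePathCheck {

    private StorePathCheck() {}

    /**
     * Opens a key store or trust store file. Fails with a descriptive message
     * instead of a bare NullPointerException when the path was never configured.
     */
    static InputStream openStore(String path, String configKey) throws FileNotFoundException {
        if (path == null) {
            // new File(null) would throw a NullPointerException without context
            throw new IllegalArgumentException(
                "SSL is enabled but '" + configKey + "' is not configured");
        }
        return new FileInputStream(new File(path));
    }
}
```

Whether a missing path should raise an exception or fall back to a default is a design decision the issue leaves open; the sketch only shows the fail-fast variant.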



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-4938) Unnecessary update() call for Views to be removed in ViewUpdater#run()

2016-11-04 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu resolved FLINK-4938.
---
Resolution: Not A Problem

> Unnecessary update() call for Views to be removed in ViewUpdater#run()
> --
>
> Key: FLINK-4938
> URL: https://issues.apache.org/jira/browse/FLINK-4938
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   for (View toUpdate : this.views) {
> toUpdate.update();
>   }
>   synchronized (lock) {
> views.addAll(toAdd);
> toAdd.clear();
> views.removeAll(toRemove);
> {code}
> For Views in toRemove, the update() call is not needed. See the earlier javadoc:
> {code}
>* Notifies this ViewUpdater of a metric that should no longer be regularly 
> updated.
> {code}





[jira] [Commented] (FLINK-5017) Introduce WatermarkStatus stream element to allow for temporarily idle streaming sources

2016-11-04 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15638540#comment-15638540
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-5017:


If we want to extend {{Watermark}}, to aid with readability, I think it's also 
perfectly fine to have a separate {{WatermarkStatus}} enumeration that 
operators can use.

> Introduce WatermarkStatus stream element to allow for temporarily idle 
> streaming sources
> 
>
> Key: FLINK-5017
> URL: https://issues.apache.org/jira/browse/FLINK-5017
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.2.0
>
>
> A {{WatermarkStatus}} element informs receiving operators whether or not they 
> should continue to expect watermarks from the sending operator. There are 2 
> kinds of status, namely {{IDLE}} and {{ACTIVE}}. Watermark status elements 
> are generated at the sources, and may be propagated through the operators of 
> the topology using {{Output#emitWatermarkStatus(WatermarkStatus)}}.
> Sources and downstream operators should emit either of the status elements 
> once it changes between "watermark-idle" and "watermark-active" states.
> A source is considered "watermark-idle" if it will not emit records for an 
> indefinite amount of time. This is the case, for example, for Flink's Kafka 
> Consumer, where sources might initially have no assigned partitions to read 
> from, or no records can be read from the assigned partitions. Once the source 
> detects that it will resume emitting data, it is considered 
> "watermark-active".
> Downstream operators with multiple inputs (e.g. head operators of a 
> {{OneInputStreamTask}} or {{TwoInputStreamTask}}) should not wait for 
> watermarks from an upstream operator that is "watermark-idle" when deciding 
> whether or not to advance the operator's current watermark. When a downstream 
> operator determines that all upstream operators are "watermark-idle" (i.e. 
> when all input channels have received the watermark idle status element), 
> then the operator is considered to also be "watermark-idle", as it will 
> temporarily be unable to advance its own watermark. This is always the case 
> for operators that only read from a single upstream operator. Once an 
> operator is considered "watermark-idle", it should itself forward its idle 
> status to inform downstream operators. The operator is considered to be back 
> to "watermark-active" as soon as at least one of its upstream operators 
> becomes "watermark-active" again (i.e. when at least one input channel 
> receives the watermark active status element), and should also forward its 
> active status to inform downstream operators.
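
The propagation rule described above (idle only when all inputs are idle, active again as soon as one input is active) can be sketched as a small per-operator tracker. This is an illustrative sketch only: InputStatusTracker and its methods are hypothetical names, not the classes proposed in the issue, and all channels are assumed to start out active.

```java
/** Hypothetical sketch of the status bookkeeping; names are not Flink API. */
final class InputStatusTracker {

    enum WatermarkStatus { IDLE, ACTIVE }

    private final boolean[] channelIdle; // one flag per input channel
    private int idleChannels;            // number of flags currently set

    InputStatusTracker(int numChannels) {
        if (numChannels <= 0) {
            throw new IllegalArgumentException("need at least one input channel");
        }
        this.channelIdle = new boolean[numChannels]; // assumed: all channels start active
    }

    /**
     * Records a status element received on one channel. Returns the status the
     * operator should forward downstream if its own status changed, else null.
     */
    WatermarkStatus onStatus(int channel, WatermarkStatus status) {
        boolean wasIdle = isIdle();
        boolean nowIdle = status == WatermarkStatus.IDLE;
        if (channelIdle[channel] != nowIdle) {
            channelIdle[channel] = nowIdle;
            idleChannels += nowIdle ? 1 : -1;
        }
        boolean idle = isIdle();
        if (idle == wasIdle) {
            return null; // no change for this operator, nothing to forward
        }
        return idle ? WatermarkStatus.IDLE : WatermarkStatus.ACTIVE;
    }

    /** The operator is idle only when every input channel is idle. */
    boolean isIdle() {
        return idleChannels == channelIdle.length;
    }
}
```

With a single input channel the tracker simply mirrors the upstream status, which matches the remark that single-input operators always follow their upstream operator.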





[jira] [Comment Edited] (FLINK-5017) Introduce WatermarkStatus stream element to allow for temporarily idle streaming sources

2016-11-04 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15638540#comment-15638540
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-5017 at 11/5/16 4:09 AM:
-

If we want to extend {{Watermark}}, to aid with readability a bit, I think it's 
also fine to have a separate {{WatermarkStatus}} enumeration that operators can 
use.


was (Author: tzulitai):
If we want to extend {{Watermark}}, to aid with readability, I think it's also 
perfectly fine to have a separate {{WatermarkStatus}} enumeration that 
operators can use.

> Introduce WatermarkStatus stream element to allow for temporarily idle 
> streaming sources
> 
>
> Key: FLINK-5017
> URL: https://issues.apache.org/jira/browse/FLINK-5017
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.2.0
>
>
> A {{WatermarkStatus}} element informs receiving operators whether or not they 
> should continue to expect watermarks from the sending operator. There are 2 
> kinds of status, namely {{IDLE}} and {{ACTIVE}}. Watermark status elements 
> are generated at the sources, and may be propagated through the operators of 
> the topology using {{Output#emitWatermarkStatus(WatermarkStatus)}}.
> Sources and downstream operators should emit either of the status elements 
> once it changes between "watermark-idle" and "watermark-active" states.
> A source is considered "watermark-idle" if it will not emit records for an 
> indefinite amount of time. This is the case, for example, for Flink's Kafka 
> Consumer, where sources might initially have no assigned partitions to read 
> from, or no records can be read from the assigned partitions. Once the source 
> detects that it will resume emitting data, it is considered 
> "watermark-active".
> Downstream operators with multiple inputs (e.g. head operators of a 
> {{OneInputStreamTask}} or {{TwoInputStreamTask}}) should not wait for 
> watermarks from an upstream operator that is "watermark-idle" when deciding 
> whether or not to advance the operator's current watermark. When a downstream 
> operator determines that all upstream operators are "watermark-idle" (i.e. 
> when all input channels have received the watermark idle status element), 
> then the operator is considered to also be "watermark-idle", as it will 
> temporarily be unable to advance its own watermark. This is always the case 
> for operators that only read from a single upstream operator. Once an 
> operator is considered "watermark-idle", it should itself forward its idle 
> status to inform downstream operators. The operator is considered to be back 
> to "watermark-active" as soon as at least one of its upstream operators 
> becomes "watermark-active" again (i.e. when at least one input channel 
> receives the watermark active status element), and should also forward its 
> active status to inform downstream operators.





[jira] [Commented] (FLINK-4282) Add Offset Parameter to WindowAssigners

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15638443#comment-15638443
 ] 

ASF GitHub Bot commented on FLINK-4282:
---

Github user soniclavier commented on a diff in the pull request:

https://github.com/apache/flink/pull/2355#discussion_r86656836
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
 ---
@@ -236,6 +236,6 @@ public int compare(TimeWindow o1, TimeWindow o2) {
 * @return window start
 */
public static long getWindowStartWithOffset(long timestamp, long 
offset, long windowSize) {
--- End diff --

Is windowSize actually the slide size? I see that the method is called with 
the slide instead of the size: `TimeWindow.getWindowStartWithOffset(timestamp, 
offset, slide);`


> Add Offset Parameter to WindowAssigners
> ---
>
> Key: FLINK-4282
> URL: https://issues.apache.org/jira/browse/FLINK-4282
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
> Fix For: 1.2.0
>
>
> Currently, windows are always aligned to EPOCH, which basically means days 
> are aligned with GMT. This is somewhat problematic for people living in 
> different timezones.
> An offset parameter would allow adapting the window assigner to the timezone.
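
To make the question about size versus slide concrete, here is a sketch of offset-aligned window arithmetic. The formula is a common way to compute such a start and is assumed here rather than quoted from the pull request; the class WindowStartSketch and the helper slidingWindows are hypothetical. The sketch also assumes that timestamp - offset + windowSize is non-negative.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch; the arithmetic is an assumption, not quoted from the PR. */
final class WindowStartSketch {

    private WindowStartSketch() {}

    /** Start of the window of the given step size that contains the timestamp. */
    static long windowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    /**
     * Sliding windows containing the timestamp, each as {start, end}. The
     * latest start is aligned to the slide, which is why a caller would pass
     * the slide rather than the window size as the third argument.
     */
    static List<long[]> slidingWindows(long timestamp, long offset, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = windowStartWithOffset(timestamp, offset, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }
}
```

For a tumbling window the step between window starts equals the window size, while for a sliding window it equals the slide, so the parameter arguably describes the step between starts in both cases.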





[GitHub] flink pull request #2355: [FLINK-4282]Add Offset Parameter to WindowAssigner...

2016-11-04 Thread soniclavier
Github user soniclavier commented on a diff in the pull request:

https://github.com/apache/flink/pull/2355#discussion_r86656836
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
 ---
@@ -236,6 +236,6 @@ public int compare(TimeWindow o1, TimeWindow o2) {
 * @return window start
 */
public static long getWindowStartWithOffset(long timestamp, long 
offset, long windowSize) {
--- End diff --

Is windowSize actually the slide size? I see that the method is called with 
the slide instead of the size: `TimeWindow.getWindowStartWithOffset(timestamp, 
offset, slide);`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15638048#comment-15638048
 ] 

ASF GitHub Bot commented on FLINK-4469:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r86649434
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
 ---
@@ -152,21 +153,40 @@ abstract class TableEnvironment(val config: 
TableConfig) {
   protected def getBuiltInRuleSet: RuleSet
 
   /**
-* Registers a [[UserDefinedFunction]] under a unique name. Replaces 
already existing
+* Registers a [[ScalarFunction]] under a unique name. Replaces already 
existing
 * user-defined functions under this name.
 */
-  def registerFunction(name: String, function: UserDefinedFunction): Unit 
= {
-function match {
-  case sf: ScalarFunction =>
-// register in Table API
-functionCatalog.registerFunction(name, function.getClass)
+  def registerFunction(name: String, function: ScalarFunction): Unit = {
+// register in Table API
+functionCatalog.registerFunction(name, function.getClass)
 
-// register in SQL API
-functionCatalog.registerSqlFunction(sf.getSqlFunction(name, 
typeFactory))
+// register in SQL API
+functionCatalog.registerSqlFunction(function.getSqlFunction(name, 
typeFactory))
--- End diff --

Here, in "function.getSqlFunction(name, typeFactory)", the "name" is user 
input, but in ScalarFunctionCall I see "scalarFunction.getSqlFunction(scalarFunction.toString, 
typeFactory)", which may not be what we expect. Right?


> Add support for user defined table function in Table API & SQL
> --
>
> Key: FLINK-4469
> URL: https://issues.apache.org/jira/browse/FLINK-4469
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> Normal user-defined functions, such as concat(), take in a single input row 
> and output a single output row. In contrast, table-generating functions 
> transform a single input row to multiple output rows. This is very useful in 
> some cases, such as looking up rows in HBase by rowkey and returning one or more rows.
> A user-defined table function should:
> 1. inherit from the UDTF class with a specific generic type T
> 2. define one or more eval methods.
> NOTE: 
> 1. the eval method must be public and non-static.
> 2. the generic type T is the row type returned by table function. Because of 
> Java type erasure, we can’t extract T from the Iterable.
> 3. use {{collect(T)}} to emit table row
> 4. the eval method can be overloaded. Blink will choose the best-matching eval 
> method to call according to the parameter types and their number.
> {code}
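> public class Word {
>   public String word;
>   public Integer length;
>   // no-arg constructor keeps Word usable as a POJO
>   public Word() {}
>   // constructor used by the table function below
>   public Word(String word, Integer length) {
>     this.word = word;
>     this.length = length;
>   }
> }
> public class SplitStringUDTF extends UDTF<Word> {
>   // rows are emitted through collect(T), so eval itself returns nothing
>   public void eval(String str) {
>     if (str != null) {
>       for (String s : str.split(",")) {
>         collect(new Word(s, s.length()));
>       }
>     }
>   }
> }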
> // in SQL
> tableEnv.registerFunction("split", new SplitStringUDTF())
> tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS 
> t(w,l)")
> // in Java Table API
> tableEnv.registerFunction("split", new SplitStringUDTF())
> // rename split table columns to “w” and “l”
> table.crossApply("split(c) as (w, l)")
>  .select("a, b, w, l")
> // without renaming, we will use the original field names in the POJO/case/...
> table.crossApply("split(c)")
>  .select("a, b, word, length")
> // in Scala Table API
> val split = new SplitStringUDTF()
> table.crossApply(split('c) as ('w, 'l))
>  .select('a, 'b, 'w, 'l)
> // outerApply for outer join to a UDTF
> table.outerApply(split('c))
>  .select('a, 'b, 'word, 'length)
> {code}
> See [1] for more information about UDTF design.
> [1] 
> https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit#
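
The difference between crossApply and outerApply in the examples above can be illustrated without any Flink dependency. The sketch below assumes the usual CROSS APPLY / OUTER APPLY semantics: an input row for which the table function emits nothing is dropped by the cross variant and kept, padded with null, by the outer variant. ApplySketch, apply and split are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch of cross/outer apply semantics; not the Table API. */
final class ApplySketch {

    private ApplySketch() {}

    /** Stand-in table function: splits a comma-separated string into rows. */
    static List<String> split(String s) {
        if (s == null || s.isEmpty()) {
            return Collections.emptyList(); // no output rows for this input
        }
        return Arrays.asList(s.split(","));
    }

    /**
     * Joins every input row with the rows its table function call emits.
     * With outer == true, inputs without output are kept and padded with null.
     */
    static <I, O> List<Object[]> apply(
            List<I> input, Function<I, List<O>> tableFunction, boolean outer) {
        List<Object[]> result = new ArrayList<>();
        for (I row : input) {
            List<O> produced = tableFunction.apply(row);
            if (produced.isEmpty()) {
                if (outer) {
                    result.add(new Object[] {row, null});
                }
                continue;
            }
            for (O out : produced) {
                result.add(new Object[] {row, out});
            }
        }
        return result;
    }
}
```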





[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-04 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r86649434
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
 ---
@@ -152,21 +153,40 @@ abstract class TableEnvironment(val config: 
TableConfig) {
   protected def getBuiltInRuleSet: RuleSet
 
   /**
-* Registers a [[UserDefinedFunction]] under a unique name. Replaces 
already existing
+* Registers a [[ScalarFunction]] under a unique name. Replaces already 
existing
 * user-defined functions under this name.
 */
-  def registerFunction(name: String, function: UserDefinedFunction): Unit 
= {
-function match {
-  case sf: ScalarFunction =>
-// register in Table API
-functionCatalog.registerFunction(name, function.getClass)
+  def registerFunction(name: String, function: ScalarFunction): Unit = {
+// register in Table API
+functionCatalog.registerFunction(name, function.getClass)
 
-// register in SQL API
-functionCatalog.registerSqlFunction(sf.getSqlFunction(name, 
typeFactory))
+// register in SQL API
+functionCatalog.registerSqlFunction(function.getSqlFunction(name, 
typeFactory))
--- End diff --

Here, in "function.getSqlFunction(name, typeFactory)", the "name" is user 
input, but in ScalarFunctionCall I see "scalarFunction.getSqlFunction(scalarFunction.toString, 
typeFactory)", which may not be what we expect. Right?




[GitHub] flink pull request #2724: [FLINK-4221] Show metrics in WebFrontend + general...

2016-11-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2724




[jira] [Commented] (FLINK-4221) Show metrics in WebFrontend

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15637665#comment-15637665
 ] 

ASF GitHub Bot commented on FLINK-4221:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2724


> Show metrics in WebFrontend
> ---
>
> Key: FLINK-4221
> URL: https://issues.apache.org/jira/browse/FLINK-4221
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics, Webfrontend
>Affects Versions: 1.0.0
>Reporter: Chesnay Schepler
>Assignee: Robert Metzger
> Fix For: pre-apache, 1.2.0
>
>






[jira] [Resolved] (FLINK-4221) Show metrics in WebFrontend

2016-11-04 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger resolved FLINK-4221.
---
   Resolution: Fixed
Fix Version/s: 1.2.0

Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/3a4fc537

> Show metrics in WebFrontend
> ---
>
> Key: FLINK-4221
> URL: https://issues.apache.org/jira/browse/FLINK-4221
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics, Webfrontend
>Affects Versions: 1.0.0
>Reporter: Chesnay Schepler
>Assignee: Robert Metzger
> Fix For: 1.2.0, pre-apache
>
>






[jira] [Commented] (FLINK-4221) Show metrics in WebFrontend

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15637449#comment-15637449
 ] 

ASF GitHub Bot commented on FLINK-4221:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2724
  
I'll merge the change ...


> Show metrics in WebFrontend
> ---
>
> Key: FLINK-4221
> URL: https://issues.apache.org/jira/browse/FLINK-4221
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics, Webfrontend
>Affects Versions: 1.0.0
>Reporter: Chesnay Schepler
>Assignee: Robert Metzger
> Fix For: pre-apache
>
>






[GitHub] flink issue #2724: [FLINK-4221] Show metrics in WebFrontend + general improv...

2016-11-04 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2724
  
I'll merge the change ...




[jira] [Closed] (FLINK-4998) ResourceManager fails when num task slots > Yarn vcores

2016-11-04 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels closed FLINK-4998.
-
   Resolution: Fixed
Fix Version/s: 1.1.4

master: e4807621b8f41fc4f9fa69f423f1fbf7bba05218
release-1.1: fe2c4ba6a8dbbe5bde75c1dd816ae5d2004910e0

> ResourceManager fails when num task slots > Yarn vcores
> ---
>
> Key: FLINK-4998
> URL: https://issues.apache.org/jira/browse/FLINK-4998
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager, YARN Client
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.2.0, 1.1.4
>
>
> The ResourceManager fails to acquire containers when the user configures the 
> number of task slots to be greater than the maximum number of virtual cores 
> of the Yarn cluster.
> We should check during deployment that the task slots are not configured to 
> be larger than the virtual cores.
> {noformat}
> 2016-11-02 14:39:01,948 ERROR org.apache.flink.yarn.YarnFlinkResourceManager  
>   - FATAL ERROR IN YARN APPLICATION MASTER: Connection to YARN 
> Resource Manager failed
> org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException: Invalid 
> resource request, requested virtual cores < 0, or requested virtual cores > 
> max configured, requestedVirtualCores=3, maxVirtualCores=1
> {noformat}
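
A deployment-time check of the kind proposed above could be sketched as follows. The names SlotCheckSketch and checkSlotsAgainstVcores are hypothetical, and how the maximum number of virtual cores is obtained from the YARN cluster is deliberately left out.

```java
/** Hypothetical sketch of a fail-fast check; not the code of the actual fix. */
final class SlotCheckSketch {

    private SlotCheckSketch() {}

    /**
     * Rejects a configuration that would request more virtual cores per
     * container than the cluster allows, before any container is requested.
     */
    static void checkSlotsAgainstVcores(int slotsPerTaskManager, int maxVcores) {
        if (slotsPerTaskManager > maxVcores) {
            throw new IllegalArgumentException(
                "Configured task slots (" + slotsPerTaskManager
                    + ") exceed the maximum virtual cores (" + maxVcores
                    + ") available per container");
        }
    }
}
```

With the values from the log above (3 requested, 1 allowed) the check fails during deployment instead of surfacing later as an InvalidResourceRequestException in the application master.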





[jira] [Resolved] (FLINK-3813) YARNSessionFIFOITCase.testDetachedMode failed on Travis

2016-11-04 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels resolved FLINK-3813.
---
   Resolution: Fixed
Fix Version/s: 1.1.4
   1.2.0

master: e4807621b8f41fc4f9fa69f423f1fbf7bba05218
release-1.1: 42334de84a85aa98432bde6dd817f5060e7ce036

> YARNSessionFIFOITCase.testDetachedMode failed on Travis
> ---
>
> Key: FLINK-3813
> URL: https://issues.apache.org/jira/browse/FLINK-3813
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.2.0
>Reporter: Till Rohrmann
>Assignee: Maximilian Michels
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.2.0, 1.1.4
>
>
> The {{YARNSessionFIFOITCase.testDetachedMode}} failed on Travis.
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/125560038/log.txt





[jira] [Commented] (FLINK-3813) YARNSessionFIFOITCase.testDetachedMode failed on Travis

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15637371#comment-15637371
 ] 

ASF GitHub Bot commented on FLINK-3813:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2749


> YARNSessionFIFOITCase.testDetachedMode failed on Travis
> ---
>
> Key: FLINK-3813
> URL: https://issues.apache.org/jira/browse/FLINK-3813
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.2.0
>Reporter: Till Rohrmann
>Assignee: Maximilian Michels
>Priority: Critical
>  Labels: test-stability
>
> The {{YARNSessionFIFOITCase.testDetachedMode}} failed on Travis.
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/125560038/log.txt





[jira] [Commented] (FLINK-4998) ResourceManager fails when num task slots > Yarn vcores

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15637370#comment-15637370
 ] 

ASF GitHub Bot commented on FLINK-4998:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2741


> ResourceManager fails when num task slots > Yarn vcores
> ---
>
> Key: FLINK-4998
> URL: https://issues.apache.org/jira/browse/FLINK-4998
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager, YARN Client
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.2.0
>
>
> The ResourceManager fails to acquire containers when the user configures the 
> number of task slots to be greater than the maximum number of virtual cores 
> of the Yarn cluster.
> We should check during deployment that the task slots are not configured to 
> be larger than the virtual cores.
> {noformat}
> 2016-11-02 14:39:01,948 ERROR org.apache.flink.yarn.YarnFlinkResourceManager  
>   - FATAL ERROR IN YARN APPLICATION MASTER: Connection to YARN 
> Resource Manager failed
> org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException: Invalid 
> resource request, requested virtual cores < 0, or requested virtual cores > 
> max configured, requestedVirtualCores=3, maxVirtualCores=1
> {noformat}





[GitHub] flink pull request #2741: [FLINK-4998][yarn] fail if too many task slots are...

2016-11-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2741




[GitHub] flink pull request #2749: [FLINK-3813][yarn] wait for CLI to complete before...

2016-11-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2749




[jira] [Commented] (FLINK-5022) Suppress RejectedExecutionException when the Executor is shut down

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15637302#comment-15637302
 ] 

ASF GitHub Bot commented on FLINK-5022:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2757
  
Looks good, +1


> Suppress RejectedExecutionException when the Executor is shut down
> --
>
> Key: FLINK-5022
> URL: https://issues.apache.org/jira/browse/FLINK-5022
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
> Fix For: 1.2.0
>
>
> The {{JobManager}} shuts down its {{ExecutionService}} when the actor is 
> stopped. This causes the problem that the execution service which is used to 
> run future callbacks is finished before the actor has completely terminated. 
> Due to this there might be rpc answers which cause another {{Runnable}} to be 
> submitted to the {{ExecutionService}}. This causes 
> {{RejectedExecutionExceptions}} to be thrown. 
> The underlying problem is that the {{JobManager}} should not stop the 
> {{ExecutionService}} before the actor has been terminated. This will be 
> changed in Flip-6 anyway. In the meantime to suppress these exceptions, it 
> will be checked whether the {{ExecutionService}} has been shut down and only 
> if not, exceptions will be logged.
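
The interim behaviour described above (log a rejection only if the executor has not been shut down) can be sketched with the standard java.util.concurrent API. QuietSubmit and submitQuietly are hypothetical names, and logging to System.err stands in for whatever logger the real code uses.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

/** Hypothetical sketch; not the code of the actual pull request. */
final class QuietSubmit {

    private QuietSubmit() {}

    /**
     * Hands the task to the executor. Returns true if it was accepted and
     * false if it was rejected; a rejection is only logged when the executor
     * is still running, because rejections during shutdown are expected.
     */
    static boolean submitQuietly(ExecutorService executor, Runnable task) {
        try {
            executor.execute(task);
            return true;
        } catch (RejectedExecutionException e) {
            if (!executor.isShutdown()) {
                // unexpected: a running executor refused the task
                System.err.println("Task rejected by a running executor: " + e);
            }
            return false;
        }
    }
}
```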



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2757: [FLINK-5022] Suppress RejectedExecutionExceptions if the ...

2016-11-04 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2757
  
Looks good, +1




[jira] [Commented] (FLINK-5012) Provide Timestamp in TimelyFlatMapFunction

2016-11-04 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15637246#comment-15637246
 ] 

Stephan Ewen commented on FLINK-5012:
-

I just wanted the simple way of outputting an element to be there.

If the {{Collector}} would be an {{OutputCollector}} (or so) with two methods: 
{{collect()}} and {{collectWithTimestamp()}}, would that work?

> Provide Timestamp in TimelyFlatMapFunction
> --
>
> Key: FLINK-5012
> URL: https://issues.apache.org/jira/browse/FLINK-5012
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>
> Right now, {{TimelyFlatMapFunction}} does not give the timestamp of the 
> element in {{flatMap()}}.
> The signature is currently this:
> {code}
> void flatMap(I value, TimerService timerService, Collector<O> out) throws 
> Exception;
> {code}
> if we add the timestamp it would become this:
> {code}
> void flatMap(I value, Long timestamp, TimerService timerService, Collector<O> 
> out) throws Exception;
> {code}
> The reason why it's a {{Long}} and not a {{long}} is that an element might 
> not have a timestamp, in that case we should hand in {{null}} here.
> This is becoming quite long, so we could add a {{Context}} parameter that 
> provides access to the timestamp and timer service.
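
The {{Context}} variant suggested at the end of the description might look like the sketch below. All types here are hypothetical stand-ins so that the example compiles on its own: TimerServiceSketch and CollectorSketch replace Flink's TimerService and Collector, and the method names on Context are assumptions.

```java
/** Hypothetical stand-in for a timer service. */
interface TimerServiceSketch {
    void registerEventTimeTimer(long time);
}

/** Hypothetical stand-in for a collector. */
interface CollectorSketch<O> {
    void collect(O record);
}

/** Hypothetical shape of the function once a Context parameter is introduced. */
interface TimelyFlatMapSketch<I, O> {

    /** Bundles per-element information so the flatMap signature stays short. */
    interface Context {
        /** Timestamp of the current element, or null if it has none. */
        Long timestamp();

        TimerServiceSketch timerService();
    }

    void flatMap(I value, Context ctx, CollectorSketch<O> out) throws Exception;
}
```

A benefit of this shape is that further per-element information can be added to Context later without changing the flatMap signature again.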





[GitHub] flink pull request #2759: [FLINK-5020] Make the GenericWriteAheadSink rescal...

2016-11-04 Thread kl0u
GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/2759

[FLINK-5020] Make the GenericWriteAheadSink rescalable.

As the issue suggests, this is the final commit to make the 
GenericWriteAheadSink re-scalable.
In essence, it replaces the old snapshot()/restore() cycle with the new 
snapshotState()/initializeState() and adds tests to show that it works as 
expected.

R: @zentol 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink rescaling_wr_ahead

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2759.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2759


commit 1cdfff0f56a77616f36addfa1cb48b2efe2c9626
Author: kl0u 
Date:   2016-10-26T15:19:12Z

[FLINK-4939] GenericWriteAheadSink: Decouple the creating from the 
committing subtask for a pending checkpoint

So far the GenericWriteAheadSink expected that
the subtask that wrote a temporary buffer to the
state backend, will be also the one to commit it to
the third-party storage system.

This commit removes this assumption. To do this
it changes the CheckpointCommitter to dynamically
take the subtaskIdx as a parameter when asking
if a checkpoint was committed and also changes the
state kept by the GenericWriteAheadSink to also
include that subtask index of the subtask that wrote
the pending buffer.

commit 77c3892687f78780c19710a95f8830907fe67c86
Author: kl0u 
Date:   2016-11-03T11:28:37Z

Integrated PR comments

commit c8627568c375cdd37ac0a8314fc5bf56077a578e
Author: kl0u 
Date:   2016-11-04T15:05:19Z

Removes redundant safety check.

commit 2b9c28059e84ba09068ed0350680730f77119253
Author: kl0u 
Date:   2016-11-03T20:46:58Z

[FLINK-5020] Make the GenericWriteAheadSink rescalable.






[jira] [Commented] (FLINK-5020) Make the GenericWriteAheadSink rescalable.

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15637222#comment-15637222
 ] 

ASF GitHub Bot commented on FLINK-5020:
---

GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/2759

[FLINK-5020] Make the GenericWriteAheadSink rescalable.

As the issue suggests, this is the final commit that makes the 
GenericWriteAheadSink rescalable.
In essence, it replaces the old snapshot()/restore() cycle with the new 
snapshotState()/initializeState() methods and adds tests to show that it 
works as expected.

R: @zentol 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink rescaling_wr_ahead

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2759.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2759


commit 1cdfff0f56a77616f36addfa1cb48b2efe2c9626
Author: kl0u 
Date:   2016-10-26T15:19:12Z

[FLINK-4939] GenericWriteAheadSink: Decouple the creating from the 
committing subtask for a pending checkpoint

So far the GenericWriteAheadSink expected that
the subtask that wrote a temporary buffer to the
state backend would also be the one to commit it to
the third-party storage system.

This commit removes this assumption. To do this,
it changes the CheckpointCommitter to dynamically
take the subtaskIdx as a parameter when asking
if a checkpoint was committed, and also changes the
state kept by the GenericWriteAheadSink to
include the index of the subtask that wrote
the pending buffer.

commit 77c3892687f78780c19710a95f8830907fe67c86
Author: kl0u 
Date:   2016-11-03T11:28:37Z

Integrated PR comments

commit c8627568c375cdd37ac0a8314fc5bf56077a578e
Author: kl0u 
Date:   2016-11-04T15:05:19Z

Removes redundant safety check.

commit 2b9c28059e84ba09068ed0350680730f77119253
Author: kl0u 
Date:   2016-11-03T20:46:58Z

[FLINK-5020] Make the GenericWriteAheadSink rescalable.




> Make the GenericWriteAheadSink rescalable.
> --
>
> Key: FLINK-5020
> URL: https://issues.apache.org/jira/browse/FLINK-5020
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> This targets integrating the GenericWriteAheadSink with the new rescalable 
> state abstractions so that the parallelism of the operator can change 
> arbitrarily without jeopardizing the guarantees offered by it.





[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636993#comment-15636993
 ] 

ASF GitHub Bot commented on FLINK-4964:
---

Github user tfournier314 commented on the issue:

https://github.com/apache/flink/pull/2740
  
I've changed my code so that I now have mapping: DataSet[(String, Long)]

val mapping = input
  .mapWith( s => (s, 1) )
  .groupBy( 0 )
  .reduce( (a, b) => (a._1, a._2 + b._2) )
  .partitionByRange( 1 )
  .zipWithIndex
  .mapWith { case (id, (label, count)) => (label, id) }

When parsing a new DataSet[String] called rawInput, I'd like to use this mapping 
to associate each "label" of rawInput with an ID (the Long value in 
mapping).

Is this possible with a streaming approach (using a join, for example)?
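
As a rough illustration of the lookup being asked about, here is a minimal sketch written with plain Java collections rather than the Flink DataSet API. The names `StringIndexerSketch`, `fit` and `transform` are hypothetical. Ordering labels by descending frequency (index 0 for the most frequent label) is an assumption borrowed from Spark's StringIndexer, and mapping unseen labels to -1 is purely illustrative.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch, not part of FlinkML.
final class StringIndexerSketch {

    /** Counts each label and assigns ids by descending frequency (ties broken by label). */
    static Map<String, Long> fit(List<String> input) {
        Map<String, Long> counts = input.stream()
            .collect(Collectors.groupingBy(s -> s, Collectors.counting()));

        List<Map.Entry<String, Long>> entries = new ArrayList<>(counts.entrySet());
        entries.sort(
            Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder())
                .thenComparing(Map.Entry.<String, Long>comparingByKey()));

        Map<String, Long> mapping = new LinkedHashMap<>();
        long id = 0;
        for (Map.Entry<String, Long> e : entries) {
            mapping.put(e.getKey(), id++);
        }
        return mapping;
    }

    /** Looks up the id of every raw label; unseen labels get -1 (an illustrative choice). */
    static List<Long> transform(List<String> rawInput, Map<String, Long> mapping) {
        return rawInput.stream()
            .map(label -> mapping.getOrDefault(label, -1L))
            .collect(Collectors.toList());
    }
}
```

In a DataSet program, the `transform` step would roughly correspond to joining rawInput with mapping on the label, or to broadcasting the mapping to every task if it is small enough.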



> FlinkML - Add StringIndexer
> ---
>
> Key: FLINK-4964
> URL: https://issues.apache.org/jira/browse/FLINK-4964
> Project: Flink
>  Issue Type: New Feature
>Reporter: Thomas FOURNIER
>Priority: Minor
>
> Add StringIndexer as described here:
> http://spark.apache.org/docs/latest/ml-features.html#stringindexer
> This will be added in package preprocessing of FlinkML





[GitHub] flink issue #2740: [FLINK-4964] [ml]

2016-11-04 Thread tfournier314
Github user tfournier314 commented on the issue:

https://github.com/apache/flink/pull/2740
  
I've changed my code so that I now have mapping: DataSet[(String, Long)]

val mapping = input
  .mapWith( s => (s, 1) )
  .groupBy( 0 )
  .reduce( (a, b) => (a._1, a._2 + b._2) )
  .partitionByRange( 1 )
  .zipWithIndex
  .mapWith { case (id, (label, count)) => (label, id) }

When parsing a new DataSet[String] called rawInput, I'd like to use this mapping 
to associate each "label" of rawInput with an ID (the Long value in 
mapping).

Is this possible with a streaming approach (using a join, for example)?





[jira] [Closed] (FLINK-3395) Polishing the web UI

2016-11-04 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan closed FLINK-3395.
-
Resolution: Later

> Polishing the web UI
> 
>
> Key: FLINK-3395
> URL: https://issues.apache.org/jira/browse/FLINK-3395
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>
> On the job properties page one must select an operator from the plan. 
> Elsewhere in the UI a list of operators is displayed and clicking the table 
> or the plan will reveal the requested information.
> A list of operators could likewise be added to the timeline page.
> The job exceptions page should display a "No exceptions" notification as done 
> elsewhere for when there is nothing to display.
> The job plan is not redrawn when the browser window is resized.





[jira] [Commented] (FLINK-3767) Show timeline also for running tasks, not only for finished ones

2016-11-04 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636907#comment-15636907
 ] 

Greg Hogan commented on FLINK-3767:
---

I'm running 1.2-SNAPSHOT and this looks to be implemented. [~rmetzger] can you 
verify?

> Show timeline also for running tasks, not only for finished ones
> 
>
> Key: FLINK-3767
> URL: https://issues.apache.org/jira/browse/FLINK-3767
> Project: Flink
>  Issue Type: Sub-task
>  Components: Webfrontend
>Reporter: Robert Metzger
>






[GitHub] flink issue #2758: support specifying an ESCAPE character in LIKE and SIMILA...

2016-11-04 Thread miaoever
Github user miaoever commented on the issue:

https://github.com/apache/flink/pull/2758
  
@twalthr @wuchong Would you please review my pr to flink, thx :)




[jira] [Updated] (FLINK-4260) Allow SQL's LIKE ESCAPE

2016-11-04 Thread Leo Deng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leo Deng updated FLINK-4260:

External issue URL: https://github.com/apache/flink/pull/2758

> Allow SQL's LIKE ESCAPE
> ---
>
> Key: FLINK-4260
> URL: https://issues.apache.org/jira/browse/FLINK-4260
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Timo Walther
>Assignee: Leo Deng
>Priority: Minor
>
> Currently, the SQL API does not support specifying an ESCAPE character in a 
> LIKE expression. SIMILAR TO should support this as well.
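
To make the requested behaviour concrete, the following is a minimal sketch of how a LIKE pattern with an ESCAPE character can be translated into a regular expression. It is not the code from the pull request: `LikeEscapeSketch`, `toRegex` and `like` are hypothetical names. The sketch is also more lenient than the SQL standard, because it lets the escape character precede any character, not only `%`, `_` or the escape character itself.

```java
import java.util.regex.Pattern;

// Hypothetical sketch, not the implementation in the Flink Table API.
final class LikeEscapeSketch {

    /** Translates a SQL LIKE pattern into a regex, honouring the given escape character. */
    static Pattern toRegex(String likePattern, char escape) {
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < likePattern.length(); i++) {
            char c = likePattern.charAt(i);
            if (c == escape) {
                if (i + 1 >= likePattern.length()) {
                    throw new IllegalArgumentException("Escape character at end of pattern");
                }
                // The character after the escape is matched literally.
                regex.append(Pattern.quote(String.valueOf(likePattern.charAt(++i))));
            } else if (c == '%') {
                regex.append(".*"); // any sequence of characters
            } else if (c == '_') {
                regex.append('.');  // exactly one character
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL);
    }

    static boolean like(String value, String likePattern, char escape) {
        return toRegex(likePattern, escape).matcher(value).matches();
    }
}
```

With `!` as the escape character, the pattern `50!%` matches only the literal string `50%`, whereas the unescaped pattern `50%` matches any string starting with `50`.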





[GitHub] flink pull request #2758: support specifying an ESCAPE character in LIKE and...

2016-11-04 Thread miaoever
GitHub user miaoever opened a pull request:

https://github.com/apache/flink/pull/2758

support specifying an ESCAPE character in LIKE and SIMILAR TO express…

…ions.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/miaoever/flink FLINK_4260

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2758.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2758


commit 86fab780909334c0696e6384fe7c30bd8297ca7b
Author: miaoever 
Date:   2016-11-04T15:31:00Z

support specifying an ESCAPE character in LIKE and SIMILAR TO expressions.






[jira] [Comment Edited] (FLINK-5017) Introduce WatermarkStatus stream element to allow for temporarily idle streaming sources

2016-11-04 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636743#comment-15636743
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-5017 at 11/4/16 3:48 PM:
-

I had considered this, by defining the values "-1" and "-2" as special timestamp 
values that represent idle / active watermark status.
However, I wasn't sure that occupying "-1" and "-2" was a good idea, or 
whether it would interfere with other parts of the system or affect any future 
extensions. Considering only the purpose stated in this ticket, it should be 
possible with a bit of finesse. The only concern is that it might hurt code 
readability: we would basically need to check whether watermarks are one of 
these two special values in every place where we work with watermarks.

I think a separate {{WatermarkStatus}} class has another readability advantage: 
we can simply keep a {{currentWatermarkStatus}} variable in operators that is 
set to the last emitted {{WatermarkStatus}}, which naturally reflects whether 
the operator is currently watermark-idle or watermark-active.

Either way, let me know what you think :) I'm not against extending 
{{Watermark}}, so I'm still open to going for that approach if you think the 
new methods are excessive.


was (Author: tzulitai):
I had considered this, by defining the values "-1" and "-2" as special timestamp 
values that represent idle / active watermark status.
However, I wasn't sure that occupying "-1" and "-2" was a good idea, or 
whether it would interfere with other parts of the system or affect any future 
extensions. Considering only the purpose stated here, it should be possible 
with a bit of finesse. The only concern is that it might hurt code 
readability: we would basically need to check whether watermarks are one of 
these two special values in every place where we work with watermarks.

I think a separate {{WatermarkStatus}} class has another readability advantage 
in that we can simply keep a {{currentWatermarkStatus}} variable in operators 
that is set to the last emitted {{WatermarkStatus}}, which naturally reflects 
whether the operator is currently watermark-idle or watermark-active.

Either way, let me know what you think :) I'm not against extending 
{{Watermark}}, so I'm still open to going for that approach if you think the 
new methods are excessive.

> Introduce WatermarkStatus stream element to allow for temporarily idle 
> streaming sources
> 
>
> Key: FLINK-5017
> URL: https://issues.apache.org/jira/browse/FLINK-5017
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.2.0
>
>
> A {{WatermarkStatus}} element informs receiving operators whether or not they 
> should continue to expect watermarks from the sending operator. There are 2 
> kinds of status, namely {{IDLE}} and {{ACTIVE}}. Watermark status elements 
> are generated at the sources, and may be propagated through the operators of 
> the topology using {{Output#emitWatermarkStatus(WatermarkStatus)}}.
> Sources and downstream operators should emit the corresponding status element 
> whenever they change between the "watermark-idle" and "watermark-active" states.
> A source is considered "watermark-idle" if it will not emit records for an 
> indefinite amount of time. This is the case, for example, for Flink's Kafka 
> Consumer, where sources might initially have no assigned partitions to read 
> from, or no records can be read from the assigned partitions. Once the source 
> detects that it will resume emitting data, it is considered 
> "watermark-active".
> Downstream operators with multiple inputs (e.g. head operators of a 
> {{OneInputStreamTask}} or {{TwoInputStreamTask}}) should not wait for 
> watermarks from an upstream operator that is "watermark-idle" when deciding 
> whether or not to advance the operator's current watermark. When a downstream 
> operator determines that all upstream operators are "watermark-idle" (i.e. 
> when all input channels have received the watermark idle status element), 
> then the operator is considered to also be "watermark-idle", as it will 
> temporarily be unable to advance its own watermark. This is always the case 
> for operators that only read from a single upstream operator. Once an 
> operator is considered "watermark-idle", it should itself forward its idle 
> status to inform downstream operators. The operator is considered to be back 
> to "watermark-active" as soon as at least one of its upstream operators 
> becomes "watermark-active" again (i.e. when at least one input channel 
> receives the watermark active status 

[jira] [Commented] (FLINK-5017) Introduce WatermarkStatus stream element to allow for temporarily idle streaming sources

2016-11-04 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636743#comment-15636743
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-5017:


I had considered this, by defining the values "-1" and "-2" as special timestamp 
values that represent idle / active watermark status.
However, I wasn't sure that occupying "-1" and "-2" was a good idea, or 
whether it would interfere with other parts of the system or affect any future 
extensions. Considering only the purpose stated here, it should be possible 
with a bit of finesse. The only concern is that it might hurt code 
readability: we would basically need to check whether watermarks are one of 
these two special values in every place where we work with them.

I think a separate {{WatermarkStatus}} class has another readability advantage 
in that we can simply keep a {{currentWatermarkStatus}} variable in operators 
that is set to the last emitted {{WatermarkStatus}}, which naturally reflects 
whether the operator is currently watermark-idle or watermark-active.

Either way, let me know what you think :) I'm not against extending 
{{Watermark}}, so I'm still open to going for that approach if you think the 
new methods are excessive.

> Introduce WatermarkStatus stream element to allow for temporarily idle 
> streaming sources
> 
>
> Key: FLINK-5017
> URL: https://issues.apache.org/jira/browse/FLINK-5017
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.2.0
>
>
> A {{WatermarkStatus}} element informs receiving operators whether or not they 
> should continue to expect watermarks from the sending operator. There are 2 
> kinds of status, namely {{IDLE}} and {{ACTIVE}}. Watermark status elements 
> are generated at the sources, and may be propagated through the operators of 
> the topology using {{Output#emitWatermarkStatus(WatermarkStatus)}}.
> Sources and downstream operators should emit the corresponding status element 
> whenever they change between the "watermark-idle" and "watermark-active" states.
> A source is considered "watermark-idle" if it will not emit records for an 
> indefinite amount of time. This is the case, for example, for Flink's Kafka 
> Consumer, where sources might initially have no assigned partitions to read 
> from, or no records can be read from the assigned partitions. Once the source 
> detects that it will resume emitting data, it is considered 
> "watermark-active".
> Downstream operators with multiple inputs (e.g. head operators of a 
> {{OneInputStreamTask}} or {{TwoInputStreamTask}}) should not wait for 
> watermarks from an upstream operator that is "watermark-idle" when deciding 
> whether or not to advance the operator's current watermark. When a downstream 
> operator determines that all upstream operators are "watermark-idle" (i.e. 
> when all input channels have received the watermark idle status element), 
> then the operator is considered to also be "watermark-idle", as it will 
> temporarily be unable to advance its own watermark. This is always the case 
> for operators that only read from a single upstream operator. Once an 
> operator is considered "watermark-idle", it should itself forward its idle 
> status to inform downstream operators. The operator is considered to be back 
> to "watermark-active" as soon as at least one of its upstream operators 
> becomes "watermark-active" again (i.e. when at least one input channel 
> receives the watermark active status element), and should also forward its 
> active status to inform downstream operators.
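
The propagation rule described in this ticket can be sketched as a small state tracker: an operator is idle only while all of its input channels are idle, and it forwards a status element only when its own status changes. This is a minimal sketch under that reading, not Flink code; `WatermarkStatusTracker`, its `Status` enum and `onStatus` are hypothetical names.

```java
import java.util.Optional;

// Hypothetical sketch of the idle/active bookkeeping for one operator.
final class WatermarkStatusTracker {

    enum Status { IDLE, ACTIVE }

    private final boolean[] channelIdle;    // last status seen per input channel
    private Status current = Status.ACTIVE; // operators start out active

    WatermarkStatusTracker(int numInputChannels) {
        this.channelIdle = new boolean[numInputChannels];
    }

    /**
     * Records a status element received on one input channel.
     * Returns the status to forward downstream if the operator's own status
     * changed, or an empty Optional if nothing needs to be emitted.
     */
    Optional<Status> onStatus(int channel, Status status) {
        channelIdle[channel] = (status == Status.IDLE);

        boolean allIdle = true;
        for (boolean idle : channelIdle) {
            allIdle &= idle;
        }

        Status next = allIdle ? Status.IDLE : Status.ACTIVE;
        if (next == current) {
            return Optional.empty();
        }
        current = next;
        return Optional.of(next);
    }

    Status current() {
        return current;
    }
}
```

For an operator with a single input channel, every status change on that channel is forwarded, which matches the remark above that single-input operators always follow their upstream operator.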





[GitHub] flink pull request #2757: [FLINK-5022] Suppress RejectedExecutionExceptions ...

2016-11-04 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/2757

[FLINK-5022] Suppress RejectedExecutionExceptions if the ExecutorService 
has been shut down

This PR suppresses RejectedExecutionExceptions that occur after an 
ExecutorService has been shut down. This currently works only for 
ExecutorServices. All other exceptions are logged.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
fixRejectedExecutionException

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2757.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2757


commit e400578da826cb5b2d7cb0b4b0518df1b2284248
Author: Till Rohrmann 
Date:   2016-11-04T10:16:47Z

[FLINK-5022] Suppress RejectedExecutionExceptions if the ExecutorService 
has been shut down

This PR suppresses RejectedExecutionExceptions that occur after an 
ExecutorService has been shut down. This currently works only for 
ExecutorServices. All other exceptions are logged.






[jira] [Created] (FLINK-5021) Make the ContinuousFileReaderOperator rescalable.

2016-11-04 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-5021:
-

 Summary: Make the ContinuousFileReaderOperator rescalable.
 Key: FLINK-5021
 URL: https://issues.apache.org/jira/browse/FLINK-5021
 Project: Flink
  Issue Type: Improvement
  Components: filesystem-connector
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.2.0


This targets integrating the ContinuousFileReaderOperator with the new 
rescalable state abstractions, so that the operator can change parallelism 
without jeopardizing the guarantees offered by it.





[jira] [Commented] (FLINK-5022) Suppress RejectedExecutionException when the Executor is shut down

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636693#comment-15636693
 ] 

ASF GitHub Bot commented on FLINK-5022:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/2757

[FLINK-5022] Suppress RejectedExecutionExceptions if the ExecutorService 
has been shut down

This PR suppresses RejectedExecutionExceptions that occur after an 
ExecutorService has been shut down. This currently works only for 
ExecutorServices. All other exceptions are logged.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
fixRejectedExecutionException

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2757.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2757


commit e400578da826cb5b2d7cb0b4b0518df1b2284248
Author: Till Rohrmann 
Date:   2016-11-04T10:16:47Z

[FLINK-5022] Suppress RejectedExecutionExceptions if the ExecutorService 
has been shut down

This PR suppresses RejectedExecutionExceptions that occur after an 
ExecutorService has been shut down. This currently works only for 
ExecutorServices. All other exceptions are logged.




> Suppress RejectedExecutionException when the Executor is shut down
> --
>
> Key: FLINK-5022
> URL: https://issues.apache.org/jira/browse/FLINK-5022
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
> Fix For: 1.2.0
>
>
> The {{JobManager}} shuts down its {{ExecutionService}} when the actor is 
> stopped. As a result, the execution service, which is used to run future 
> callbacks, is shut down before the actor has completely terminated. RPC 
> answers that arrive in between may cause another {{Runnable}} to be submitted 
> to the {{ExecutionService}}, which then throws 
> {{RejectedExecutionExceptions}}. 
> The underlying problem is that the {{JobManager}} should not stop the 
> {{ExecutionService}} before the actor has terminated. This will be 
> changed in Flip-6 anyway. In the meantime, to suppress these exceptions, we 
> check whether the {{ExecutionService}} has been shut down and log exceptions 
> only if it has not.





[jira] [Created] (FLINK-5022) Suppress RejectedExecutionException when the Executor is shut down

2016-11-04 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-5022:


 Summary: Suppress RejectedExecutionException when the Executor is 
shut down
 Key: FLINK-5022
 URL: https://issues.apache.org/jira/browse/FLINK-5022
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.2.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
Priority: Minor
 Fix For: 1.2.0


The {{JobManager}} shuts down its {{ExecutionService}} when the actor is 
stopped. As a result, the execution service, which is used to run future 
callbacks, is shut down before the actor has completely terminated. RPC 
answers that arrive in between may cause another {{Runnable}} to be submitted 
to the {{ExecutionService}}, which then throws 
{{RejectedExecutionExceptions}}. 

The underlying problem is that the {{JobManager}} should not stop the 
{{ExecutionService}} before the actor has terminated. This will be changed 
in Flip-6 anyway. In the meantime, to suppress these exceptions, we check 
whether the {{ExecutionService}} has been shut down and log exceptions only 
if it has not.
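
The interim workaround described above can be sketched with the standard `java.util.concurrent` API. This is a minimal illustration, not the code from the pull request: `RejectionSuppressor` and `tryExecute` are hypothetical names, and the `logged` list is only a stand-in for a real logger.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical sketch of "log the rejection only if the executor is still running".
final class RejectionSuppressor {

    /**
     * Submits the task and returns true if it was accepted.
     * A rejection is recorded in {@code logged} only when the executor has NOT
     * been shut down; rejections caused by a shutdown are silently dropped.
     */
    static boolean tryExecute(ExecutorService executor, Runnable task, List<Throwable> logged) {
        try {
            executor.execute(task);
            return true;
        } catch (RejectedExecutionException e) {
            if (!executor.isShutdown()) {
                logged.add(e); // stand-in for a real logger call
            }
            return false;
        }
    }
}
```

The check relies on `ExecutorService#isShutdown()`, which is why an approach like this is limited to `ExecutorService` instances and does not apply to a plain `Executor`.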





[jira] [Commented] (FLINK-4939) GenericWriteAheadSink: Decouple the creating from the committing subtask for a pending checkpoint

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636684#comment-15636684
 ] 

ASF GitHub Bot commented on FLINK-4939:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2707
  
Looks good, +1 from my side.


> GenericWriteAheadSink: Decouple the creating from the committing subtask for 
> a pending checkpoint
> -
>
> Key: FLINK-4939
> URL: https://issues.apache.org/jira/browse/FLINK-4939
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> So far the GenericWriteAheadSink expected that
> the subtask that wrote a pending checkpoint to the 
> state backend would also be the one to commit it to
> the third-party storage system.
> This issue aims to remove this assumption. To do this, 
> the CheckpointCommitter has to be able to dynamically
> take the subtaskIdx as a parameter when asking 
> if a checkpoint was committed, and the
> state kept by the GenericWriteAheadSink has to 
> include the index of the subtask that wrote 
> the pending checkpoint.
> This change is also necessary for making the operator rescalable.
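
The decoupling described above can be illustrated with a simplified, in-memory committer. This is a sketch under assumptions, not Flink's actual CheckpointCommitter: the classes `PendingCheckpoint` and `InMemoryCheckpointCommitter` and their method signatures are hypothetical, and treating a checkpoint as committed once an equal or higher id has been committed for the same subtask is an illustrative choice.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical: a pending checkpoint remembers which subtask wrote it.
final class PendingCheckpoint {
    final long checkpointId;
    final int subtaskIdx;

    PendingCheckpoint(long checkpointId, int subtaskIdx) {
        this.checkpointId = checkpointId;
        this.subtaskIdx = subtaskIdx;
    }
}

// Hypothetical committer that takes the subtask index per call instead of
// being bound to the subtask that created it.
final class InMemoryCheckpointCommitter {

    // subtask index -> highest checkpoint id committed for that subtask
    private final Map<Integer, Long> lastCommitted = new HashMap<>();

    void commitCheckpoint(int subtaskIdx, long checkpointId) {
        lastCommitted.merge(subtaskIdx, checkpointId, Long::max);
    }

    boolean isCheckpointCommitted(int subtaskIdx, long checkpointId) {
        return lastCommitted.getOrDefault(subtaskIdx, -1L) >= checkpointId;
    }
}
```

Because the writer's index travels with the pending checkpoint, whichever subtask restores it after a rescale can ask the committer about the original writer rather than about itself.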





[jira] [Updated] (FLINK-5020) Make the GenericWriteAheadSink rescalable.

2016-11-04 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas updated FLINK-5020:
--
Component/s: Cassandra Connector

> Make the GenericWriteAheadSink rescalable.
> --
>
> Key: FLINK-5020
> URL: https://issues.apache.org/jira/browse/FLINK-5020
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> This targets integrating the GenericWriteAheadSink with the new rescalable 
> state abstractions so that the parallelism of the operator can change 
> arbitrarily without jeopardizing the guarantees offered by it.





[GitHub] flink issue #2707: [FLINK-4939] GenericWriteAheadSink: Decouple the creating...

2016-11-04 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2707
  
Looks good, +1 from my side.




[jira] [Commented] (FLINK-4997) Extending Window Function Metadata

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636673#comment-15636673
 ] 

ASF GitHub Bot commented on FLINK-4997:
---

GitHub user VenturaDelMonte opened a pull request:

https://github.com/apache/flink/pull/2756

[FLINK-4997] Extending Window Function Metadata

This PR aims to introduce what was discussed in 
[FLIP-2](https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata).
 
WindowedStream apply methods have been overloaded to support 
ProcessWindowFunction (and its rich counterpart).
Streaming runtime internals have been modified to support the new 
function; full backward compatibility with WindowFunction (and its rich 
counterpart) is preserved by silently wrapping it in a ProcessWindowFunction.
This implementation strictly follows what was decided in the FLIP; nothing 
has been changed for AllWindowedStream.
The windows documentation has been overhauled to illustrate the 
ProcessWindowFunction API.
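
The wrapping mentioned above is an adapter: the old function keeps its signature, and the runtime only ever calls the new, context-based interface. The sketch below illustrates that idea with deliberately simplified, hypothetical interfaces (`SimpleWindowFunction`, `SimpleProcessWindowFunction`, `WindowFunctionAdapter`); they are not the Flink interfaces, whose signatures differ.

```java
import java.util.List;

// Hypothetical stand-in for the old-style function: window metadata is a plain argument.
interface SimpleWindowFunction<IN, OUT> {
    void apply(String key, long windowEnd, Iterable<IN> input, List<OUT> out);
}

// Hypothetical stand-in for the new-style function: metadata is read from a context object.
interface SimpleProcessWindowFunction<IN, OUT> {
    interface Context {
        long windowEnd();
    }

    void process(String key, Context context, Iterable<IN> input, List<OUT> out);
}

final class WindowFunctionAdapter {

    /** Wraps an old-style function so that callers only deal with the new interface. */
    static <IN, OUT> SimpleProcessWindowFunction<IN, OUT> wrap(SimpleWindowFunction<IN, OUT> legacy) {
        return (key, context, input, out) ->
            legacy.apply(key, context.windowEnd(), input, out);
    }
}
```

With an adapter like this, further metadata can later be added to the context without touching existing old-style functions.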

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/VenturaDelMonte/flink flip-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2756.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2756


commit 5a33fe2b9e5c23e8529964a489465f51410432df
Author: Ventura Del Monte 
Date:   2016-09-21T14:04:00Z

Merge remote-tracking branch 'apache/master'

commit 39a8bb9433edf5bb9adb850c667b71ef8d25c6b6
Author: Ventura Del Monte 
Date:   2016-09-22T08:31:58Z

Merge remote-tracking branch 'upstream/master'

commit 2507a71e2a40b05b3e5c7507a1c32d6678e07810
Author: Ventura Del Monte 
Date:   2016-09-22T09:40:40Z

Merge remote-tracking branch 'upstream/master'

commit 57d0bca5681e5ea0ba63f3b95fe4f949af3734de
Author: Ventura Del Monte 
Date:   2016-10-04T15:03:55Z

Merge remote-tracking branch 'upstream/master'

commit 9f55a1e3e56b48d9dc5a4d1b3109b41e1c89ce5d
Author: Ventura Del Monte 
Date:   2016-11-02T09:14:52Z

Merge remote-tracking branch 'upstream/master'

commit 9a71d092ad06c3592355ad11dfb7bd4b982ded9f
Author: Ventura Del Monte 
Date:   2016-11-03T20:44:56Z

[FLINK-4997] [streaming] Extending window function metadata introducing 
ProcessWindowFunction

commit 878983074d364ea0d340bd3497343f286d28e3db
Author: Ventura Del Monte 
Date:   2016-11-04T14:29:52Z

[FLINK-4997] [docs] improved windows documentation explaining the new 
ProcessWindowFunction API




> Extending Window Function Metadata
> --
>
> Key: FLINK-4997
> URL: https://issues.apache.org/jira/browse/FLINK-4997
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, Streaming, Windowing Operators
>Reporter: Ventura Del Monte
>Assignee: Ventura Del Monte
> Fix For: 1.2.0
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata





[GitHub] flink pull request #2756: [FLINK-4997] Extending Window Function Metadata

2016-11-04 Thread VenturaDelMonte
GitHub user VenturaDelMonte opened a pull request:

https://github.com/apache/flink/pull/2756

[FLINK-4997] Extending Window Function Metadata

This PR aims to introduce what was discussed in 
[FLIP-2](https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata).
 
WindowedStream apply methods have been overloaded to support 
ProcessWindowFunction (and its rich counterpart).
Streaming runtime internals have been modified to support the new 
function; however, full backward compatibility with WindowFunction (and its rich 
counterpart) is guaranteed by silently wrapping it in a ProcessWindowFunction.
The implementation strictly follows what was decided in the FLIP; nothing 
has been changed for AllWindowedStream.
The windows documentation has been overhauled to illustrate the 
ProcessWindowFunction API.
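
As a rough illustration of the wrapping idea described above, here is a minimal Java sketch. The interfaces `LegacyWindowFunction`, `ContextWindowFunction`, `WindowContext` and the wrapper are hypothetical, simplified stand-ins invented for this example; they are not Flink's actual classes or signatures.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the older function type: window metadata is passed as plain arguments.
interface LegacyWindowFunction<IN, OUT> {
    void apply(String key, long windowEnd, Iterable<IN> input, Consumer<OUT> out);
}

// Hypothetical stand-in for the newer function type: metadata travels in a context object,
// so further fields can be added later without changing the method signature.
interface ContextWindowFunction<IN, OUT> {
    void process(String key, WindowContext ctx, Iterable<IN> input, Consumer<OUT> out);
}

final class WindowContext {
    final long windowEnd;
    final long currentWatermark;

    WindowContext(long windowEnd, long currentWatermark) {
        this.windowEnd = windowEnd;
        this.currentWatermark = currentWatermark;
    }
}

// Adapter that lets the runtime deal only with the newer interface
// while existing user functions keep working unchanged.
final class LegacyWrapper<IN, OUT> implements ContextWindowFunction<IN, OUT> {
    private final LegacyWindowFunction<IN, OUT> wrapped;

    LegacyWrapper(LegacyWindowFunction<IN, OUT> wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public void process(String key, WindowContext ctx, Iterable<IN> input, Consumer<OUT> out) {
        // Only the metadata the legacy function knows about is forwarded.
        wrapped.apply(key, ctx.windowEnd, input, out);
    }
}

final class WrapDemo {
    static List<String> run() {
        LegacyWindowFunction<Integer, String> sum = (key, end, values, out) -> {
            int total = 0;
            for (int v : values) {
                total += v;
            }
            out.accept(key + "@" + end + "=" + total);
        };
        ContextWindowFunction<Integer, String> fn = new LegacyWrapper<>(sum);
        List<String> results = new ArrayList<>();
        fn.process("a", new WindowContext(1000L, 900L), Arrays.asList(1, 2, 3), results::add);
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The point of the adapter is that callers only ever see `ContextWindowFunction`, which is presumably why the wrapping can stay invisible to users of the older interface.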

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/VenturaDelMonte/flink flip-2




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-4800) Introduce the TimestampedFileInputSplit for Continuous File Processing

2016-11-04 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-4800.
-
Resolution: Fixed

> Introduce the TimestampedFileInputSplit for Continuous File Processing
> --
>
> Key: FLINK-4800
> URL: https://issues.apache.org/jira/browse/FLINK-4800
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Minor
>
> Introduce the TimestampedFileInputSplit, which extends the class 
> FileInputSplit and also contains:
> i) the modification time of the file the split belongs to, and
> ii) when checkpointing, the point the reader is currently reading from.
> The latter will be useful for rescaling. With this addition, the 
> ContinuousFileMonitoringFunction sends TimestampedFileInputSplits 
> to the Readers, and the Readers' state now contains only 
> TimestampedFileInputSplits.
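
The two additions listed in the description can be pictured with a small Java sketch. The class `TimestampedSplitSketch` and its field names are assumptions made for illustration, not the actual Flink class; the serialization round trip merely stands in for a checkpoint and restore.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical sketch of a split that carries the two extra pieces of information.
final class TimestampedSplitSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    final String path;
    final long start;
    final long length;
    // (i) modification time of the file this split belongs to
    final long modificationTime;
    // (ii) position the reader had reached at checkpoint time; -1 means "not started"
    private long resetOffset = -1L;

    TimestampedSplitSketch(String path, long start, long length, long modificationTime) {
        this.path = path;
        this.start = start;
        this.length = length;
        this.modificationTime = modificationTime;
    }

    void setResetOffset(long offset) {
        this.resetOffset = offset;
    }

    long getResetOffset() {
        return resetOffset;
    }

    // Simulates "checkpoint, then restore": because the split itself holds the
    // reader position, the reader state can consist of splits only.
    static TimestampedSplitSketch snapshotAndRestore(TimestampedSplitSketch split) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(split);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (TimestampedSplitSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("snapshot round trip failed", e);
        }
    }
}
```

A restored reader could then seek to `getResetOffset()` instead of re-reading the split from its start.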





[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-04 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r86540888
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala
 ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.functions
+
+import org.apache.calcite.sql.SqlFunction
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.table.{ValidationException, FlinkTypeFactory}
+
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Base class for a user-defined table function (UDTF). A user-defined 
table functions works on
+  * one row as input and returns multiple rows as output.
+  *
+  * The behavior of a [[TableFunction]] can be defined by implementing a 
custom evaluation
+  * method. An evaluation method must be declared publicly and named 
"eval". Evaluation methods
+  * can also be overloaded by implementing multiple methods named "eval".
+  *
+  * User-defined functions must have a default constructor and must be 
instantiable during runtime.
+  *
+  * By default the result type of an evaluation method is determined by 
Flink's type extraction
+  * facilities. This is sufficient for basic types or simple POJOs but 
might be wrong for more
+  * complex, custom, or composite types. In these cases 
[[TypeInformation]] of the result type
+  * can be manually defined by overriding [[getResultType()]].
+  *
+  * Internally, the Table/SQL API code generation works with primitive 
values as much as possible.
+  * If a user-defined table function should not introduce much overhead 
during runtime, it is
+  * recommended to declare parameters and result types as primitive types 
instead of their boxed
+  * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
+  *
+  * @tparam T The type of the output row
+  */
+abstract class TableFunction[T] extends UserDefinedFunction with 
EvaluableFunction {
--- End diff --

Remove "with EvaluableFunction" if EvaluableFunction's methods are moved to 
UserDefinedFunctionUtils.scala.




[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636644#comment-15636644
 ] 

ASF GitHub Bot commented on FLINK-4469:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r86542265
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -135,6 +138,32 @@ object UserDefinedFunctionUtils {
   }
 
   /**
+* Returns eval method matching the given signature of 
[[TypeInformation]].
+*/
+  def getEvalMethod(
+function: EvaluableFunction,
+signature: Seq[TypeInformation[_]])
+  : Option[Method] = {
+// We compare the raw Java classes not the TypeInformation.
+// TypeInformation does not matter during runtime (e.g. within a 
MapFunction).
+val actualSignature = typeInfoToClass(signature)
+
+function
+  .getEvalMethods
--- End diff --

Get the eval methods via 
UserDefinedFunctionUtils.checkAndExtractEvalMethods(clazz).


> Add support for user defined table function in Table API & SQL
> --
>
> Key: FLINK-4469
> URL: https://issues.apache.org/jira/browse/FLINK-4469
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> Normal user-defined functions, such as concat(), take in a single input row 
> and output a single output row. In contrast, table-generating functions 
> transform a single input row to multiple output rows. This is very useful in 
> some cases, such as looking up rows in HBase by rowkey and returning one or more rows.
> To add a user-defined table function, one should:
> 1. inherit from the UDTF class with a specific generic type T
> 2. define one or more eval functions. 
> NOTE: 
> 1. the eval method must be public and non-static.
> 2. the generic type T is the row type returned by the table function. Because of 
> Java type erasure, we can't extract T from the Iterable.
> 3. use {{collect(T)}} to emit a table row
> 4. the eval method can be overloaded. Blink will choose the best matching eval method 
> to call according to the number and types of the parameters.
> {code}
> public class Word {
>   public String word;
>   public Integer length;
> }
> public class SplitStringUDTF extends UDTF {
> public Iterable eval(String str) {
> if (str != null) {
> for (String s : str.split(",")) {
> collect(new Word(s, s.length()));
> }
> }
> }
> }
> // in SQL
> tableEnv.registerFunction("split", new SplitStringUDTF())
> tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS 
> t(w,l)")
> // in Java Table API
> tableEnv.registerFunction("split", new SplitStringUDTF())
> // rename split table columns to “w” and “l”
> table.crossApply("split(c) as (w, l)")
>  .select("a, b, w, l")
> // without renaming, we will use the origin field names in the POJO/case/...
> table.crossApply("split(c)")
>  .select("a, b, word, length")
> // in Scala Table API
> val split = new SplitStringUDTF()
> table.crossApply(split('c) as ('w, 'l))
>  .select('a, 'b, 'w, 'l)
> // outerApply for outer join to a UDTF
> table.outerApply(split('c))
>  .select('a, 'b, 'word, 'length)
> {code}
> See [1] for more information about UDTF design.
> [1] 
> https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit#
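
To make the one-row-in, many-rows-out contract from the description concrete, here is a self-contained Java sketch. `TableFunctionSketch`, `WordRow`, `SplitSketch` and `CrossApplyDemo` are hypothetical names for this illustration only; the real API is the one defined in the pull request. The buffer inside the base class mirrors the `ListBuffer` shown in the diff, and the cross-apply loop is an assumption about how such a function could be driven.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical base class: rows emitted via collect() are buffered until drained.
abstract class TableFunctionSketch<T> {
    private final List<T> rows = new ArrayList<>();

    protected void collect(T row) {
        rows.add(row);
    }

    // Hands the buffered rows to the caller and clears the buffer for the next input row.
    List<T> drain() {
        List<T> out = new ArrayList<>(rows);
        rows.clear();
        return out;
    }
}

final class WordRow {
    final String word;
    final int length;

    WordRow(String word, int length) {
        this.word = word;
        this.length = length;
    }
}

final class SplitSketch extends TableFunctionSketch<WordRow> {
    // Public, non-static evaluation method, as the notes in the issue require.
    public void eval(String str) {
        if (str != null) {
            for (String s : str.split(",")) {
                collect(new WordRow(s, s.length()));
            }
        }
    }
}

final class CrossApplyDemo {
    // Emulates a cross apply: every input value is paired with each row the function emits for it.
    static List<String> crossApply(List<String> inputs) {
        SplitSketch split = new SplitSketch();
        List<String> joined = new ArrayList<>();
        for (String input : inputs) {
            split.eval(input);
            for (WordRow row : split.drain()) {
                joined.add(input + "|" + row.word + "|" + row.length);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        System.out.println(crossApply(java.util.Arrays.asList("a,bb", "ccc")));
    }
}
```

In this sketch an input for which `eval` emits nothing simply disappears from the result, which is the inner-join behaviour; an outer apply would instead keep the input and pad the function's columns with nulls.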





[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636646#comment-15636646
 ] 

ASF GitHub Bot commented on FLINK-4469:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r86542743
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -162,24 +191,107 @@ object UserDefinedFunctionUtils {
   }
 
   /**
+* Internal method of [[ScalarFunction#getResultType()]] that does some 
pre-checking and uses
+* [[TypeExtractor]] as default return type inference.
+*/
+  def getResultType(
+tableFunction: TableFunction[_],
+signature: Array[Class[_]])
+  : TypeInformation[_] = {
+// find method for signature
+val evalMethod = tableFunction.getEvalMethods
+  .find(m => signature.sameElements(m.getParameterTypes))
+  .getOrElse(throw new ValidationException("Given signature is 
invalid."))
+
+val userDefinedTypeInfo = tableFunction.getResultType
+if (userDefinedTypeInfo != null) {
+  userDefinedTypeInfo
+} else {
+  try {
+TypeExtractor.getForClass(evalMethod.getReturnType)
+  } catch {
+case ite: InvalidTypesException =>
+  throw new ValidationException(
+s"Return type of table function '$this' cannot be " +
+  s"automatically determined. Please provide type information 
manually.")
+  }
+}
+  }
+
+  /**
 * Returns the return type of the evaluation method matching the given 
signature.
 */
   def getResultTypeClass(
-  scalarFunction: ScalarFunction,
+  function: EvaluableFunction,
--- End diff --

Change "function: EvaluableFunction" to "clazz: Class[_]".




[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636650#comment-15636650
 ] 

ASF GitHub Bot commented on FLINK-4469:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r86557493
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
 ---
@@ -152,21 +153,40 @@ abstract class TableEnvironment(val config: 
TableConfig) {
   protected def getBuiltInRuleSet: RuleSet
 
   /**
-* Registers a [[UserDefinedFunction]] under a unique name. Replaces 
already existing
+* Registers a [[ScalarFunction]] under a unique name. Replaces already 
existing
 * user-defined functions under this name.
 */
-  def registerFunction(name: String, function: UserDefinedFunction): Unit 
= {
-function match {
-  case sf: ScalarFunction =>
-// register in Table API
-functionCatalog.registerFunction(name, function.getClass)
+  def registerFunction(name: String, function: ScalarFunction): Unit = {
+// register in Table API
+functionCatalog.registerFunction(name, function.getClass)
 
-// register in SQL API
-functionCatalog.registerSqlFunction(sf.getSqlFunction(name, 
typeFactory))
+// register in SQL API
+functionCatalog.registerSqlFunction(function.getSqlFunction(name, 
typeFactory))
+  }
+
+  /**
+* Registers a [[TableFunction]] under a unique name. Replaces already 
existing
+* user-defined functions under this name.
+*/
+  private[flink] def registerTableFunctionInternal[T: TypeInformation](
+name: String, tf: TableFunction[T]): Unit = {
 
-  case _ =>
-throw new TableException("Unsupported user-defined function type.")
+val typeInfo: TypeInformation[_] = if (tf.getResultType != null) {
+  tf.getResultType
+} else {
+  implicitly[TypeInformation[T]]
 }
+
+val (fieldNames, fieldIndexes) = 
UserDefinedFunctionUtils.getFieldInfo(typeInfo)
--- End diff --

For the sake of simplicity, would it be possible to encapsulate this in a 
"createSqlFunctions" method in TableFunction?




[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636649#comment-15636649
 ] 

ASF GitHub Bot commented on FLINK-4469:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r86558899
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala
 ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.functions
+
+import org.apache.calcite.sql.SqlFunction
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.table.{ValidationException, FlinkTypeFactory}
+
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Base class for a user-defined table function (UDTF). A user-defined 
table functions works on
+  * one row as input and returns multiple rows as output.
+  *
+  * The behavior of a [[TableFunction]] can be defined by implementing a 
custom evaluation
+  * method. An evaluation method must be declared publicly and named 
"eval". Evaluation methods
+  * can also be overloaded by implementing multiple methods named "eval".
+  *
+  * User-defined functions must have a default constructor and must be 
instantiable during runtime.
+  *
+  * By default the result type of an evaluation method is determined by 
Flink's type extraction
+  * facilities. This is sufficient for basic types or simple POJOs but 
might be wrong for more
+  * complex, custom, or composite types. In these cases 
[[TypeInformation]] of the result type
+  * can be manually defined by overriding [[getResultType()]].
+  *
+  * Internally, the Table/SQL API code generation works with primitive 
values as much as possible.
+  * If a user-defined table function should not introduce much overhead 
during runtime, it is
+  * recommended to declare parameters and result types as primitive types 
instead of their boxed
+  * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
+  *
+  * @tparam T The type of the output row
+  */
+abstract class TableFunction[T] extends UserDefinedFunction with 
EvaluableFunction {
+
+  private val rows: ListBuffer[T] = new ListBuffer
--- End diff --

Users are not aware that this buffer exists. Is there a risk of running out 
of memory? Can we make the buffer visible to users?
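
One possible way to address the buffering concern raised above, sketched in Java under stated assumptions: instead of accumulating rows in a list, `collect` forwards each row to a downstream consumer as soon as it is emitted. The classes `ForwardingTableFunctionSketch`, `RangeFunctionSketch` and `ForwardingDemo` are hypothetical and do not describe what the pull request implements.

```java
import java.util.function.Consumer;

// Hypothetical base class: rows are handed downstream immediately, nothing is buffered.
abstract class ForwardingTableFunctionSketch<T> {
    // Default collector drops rows; the runtime would install a real one before calling eval().
    private Consumer<T> collector = row -> { };

    final void setCollector(Consumer<T> collector) {
        this.collector = collector;
    }

    protected final void collect(T row) {
        collector.accept(row);
    }
}

final class RangeFunctionSketch extends ForwardingTableFunctionSketch<Integer> {
    // Emits n rows for a single input value.
    public void eval(int n) {
        for (int i = 0; i < n; i++) {
            collect(i);
        }
    }
}

final class ForwardingDemo {
    // Counts emitted rows without ever holding more than one of them in memory.
    static long countRows(int n) {
        long[] count = {0L};
        RangeFunctionSketch fn = new RangeFunctionSketch();
        fn.setCollector(row -> count[0]++);
        fn.eval(n);
        return count[0];
    }

    public static void main(String[] args) {
        System.out.println(countRows(1_000_000));
    }
}
```

With this shape, memory use no longer grows with the number of rows a single `eval` call emits, at the cost of requiring a collector to be set before evaluation.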



[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636641#comment-15636641
 ] 

ASF GitHub Bot commented on FLINK-4469:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r86541991
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -135,6 +138,32 @@ object UserDefinedFunctionUtils {
   }
 
   /**
+* Returns eval method matching the given signature of 
[[TypeInformation]].
+*/
+  def getEvalMethod(
+function: EvaluableFunction,
--- End diff --

Change "function: EvaluableFunction" to "clazz: Class[_]".




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-04 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r86557322
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
 ---
@@ -152,21 +153,40 @@ abstract class TableEnvironment(val config: 
TableConfig) {
   protected def getBuiltInRuleSet: RuleSet
 
   /**
-* Registers a [[UserDefinedFunction]] under a unique name. Replaces 
already existing
+* Registers a [[ScalarFunction]] under a unique name. Replaces already 
existing
 * user-defined functions under this name.
 */
-  def registerFunction(name: String, function: UserDefinedFunction): Unit 
= {
-function match {
-  case sf: ScalarFunction =>
-// register in Table API
-functionCatalog.registerFunction(name, function.getClass)
+  def registerFunction(name: String, function: ScalarFunction): Unit = {
+// register in Table API
+functionCatalog.registerFunction(name, function.getClass)
 
-// register in SQL API
-functionCatalog.registerSqlFunction(sf.getSqlFunction(name, 
typeFactory))
+// register in SQL API
+functionCatalog.registerSqlFunction(function.getSqlFunction(name, 
typeFactory))
+  }
+
+  /**
+* Registers a [[TableFunction]] under a unique name. Replaces already 
existing
+* user-defined functions under this name.
+*/
+  private[flink] def registerTableFunctionInternal[T: TypeInformation](
+name: String, tf: TableFunction[T]): Unit = {
 
-  case _ =>
-throw new TableException("Unsupported user-defined function type.")
+val typeInfo: TypeInformation[_] = if (tf.getResultType != null) {
+  tf.getResultType
+} else {
+  implicitly[TypeInformation[T]]
 }
+
+val (fieldNames, fieldIndexes) = 
UserDefinedFunctionUtils.getFieldInfo(typeInfo)
--- End diff --

For the sake of simplicity, would it be possible to encapsulate this in a 
"createSqlFunctions" method in TableFunction?




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-04 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r86540020
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/EvaluableFunction.scala
 ---
@@ -0,0 +1,62 @@
+/*
--- End diff --

Could you remove EvaluableFunction.scala and move the getEvalMethods and 
getSignatures methods to UserDefinedFunctionUtils.scala? For 
example:
1. change checkAndExtractEvalMethods() to def 
checkAndExtractEvalMethods(clazz: Class[_]): Array[Method] = {...}
2. change getSignatures: Array[Array[Class[_]]] to getSignatures(clazz: 
Class[_]): Array[Array[Class[_]]] = {...}
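
The class-based helpers suggested above could look roughly like the following Java sketch (the pull request itself is Scala). The method names `checkAndExtractEvalMethods` and `getSignatures` are taken from the comment; their bodies, the `hasSignature` helper and `SampleEvalFunction` are assumptions made for illustration, using only standard `java.lang.reflect` calls.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class EvalMethodsSketch {
    // Collects all public, non-static methods named "eval" declared on the given class.
    static Method[] checkAndExtractEvalMethods(Class<?> clazz) {
        List<Method> found = new ArrayList<>();
        for (Method m : clazz.getDeclaredMethods()) {
            int mod = m.getModifiers();
            if (m.getName().equals("eval")
                    && Modifier.isPublic(mod)
                    && !Modifier.isStatic(mod)
                    && !m.isSynthetic()) {
                found.add(m);
            }
        }
        if (found.isEmpty()) {
            throw new IllegalArgumentException(
                "Class " + clazz.getName() + " declares no public, non-static eval method.");
        }
        return found.toArray(new Method[0]);
    }

    // Returns the parameter types of every eval method, one array per overload.
    static Class<?>[][] getSignatures(Class<?> clazz) {
        Method[] methods = checkAndExtractEvalMethods(clazz);
        Class<?>[][] signatures = new Class<?>[methods.length][];
        for (int i = 0; i < methods.length; i++) {
            signatures[i] = methods[i].getParameterTypes();
        }
        return signatures;
    }

    // Hypothetical convenience check: does any eval overload have exactly this signature?
    static boolean hasSignature(Class<?> clazz, Class<?>... signature) {
        for (Class<?>[] candidate : getSignatures(clazz)) {
            if (Arrays.equals(candidate, signature)) {
                return true;
            }
        }
        return false;
    }
}

// Sample function class used only to exercise the helpers.
final class SampleEvalFunction {
    public void eval(String s) { }

    public void eval(String s, int n) { }

    // Ignored by the helpers: static.
    public static void eval(long x) { }

    // Ignored by the helpers: not public.
    void eval(double d) { }
}
```

Because the helpers take a `Class` rather than an instance, they no longer depend on a trait being mixed into the function, which seems to be the motivation for the suggestion.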





[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-04 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r86558899
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala
 ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.functions
+
+import org.apache.calcite.sql.SqlFunction
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.table.{ValidationException, FlinkTypeFactory}
+
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Base class for a user-defined table function (UDTF). A user-defined 
table functions works on
+  * one row as input and returns multiple rows as output.
+  *
+  * The behavior of a [[TableFunction]] can be defined by implementing a 
custom evaluation
+  * method. An evaluation method must be declared publicly and named 
"eval". Evaluation methods
+  * can also be overloaded by implementing multiple methods named "eval".
+  *
+  * User-defined functions must have a default constructor and must be 
instantiable during runtime.
+  *
+  * By default the result type of an evaluation method is determined by 
Flink's type extraction
+  * facilities. This is sufficient for basic types or simple POJOs but 
might be wrong for more
+  * complex, custom, or composite types. In these cases 
[[TypeInformation]] of the result type
+  * can be manually defined by overriding [[getResultType()]].
+  *
+  * Internally, the Table/SQL API code generation works with primitive 
values as much as possible.
+  * If a user-defined table function should not introduce much overhead 
during runtime, it is
+  * recommended to declare parameters and result types as primitive types 
instead of their boxed
+  * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
+  *
+  * @tparam T The type of the output row
+  */
+abstract class TableFunction[T] extends UserDefinedFunction with 
EvaluableFunction {
+
+  private val rows: ListBuffer[T] = new ListBuffer
--- End diff --

Users are not aware that this buffer exists. Is there a risk of memory 
overflow? Can we make the buffer visible to users?
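
One possible way to avoid the hidden buffer, sketched in Java under stated assumptions: the names ForwardingTableFunction, setCollector and Split are hypothetical and do not exist in the pull request. Instead of caching rows in a ListBuffer, collect() hands each row to a collector supplied by the runtime, so memory use does not grow with the number of emitted rows.

```java
import java.util.function.Consumer;

// Hypothetical base class: rows are forwarded to a collector instead of being buffered.
abstract class ForwardingTableFunction<T> {

    private Consumer<T> collector;

    // Assumed to be called by the runtime before any eval method is invoked.
    final void setCollector(Consumer<T> collector) {
        this.collector = collector;
    }

    // Emits one output row by passing it straight to the collector.
    protected final void collect(T row) {
        if (collector == null) {
            throw new IllegalStateException("No collector has been set.");
        }
        collector.accept(row);
    }
}

// Illustrative table function that emits one row per comma-separated token.
class Split extends ForwardingTableFunction<String> {
    public void eval(String str) {
        if (str != null) {
            for (String s : str.split(",")) {
                collect(s);
            }
        }
    }
}
```

Whether this fits the generated code in the PR is an open question; the sketch only shows that the user-facing collect(row) call can stay the same while the buffering decision moves to the caller.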




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-04 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r86543527
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -162,24 +191,107 @@ object UserDefinedFunctionUtils {
   }
 
   /**
+* Internal method of [[ScalarFunction#getResultType()]] that does some 
pre-checking and uses
+* [[TypeExtractor]] as default return type inference.
+*/
+  def getResultType(
+tableFunction: TableFunction[_],
+signature: Array[Class[_]])
+  : TypeInformation[_] = {
+// find method for signature
+val evalMethod = tableFunction.getEvalMethods
+  .find(m => signature.sameElements(m.getParameterTypes))
+  .getOrElse(throw new ValidationException("Given signature is 
invalid."))
+
+val userDefinedTypeInfo = tableFunction.getResultType
+if (userDefinedTypeInfo != null) {
+  userDefinedTypeInfo
+} else {
+  try {
+TypeExtractor.getForClass(evalMethod.getReturnType)
+  } catch {
+case ite: InvalidTypesException =>
+  throw new ValidationException(
+s"Return type of table function '$this' cannot be " +
+  s"automatically determined. Please provide type information 
manually.")
+  }
+}
+  }
+
+  /**
 * Returns the return type of the evaluation method matching the given 
signature.
 */
   def getResultTypeClass(
-  scalarFunction: ScalarFunction,
+  function: EvaluableFunction,
   signature: Array[Class[_]])
 : Class[_] = {
 // find method for signature
-val evalMethod = scalarFunction.getEvalMethods
+val evalMethod = function.getEvalMethods
   .find(m => signature.sameElements(m.getParameterTypes))
   .getOrElse(throw new IllegalArgumentException("Given signature is 
invalid."))
 evalMethod.getReturnType
   }
 
   /**
-* Prints all signatures of a [[ScalarFunction]].
+* Prints all signatures of a [[EvaluableFunction]].
 */
-  def signaturesToString(scalarFunction: ScalarFunction): String = {
-scalarFunction.getSignatures.map(signatureToString).mkString(", ")
+  def signaturesToString(function: EvaluableFunction): String = {
--- End diff --

Change it to:
def signaturesToString(clazz: Class[_]): String = 
getSignatures(clazz).map(signatureToString).mkString(", ")
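
For illustration, a self-contained Java sketch of a class-based signaturesToString. SignaturePrinter and Concat are hypothetical names, and the "(Type, Type)" output format is an assumption, not taken from the PR. The signatures are sorted only to make the output deterministic, because reflection does not guarantee method order.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical helper that prints eval signatures from a Class, not from an instance.
final class SignaturePrinter {

    private SignaturePrinter() {}

    // Formats one signature, e.g. "(String, int)".
    static String signatureToString(Class<?>[] signature) {
        return Arrays.stream(signature)
            .map(c -> c.getSimpleName())
            .collect(Collectors.joining(", ", "(", ")"));
    }

    // Formats the signatures of all public methods named "eval" on the class.
    static String signaturesToString(Class<?> clazz) {
        return Arrays.stream(clazz.getMethods())
            .filter(m -> m.getName().equals("eval"))
            .map(Method::getParameterTypes)
            .map(SignaturePrinter::signatureToString)
            .sorted()
            .collect(Collectors.joining(", "));
    }
}

// Illustrative function with two overloads.
class Concat {
    public String eval(String a) { return a; }
    public String eval(String a, int n) { return a + n; }
}
```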




[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-04 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r86556498
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala
 ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.functions
+
+import org.apache.calcite.sql.SqlFunction
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.table.{ValidationException, FlinkTypeFactory}
+
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Base class for a user-defined table function (UDTF). A user-defined 
table functions works on
+  * one row as input and returns multiple rows as output.
+  *
+  * The behavior of a [[TableFunction]] can be defined by implementing a 
custom evaluation
+  * method. An evaluation method must be declared publicly and named 
"eval". Evaluation methods
+  * can also be overloaded by implementing multiple methods named "eval".
+  *
+  * User-defined functions must have a default constructor and must be 
instantiable during runtime.
+  *
+  * By default the result type of an evaluation method is determined by 
Flink's type extraction
+  * facilities. This is sufficient for basic types or simple POJOs but 
might be wrong for more
+  * complex, custom, or composite types. In these cases 
[[TypeInformation]] of the result type
+  * can be manually defined by overriding [[getResultType()]].
+  *
+  * Internally, the Table/SQL API code generation works with primitive 
values as much as possible.
+  * If a user-defined table function should not introduce much overhead 
during runtime, it is
+  * recommended to declare parameters and result types as primitive types 
instead of their boxed
+  * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
+  *
+  * @tparam T The type of the output row
+  */
+abstract class TableFunction[T] extends UserDefinedFunction with 
EvaluableFunction {
+
+  private val rows: ListBuffer[T] = new ListBuffer
+
+  /**
+* Emit an output row
+*
+* @param row the output row
+*/
+  protected def collect(row: T): Unit = {
+// cache rows for now, maybe immediately process them further
+rows += row
+  }
+
+
+  @Internal
+  def getRowsIterator = rows.toIterator
+
+  @Internal
+  def clear() = rows.clear()
+
+  // this method will not be called, because we need to register multiple 
sql function at one time
+  override private[flink] final def createSqlFunction(
+  name: String,
+  typeFactory: FlinkTypeFactory)
+: SqlFunction = {
+null
--- End diff --

To be consistent with ScalarFunction, you could implement the method here 
and call it during registration.
That would make the registration method very simple.
If you like, you could also define a "createSqlFunctions" method to use for 
registration.




[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636643#comment-15636643
 ] 

ASF GitHub Bot commented on FLINK-4469:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r86543527
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -162,24 +191,107 @@ object UserDefinedFunctionUtils {
   }
 
   /**
+* Internal method of [[ScalarFunction#getResultType()]] that does some 
pre-checking and uses
+* [[TypeExtractor]] as default return type inference.
+*/
+  def getResultType(
+tableFunction: TableFunction[_],
+signature: Array[Class[_]])
+  : TypeInformation[_] = {
+// find method for signature
+val evalMethod = tableFunction.getEvalMethods
+  .find(m => signature.sameElements(m.getParameterTypes))
+  .getOrElse(throw new ValidationException("Given signature is 
invalid."))
+
+val userDefinedTypeInfo = tableFunction.getResultType
+if (userDefinedTypeInfo != null) {
+  userDefinedTypeInfo
+} else {
+  try {
+TypeExtractor.getForClass(evalMethod.getReturnType)
+  } catch {
+case ite: InvalidTypesException =>
+  throw new ValidationException(
+s"Return type of table function '$this' cannot be " +
+  s"automatically determined. Please provide type information 
manually.")
+  }
+}
+  }
+
+  /**
 * Returns the return type of the evaluation method matching the given 
signature.
 */
   def getResultTypeClass(
-  scalarFunction: ScalarFunction,
+  function: EvaluableFunction,
   signature: Array[Class[_]])
 : Class[_] = {
 // find method for signature
-val evalMethod = scalarFunction.getEvalMethods
+val evalMethod = function.getEvalMethods
   .find(m => signature.sameElements(m.getParameterTypes))
   .getOrElse(throw new IllegalArgumentException("Given signature is 
invalid."))
 evalMethod.getReturnType
   }
 
   /**
-* Prints all signatures of a [[ScalarFunction]].
+* Prints all signatures of a [[EvaluableFunction]].
 */
-  def signaturesToString(scalarFunction: ScalarFunction): String = {
-scalarFunction.getSignatures.map(signatureToString).mkString(", ")
+  def signaturesToString(function: EvaluableFunction): String = {
--- End diff --

Change it to:
def signaturesToString(clazz: Class[_]): String = 
getSignatures(clazz).map(signatureToString).mkString(", ")


> Add support for user defined table function in Table API & SQL
> --
>
> Key: FLINK-4469
> URL: https://issues.apache.org/jira/browse/FLINK-4469
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> Normal user-defined functions, such as concat(), take in a single input row 
> and output a single output row. In contrast, table-generating functions 
> transform a single input row to multiple output rows. It is very useful in 
> some cases, such as look up in HBase by rowkey and return one or more rows.
> Adding a user defined table function should:
> 1. inherit from UDTF class with specific generic type T
> 2. define one or more eval methods.
> NOTE: 
> 1. the eval method must be public and non-static.
> 2. the generic type T is the row type returned by the table function. Because of 
> Java type erasure, we can’t extract T from the Iterable.
> 3. use {{collect(T)}} to emit a table row
> 4. eval methods can be overloaded. Blink will choose the best matching eval method 
> to call according to the number and types of the parameters.
> {code}
> public class Word {
>   public String word;
>   public Integer length;
> }
> public class SplitStringUDTF extends UDTF<Word> {
> public Iterable<Word> eval(String str) {
> if (str != null) {
> for (String s : str.split(",")) {
> collect(new Word(s, s.length()));
> }
> }
> }
> }
> // in SQL
> tableEnv.registerFunction("split", new SplitStringUDTF())
> tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS 
> t(w,l)")
> // in Java Table API
> tableEnv.registerFunction("split", new SplitStringUDTF())
> // rename split table columns to “w” and “l”
> table.crossApply("split(c) as (w, l)")
>  .select("a, b, w, l")
> // without renaming, we will use the origin field names in the POJO/case/...
> table.crossApply("split(c)")
>  .select("a, b, word, length")
> // in Scala Table API

[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-04 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r86560516
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/UserDefinedTableFunctionITCase.java
 ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.batch;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.api.table.functions.TableFunction;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+
+@RunWith(Parameterized.class)
+public class UserDefinedTableFunctionITCase extends TableProgramsTestBase {
+
+   public UserDefinedTableFunctionITCase(TestExecutionMode mode, 
TableConfigMode configMode){
+   super(mode, configMode);
+   }
+
+
+   @Test
+   public void testUDTF() throws Exception {
+   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
+
+   DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 =
+   CollectionDataSets.getSmall5TupleDataSet(env);
+
+   Table table = tableEnv.fromDataSet(ds1, "a, b, c, d, e");
+
+   tableEnv.registerFunction("stack", new TableFunc0());
+
+   Table result = table.crossApply("stack(a,c) as (f)")
+   .select("b,f");
+
+   // with overloading
+   DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+   List<Row> results = ds.collect();
+   String expected = "1,1\n" + "1,0\n" + "2,2\n" + "2,1\n" + 
"3,2\n" + "3,2\n";
+   compareResultAsText(results, expected);
+
+   Table result2 = table.crossApply("stack(a,c,e) as (f)")
+   .select("b,f");
+
+   DataSet<Row> ds2 = tableEnv.toDataSet(result2, Row.class);
+   List<Row> results2 = ds2.collect();
+   String expected2 = "1,1\n" + "1,1\n" + "1,0\n" + "2,2\n" + 
"2,2\n" + "2,1\n" +
+  "3,1\n" + "3,2\n" + "3,2\n";
+   compareResultAsText(results2, expected2);
+   }
+
+   @Test
+   public void testUDTFWithOuterApply() throws Exception {
+   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
+
+   DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 =
+   CollectionDataSets.getSmall5TupleDataSet(env);
+
+   Table table = tableEnv.fromDataSet(ds1, "a, b, c, d, e");
+
+   tableEnv.registerFunction("func1", new TableFunc1());
+
+   Table result = table.crossApply("func1(d) as (s,l)")
+   .select("d,s,l");
+
+   DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+   List<Row> results = ds.collect();
+   String expected = "Hallo Welt,Welt,4\n" + "Hallo Welt 
wie,Welt,4\n" +
+ "Hallo Welt wie,wie,3\n";
+   compareResultAsText(results, expected);
+
+
+   Table result2 = table.outerApply("func1(d) as (s,l)")
+   .select("d,s,l");
+
+   DataSet<Row> ds2 = tableEnv.toDataSet(result2, Row.class);
+ 

[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-04 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r86557493
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
 ---
@@ -152,21 +153,40 @@ abstract class TableEnvironment(val config: 
TableConfig) {
   protected def getBuiltInRuleSet: RuleSet
 
   /**
-* Registers a [[UserDefinedFunction]] under a unique name. Replaces 
already existing
+* Registers a [[ScalarFunction]] under a unique name. Replaces already 
existing
 * user-defined functions under this name.
 */
-  def registerFunction(name: String, function: UserDefinedFunction): Unit 
= {
-function match {
-  case sf: ScalarFunction =>
-// register in Table API
-functionCatalog.registerFunction(name, function.getClass)
+  def registerFunction(name: String, function: ScalarFunction): Unit = {
+// register in Table API
+functionCatalog.registerFunction(name, function.getClass)
 
-// register in SQL API
-functionCatalog.registerSqlFunction(sf.getSqlFunction(name, 
typeFactory))
+// register in SQL API
+functionCatalog.registerSqlFunction(function.getSqlFunction(name, 
typeFactory))
+  }
+
+  /**
+* Registers a [[TableFunction]] under a unique name. Replaces already 
existing
+* user-defined functions under this name.
+*/
+  private[flink] def registerTableFunctionInternal[T: TypeInformation](
+name: String, tf: TableFunction[T]): Unit = {
 
-  case _ =>
-throw new TableException("Unsupported user-defined function type.")
+val typeInfo: TypeInformation[_] = if (tf.getResultType != null) {
+  tf.getResultType
+} else {
+  implicitly[TypeInformation[T]]
 }
+
+val (fieldNames, fieldIndexes) = 
UserDefinedFunctionUtils.getFieldInfo(typeInfo)
--- End diff --

For the sake of simplicity, is it possible to encapsulate a 
"createSqlFunctions" method in TableFunction? 




[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636645#comment-15636645
 ] 

ASF GitHub Bot commented on FLINK-4469:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r86540888
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala
 ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.functions
+
+import org.apache.calcite.sql.SqlFunction
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.table.{ValidationException, FlinkTypeFactory}
+
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Base class for a user-defined table function (UDTF). A user-defined 
table functions works on
+  * one row as input and returns multiple rows as output.
+  *
+  * The behavior of a [[TableFunction]] can be defined by implementing a 
custom evaluation
+  * method. An evaluation method must be declared publicly and named 
"eval". Evaluation methods
+  * can also be overloaded by implementing multiple methods named "eval".
+  *
+  * User-defined functions must have a default constructor and must be 
instantiable during runtime.
+  *
+  * By default the result type of an evaluation method is determined by 
Flink's type extraction
+  * facilities. This is sufficient for basic types or simple POJOs but 
might be wrong for more
+  * complex, custom, or composite types. In these cases 
[[TypeInformation]] of the result type
+  * can be manually defined by overriding [[getResultType()]].
+  *
+  * Internally, the Table/SQL API code generation works with primitive 
values as much as possible.
+  * If a user-defined table function should not introduce much overhead 
during runtime, it is
+  * recommended to declare parameters and result types as primitive types 
instead of their boxed
+  * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
+  *
+  * @tparam T The type of the output row
+  */
+abstract class TableFunction[T] extends UserDefinedFunction with 
EvaluableFunction {
--- End diff --

Remove "with EvaluableFunction" if EvaluableFunction's methods are moved to 
UserDefinedFunctionUtils.scala.


> Add support for user defined table function in Table API & SQL
> --
>
> Key: FLINK-4469
> URL: https://issues.apache.org/jira/browse/FLINK-4469
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> Normal user-defined functions, such as concat(), take in a single input row 
> and output a single output row. In contrast, table-generating functions 
> transform a single input row to multiple output rows. It is very useful in 
> some cases, such as look up in HBase by rowkey and return one or more rows.
> Adding a user defined table function should:
> 1. inherit from UDTF class with specific generic type T
> 2. define one or more eval methods.
> NOTE: 
> 1. the eval method must be public and non-static.
> 2. the generic type T is the row type returned by the table function. Because of 
> Java type erasure, we can’t extract T from the Iterable.
> 3. use {{collect(T)}} to emit a table row
> 4. eval methods can be overloaded. Blink will choose the best matching eval method 
> to call according to the number and types of the parameters.
> {code}
> public class Word {
>   public String word;
>   public Integer length;
> }
> public class SplitStringUDTF extends UDTF<Word> {
> public Iterable<Word> eval(String str) {
> if (str != null) {
> 

[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-04 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r86540705
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/ScalarFunction.scala
 ---
@@ -48,7 +48,7 @@ import org.apache.flink.api.table.{FlinkTypeFactory, 
ValidationException}
   * recommended to declare parameters and result types as primitive types 
instead of their boxed
   * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
   */
-abstract class ScalarFunction extends UserDefinedFunction {
+abstract class ScalarFunction extends UserDefinedFunction with 
EvaluableFunction {
--- End diff --

Remove "with EvaluableFunction" if EvaluableFunction's methods are moved to 
UserDefinedFunctionUtils.scala.




[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636639#comment-15636639
 ] 

ASF GitHub Bot commented on FLINK-3930:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2425
  
The Netty logic needs some improvements:

  - The cookie is added to every single message/buffer that is transferred. 
That is too much: securing the integrity of the stream is the responsibility of 
the encryption layer. The cookie should be added only to request messages that 
establish connections.

  - Charset lookups and cookie-to-bytes encoding happen for every buffer, 
rather than once in an initialization step.

  - The String-to-byte conversion is not consistent. Sometimes it uses the 
default platform encoding, sometimes "UTF-8". 
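
The second and third points can be illustrated with a small Java sketch. SecureCookie is a hypothetical helper, not a class from the pull request or from Netty; it assumes the cookie is a plain String secret. The cookie is encoded once, with an explicit charset, and the cached bytes are reused afterwards.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

// Hypothetical helper: encodes the shared secret once, with a fixed charset.
final class SecureCookie {

    private final byte[] encoded;

    SecureCookie(String cookie) {
        // Single conversion at initialization; never the platform default charset.
        this.encoded = cookie.getBytes(StandardCharsets.UTF_8);
    }

    // Bytes to attach to a connection-establishing request (defensive copy).
    byte[] toBytes() {
        return encoded.clone();
    }

    // Compares received bytes against the cached encoding; no per-call charset lookup.
    boolean matches(byte[] received) {
        return MessageDigest.isEqual(encoded, received);
    }
}
```

A handler would create one SecureCookie when it is set up and call toBytes() only when building the request that opens a connection, leaving regular data buffers untouched.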


> Implement Service-Level Authorization
> -
>
> Key: FLINK-3930
> URL: https://issues.apache.org/jira/browse/FLINK-3930
> Project: Flink
>  Issue Type: New Feature
>  Components: Security
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Service-level authorization is the initial authorization mechanism to ensure 
> clients (or servers) connecting to the Flink cluster are authorized to do so. 
>   The purpose is to prevent a cluster from being used by an unauthorized 
> user, whether to execute jobs, disrupt cluster functionality, or gain access 
> to secrets stored within the cluster.
> Implement service-level authorization as described in the design doc.
> - Introduce a shared secret cookie
> - Enable Akka security cookie
> - Implement data transfer authentication
> - Secure the web dashboard





[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636640#comment-15636640
 ] 

ASF GitHub Bot commented on FLINK-4469:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r86540020
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/EvaluableFunction.scala
 ---
@@ -0,0 +1,62 @@
+/*
--- End diff --

Could you remove EvaluableFunction.scala and move the getEvalMethods and 
getSignatures methods to UserDefinedFunctionUtils.scala? For example:
1. change checkAndExtractEvalMethods() to def 
checkAndExtractEvalMethods(clazz: Class[_]): Array[Method] = {...}
2. change getSignatures: Array[Array[Class[_]]] to getSignatures(clazz: 
Class[_]): Array[Array[Class[_]]] = {...}



> Add support for user defined table function in Table API & SQL
> --
>
> Key: FLINK-4469
> URL: https://issues.apache.org/jira/browse/FLINK-4469
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> Normal user-defined functions, such as concat(), take in a single input row 
> and output a single output row. In contrast, table-generating functions 
> transform a single input row to multiple output rows. It is very useful in 
> some cases, such as look up in HBase by rowkey and return one or more rows.
> Adding a user defined table function should:
> 1. inherit from UDTF class with specific generic type T
> 2. define one or more eval methods.
> NOTE: 
> 1. the eval method must be public and non-static.
> 2. the generic type T is the row type returned by the table function. Because of 
> Java type erasure, we can’t extract T from the Iterable.
> 3. use {{collect(T)}} to emit a table row
> 4. eval methods can be overloaded. Blink will choose the best matching eval method 
> to call according to the number and types of the parameters.
> {code}
> public class Word {
>   public String word;
>   public Integer length;
> }
> public class SplitStringUDTF extends UDTF<Word> {
> public Iterable<Word> eval(String str) {
> if (str != null) {
> for (String s : str.split(",")) {
> collect(new Word(s, s.length()));
> }
> }
> }
> }
> // in SQL
> tableEnv.registerFunction("split", new SplitStringUDTF())
> tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS 
> t(w,l)")
> // in Java Table API
> tableEnv.registerFunction("split", new SplitStringUDTF())
> // rename split table columns to “w” and “l”
> table.crossApply("split(c) as (w, l)")
>  .select("a, b, w, l")
> // without renaming, we will use the origin field names in the POJO/case/...
> table.crossApply("split(c)")
>  .select("a, b, word, length")
> // in Scala Table API
> val split = new SplitStringUDTF()
> table.crossApply(split('c) as ('w, 'l))
>  .select('a, 'b, 'w, 'l)
> // outerApply for outer join to a UDTF
> table.outerApply(split('c))
>  .select('a, 'b, 'word, 'length)
> {code}
> See [1] for more information about UDTF design.
> [1] 
> https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit#





[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636651#comment-15636651
 ] 

ASF GitHub Bot commented on FLINK-4469:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r86557322
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
 ---
@@ -152,21 +153,40 @@ abstract class TableEnvironment(val config: 
TableConfig) {
   protected def getBuiltInRuleSet: RuleSet
 
   /**
-* Registers a [[UserDefinedFunction]] under a unique name. Replaces 
already existing
+* Registers a [[ScalarFunction]] under a unique name. Replaces already 
existing
 * user-defined functions under this name.
 */
-  def registerFunction(name: String, function: UserDefinedFunction): Unit 
= {
-function match {
-  case sf: ScalarFunction =>
-// register in Table API
-functionCatalog.registerFunction(name, function.getClass)
+  def registerFunction(name: String, function: ScalarFunction): Unit = {
+// register in Table API
+functionCatalog.registerFunction(name, function.getClass)
 
-// register in SQL API
-functionCatalog.registerSqlFunction(sf.getSqlFunction(name, 
typeFactory))
+// register in SQL API
+functionCatalog.registerSqlFunction(function.getSqlFunction(name, 
typeFactory))
+  }
+
+  /**
+* Registers a [[TableFunction]] under a unique name. Replaces already 
existing
+* user-defined functions under this name.
+*/
+  private[flink] def registerTableFunctionInternal[T: TypeInformation](
+name: String, tf: TableFunction[T]): Unit = {
 
-  case _ =>
-throw new TableException("Unsupported user-defined function type.")
+val typeInfo: TypeInformation[_] = if (tf.getResultType != null) {
+  tf.getResultType
+} else {
+  implicitly[TypeInformation[T]]
 }
+
+val (fieldNames, fieldIndexes) = 
UserDefinedFunctionUtils.getFieldInfo(typeInfo)
--- End diff --

For the sake of simplicity, is it possible to encapsulate a 
"createSqlFunctions" method in TableFunction? 


> Add support for user defined table function in Table API & SQL
> --
>
> Key: FLINK-4469
> URL: https://issues.apache.org/jira/browse/FLINK-4469
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> Normal user-defined functions, such as concat(), take in a single input row 
> and output a single output row. In contrast, table-generating functions 
> transform a single input row into multiple output rows. This is very useful in 
> some cases, such as looking up HBase by rowkey and returning one or more rows.
> Adding a user defined table function should:
> 1. inherit from UDTF class with specific generic type T
> 2. define one or more eval methods.
> NOTE: 
> 1. the eval method must be public and non-static.
> 2. the generic type T is the row type returned by table function. Because of 
> Java type erasure, we can’t extract T from the Iterable.
> 3. use {{collect(T)}} to emit table row
> 4. the eval method can be overloaded. Blink will choose the best-matching eval method 
> to call according to the number and types of the parameters.
> {code}
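> public class Word {
>   public String word;
>   public Integer length;
>   public Word() {}
>   // constructor used by the UDTF below
>   public Word(String word, Integer length) {
>     this.word = word;
>     this.length = length;
>   }
> }
> public class SplitStringUDTF extends UDTF<Word> {
> // rows are emitted via collect(), so eval returns nothing
> public void eval(String str) {
> if (str != null) {
> for (String s : str.split(",")) {
> collect(new Word(s, s.length()));
> }
> }
> }
> }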
> // in SQL
> tableEnv.registerFunction("split", new SplitStringUDTF())
> tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS 
> t(w,l)")
> // in Java Table API
> tableEnv.registerFunction("split", new SplitStringUDTF())
> // rename split table columns to “w” and “l”
> table.crossApply("split(c) as (w, l)")
>  .select("a, b, w, l")
> // without renaming, we will use the original field names in the POJO/case/...
> table.crossApply("split(c)")
>  .select("a, b, word, length")
> // in Scala Table API
> val split = new SplitStringUDTF()
> table.crossApply(split('c) as ('w, 'l))
>  .select('a, 'b, 'w, 'l)
> // outerApply for outer join to a UDTF
> table.outerApply(split('c))
>  .select('a, 'b, 'word, 'length)
> {code}
> See [1] for more information about UDTF design.
> [1] 
> https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit#





[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636647#comment-15636647
 ] 

ASF GitHub Bot commented on FLINK-4469:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r86560516
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/UserDefinedTableFunctionITCase.java
 ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.batch;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.api.table.functions.TableFunction;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+
+@RunWith(Parameterized.class)
+public class UserDefinedTableFunctionITCase extends TableProgramsTestBase {
+
+   public UserDefinedTableFunctionITCase(TestExecutionMode mode, 
TableConfigMode configMode){
+   super(mode, configMode);
+   }
+
+
+   @Test
+   public void testUDTF() throws Exception {
+   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
+
+   DataSet> ds1 =
+   CollectionDataSets.getSmall5TupleDataSet(env);
+
+   Table table = tableEnv.fromDataSet(ds1, "a, b, c, d, e");
+
+   tableEnv.registerFunction("stack", new TableFunc0());
+
+   Table result = table.crossApply("stack(a,c) as (f)")
+   .select("b,f");
+
+   // with overloading
+   DataSet ds = tableEnv.toDataSet(result, Row.class);
+   List results = ds.collect();
+   String expected = "1,1\n" + "1,0\n" + "2,2\n" + "2,1\n" + 
"3,2\n" + "3,2\n";
+   compareResultAsText(results, expected);
+
+   Table result2 = table.crossApply("stack(a,c,e) as (f)")
+   .select("b,f");
+
+   DataSet ds2 = tableEnv.toDataSet(result2, Row.class);
+   List results2 = ds2.collect();
+   String expected2 = "1,1\n" + "1,1\n" + "1,0\n" + "2,2\n" + 
"2,2\n" + "2,1\n" +
+  "3,1\n" + "3,2\n" + "3,2\n";
+   compareResultAsText(results2, expected2);
+   }
+
+   @Test
+   public void testUDTFWithOuterApply() throws Exception {
+   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
+
+   DataSet> ds1 =
+   CollectionDataSets.getSmall5TupleDataSet(env);
+
+   Table table = tableEnv.fromDataSet(ds1, "a, b, c, d, e");
+
+   tableEnv.registerFunction("func1", new TableFunc1());
+
+   Table result = table.crossApply("func1(d) as (s,l)")
+   .select("d,s,l");
+
+   DataSet ds = tableEnv.toDataSet(result, Row.class);
+   List results = ds.collect();
+   String expected = "Hallo Welt,Welt,4\n" + "Hallo Welt 
wie,Welt,4\n" +
+ "Hallo Welt wie,wie,3\n";
+   

[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...

2016-11-04 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2653#discussion_r86542265
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -135,6 +138,32 @@ object UserDefinedFunctionUtils {
   }
 
   /**
+* Returns eval method matching the given signature of 
[[TypeInformation]].
+*/
+  def getEvalMethod(
+function: EvaluableFunction,
+signature: Seq[TypeInformation[_]])
+  : Option[Method] = {
+// We compare the raw Java classes not the TypeInformation.
+// TypeInformation does not matter during runtime (e.g. within a 
MapFunction).
+val actualSignature = typeInfoToClass(signature)
+
+function
+  .getEvalMethods
--- End diff --

Get the eval methods via 
UserDefinedFunctionUtils.checkAndExtractEvalMethods(clazz) 
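
For readers following the thread, here is a minimal Java sketch of what resolving an eval method by signature could look like with plain reflection. It is only an illustration under stated assumptions: the class `EvalMethodLookup`, the nested `Stack` function, and the exact-match rule on parameter classes are all hypothetical and are not taken from the pull request, which may match more loosely (for example by assignability).

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.Optional;

/** Hypothetical helper, not part of Flink: looks up eval methods by raw Java classes. */
final class EvalMethodLookup {

    /** Hypothetical table function with two overloaded eval methods. */
    public static class Stack {
        public void eval(Integer a, String b) { }
        public void eval(Integer a, String b, Long c) { }
    }

    /**
     * Returns the public, non-static method named "eval" whose parameter
     * classes are exactly equal to the given signature, if there is one.
     */
    static Optional<Method> getEvalMethod(Class<?> clazz, Class<?>... signature) {
        return Arrays.stream(clazz.getMethods()) // getMethods() yields public methods only
            .filter(m -> m.getName().equals("eval"))
            .filter(m -> !Modifier.isStatic(m.getModifiers()))
            .filter(m -> Arrays.equals(m.getParameterTypes(), signature))
            .findFirst();
    }
}
```

Calling `EvalMethodLookup.getEvalMethod(EvalMethodLookup.Stack.class, Integer.class, String.class)` would select the two-argument overload, while a signature with no matching overload yields an empty `Optional`.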


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-11-04 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r86564819
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
 ---
@@ -57,24 +61,37 @@
// constructor in order to work with the generic deserializer.
// 

 
-   static final int HEADER_LENGTH = 4 + 4 + 1; // frame length (4), magic 
number (4), msg ID (1)
+   static final int HEADER_LENGTH = 4 + 4 + 4 + 1; // frame length (4), 
magic number (4), Cookie (4), msg ID (1)
--- End diff --

Looks like this is the cookie length




[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636613#comment-15636613
 ] 

ASF GitHub Bot commented on FLINK-3930:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r86564819
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
 ---
@@ -57,24 +61,37 @@
// constructor in order to work with the generic deserializer.
// 

 
-   static final int HEADER_LENGTH = 4 + 4 + 1; // frame length (4), magic 
number (4), msg ID (1)
+   static final int HEADER_LENGTH = 4 + 4 + 4 + 1; // frame length (4), 
magic number (4), Cookie (4), msg ID (1)
--- End diff --

Looks like this is the cookie length


> Implement Service-Level Authorization
> -
>
> Key: FLINK-3930
> URL: https://issues.apache.org/jira/browse/FLINK-3930
> Project: Flink
>  Issue Type: New Feature
>  Components: Security
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Service-level authorization is the initial authorization mechanism to ensure 
> clients (or servers) connecting to the Flink cluster are authorized to do so. 
>   The purpose is to prevent a cluster from being used by an unauthorized 
> user, whether to execute jobs, disrupt cluster functionality, or gain access 
> to secrets stored within the cluster.
> Implement service-level authorization as described in the design doc.
> - Introduce a shared secret cookie
> - Enable Akka security cookie
> - Implement data transfer authentication
> - Secure the web dashboard





[GitHub] flink issue #2707: [FLINK-4939] GenericWriteAheadSink: Decouple the creating...

2016-11-04 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2707
  
@zentol let me know if you have any more comments.




[jira] [Created] (FLINK-5020) Make the GenericWriteAheadSink rescalable.

2016-11-04 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-5020:
-

 Summary: Make the GenericWriteAheadSink rescalable.
 Key: FLINK-5020
 URL: https://issues.apache.org/jira/browse/FLINK-5020
 Project: Flink
  Issue Type: Improvement
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.2.0


This targets integrating the GenericWriteAheadSink with the new rescalable 
state abstractions so that the parallelism of the operator can change 
arbitrarily without jeopardizing the guarantees offered by it.





[jira] [Commented] (FLINK-4174) Enhance Window Evictor

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636619#comment-15636619
 ] 

ASF GitHub Bot commented on FLINK-4174:
---

Github user soniclavier commented on the issue:

https://github.com/apache/flink/pull/2736
  
@aljoscha I have made the changes, could you please review it?


> Enhance Window Evictor
> --
>
> Key: FLINK-4174
> URL: https://issues.apache.org/jira/browse/FLINK-4174
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: vishnu viswanath
>Assignee: vishnu viswanath
>
> Enhance the current functionality of Evictor as per this [design 
> document|https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit].
> This includes:
> - Allow eviction of elements from the window in any order (not only from the 
> beginning). To do this, the Evictor must go through the list of elements and 
> remove the elements that have to be evicted, instead of the current approach 
> of returning the count of elements to be removed from the beginning.
> - Allow eviction to be done before/after applying the window function.
> FLIP page for this enhancement : 
> [FLIP-4|https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor]
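
The difference between the two eviction styles described in the issue can be sketched in plain Java as follows. This is a simplified illustration and not the Flink Evictor interface: `EvictorSketch`, `evictFromBeginning`, and `evictMatching` are hypothetical names, and a plain `List` stands in for the window contents.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical sketch contrasting count-based and iterator-based eviction. */
final class EvictorSketch {

    /** Current style: only a count is known, so elements are dropped from the head. */
    static <T> void evictFromBeginning(List<T> window, int count) {
        int n = Math.max(0, Math.min(count, window.size()));
        window.subList(0, n).clear();
    }

    /** Proposed style: walk all elements and remove any that match, at any position. */
    static <T> int evictMatching(Iterable<T> window, Predicate<? super T> shouldEvict) {
        int evicted = 0;
        for (Iterator<T> it = window.iterator(); it.hasNext(); ) {
            if (shouldEvict.test(it.next())) {
                it.remove(); // requires an iterator that supports removal
                evicted++;
            }
        }
        return evicted;
    }
}
```

With the iterator-based form, a window holding 5, 1, 9, 2 and a predicate `x -> x > 4` keeps 1 and 2, which the count-based form cannot express because the evicted elements are not contiguous at the head.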





[GitHub] flink issue #2736: [FLINK-4174] Enhance evictor functionality

2016-11-04 Thread soniclavier
Github user soniclavier commented on the issue:

https://github.com/apache/flink/pull/2736
  
@aljoscha I have made the changes, could you please review it?




[jira] [Commented] (FLINK-4939) GenericWriteAheadSink: Decouple the creating from the committing subtask for a pending checkpoint

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636606#comment-15636606
 ] 

ASF GitHub Bot commented on FLINK-4939:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2707
  
@zentol let me know if you have any more comments.


> GenericWriteAheadSink: Decouple the creating from the committing subtask for 
> a pending checkpoint
> -
>
> Key: FLINK-4939
> URL: https://issues.apache.org/jira/browse/FLINK-4939
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> So far the GenericWriteAheadSink has assumed that
> the subtask that wrote a pending checkpoint to the 
> state backend will also be the one to commit it to
> the third-party storage system.
> This issue aims to remove this assumption. To do this, 
> the CheckpointCommitter has to be able to dynamically
> take the subtaskIdx as a parameter when asked 
> whether a checkpoint was committed, and the
> state kept by the GenericWriteAheadSink has to 
> include the index of the subtask that wrote 
> the pending checkpoint.
> This change is also necessary for making the operator rescalable.
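
As a rough illustration of the idea in this description, the Java sketch below keys a pending checkpoint by both checkpoint id and subtask index, and lets a committer be asked about a specific (subtask, checkpoint) pair. The class names and the choice to exclude the timestamp from equality are assumptions made for this sketch; the pull request's own classes may differ.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/** Hypothetical stand-in for the sink's pending-checkpoint record. */
final class PendingCheckpointSketch {
    final long checkpointId;
    final int subtaskIdx;
    final long timestamp;

    PendingCheckpointSketch(long checkpointId, int subtaskIdx, long timestamp) {
        this.checkpointId = checkpointId;
        this.subtaskIdx = subtaskIdx;
        this.timestamp = timestamp;
    }

    // Identity is (checkpointId, subtaskIdx); the timestamp is ignored (assumption).
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof PendingCheckpointSketch)) return false;
        PendingCheckpointSketch other = (PendingCheckpointSketch) o;
        return checkpointId == other.checkpointId && subtaskIdx == other.subtaskIdx;
    }

    @Override
    public int hashCode() {
        return Objects.hash(checkpointId, subtaskIdx);
    }
}

/** Hypothetical in-memory committer that records commits per (subtask, checkpoint). */
final class InMemoryCommitterSketch {
    private final Set<PendingCheckpointSketch> committed = new HashSet<>();

    void commit(int subtaskIdx, long checkpointId) {
        committed.add(new PendingCheckpointSketch(checkpointId, subtaskIdx, 0L));
    }

    // The subtask index is passed per call instead of being fixed when the committer opens.
    boolean isCheckpointCommitted(int subtaskIdx, long checkpointId) {
        return committed.contains(new PendingCheckpointSketch(checkpointId, subtaskIdx, 0L));
    }
}
```

Because the subtask index travels with each record and each query, a different subtask can commit a checkpoint after rescaling without the committer being bound to the subtask that created it.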





[GitHub] flink pull request #2707: [FLINK-4939] GenericWriteAheadSink: Decouple the c...

2016-11-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2707#discussion_r86563313
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
 ---
@@ -77,111 +84,144 @@ public GenericWriteAheadSink(CheckpointCommitter 
committer, TypeSerializer s
public void open() throws Exception {
super.open();
committer.setOperatorId(id);
-   
committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask());
committer.open();
-   cleanState();
-   checkpointStreamFactory =
-   
getContainingTask().createCheckpointStreamFactory(this);
+
+   checkpointStreamFactory = getContainingTask()
+   .createCheckpointStreamFactory(this);
+
+   cleanRestoredHandles();
}
 
public void close() throws Exception {
committer.close();
}
 
/**
-* Saves a handle in the state.
+* Called when a checkpoint barrier arrives. It closes any open streams 
to the backend
+* and marks them as pending for committing to the external, 
third-party storage system.
 *
-* @param checkpointId
-* @throws IOException
+* @param checkpointId the id of the latest received checkpoint.
+* @throws IOException in case something went wrong when handling the 
stream to the backend.
 */
private void saveHandleInState(final long checkpointId, final long 
timestamp) throws Exception {
+   Preconditions.checkNotNull(this.pendingCheckpoints, "The 
operator has not been properly initialized.");
+
//only add handle if a new OperatorState was created since the 
last snapshot
if (out != null) {
+   int subtaskIdx = 
getRuntimeContext().getIndexOfThisSubtask();
StreamStateHandle handle = out.closeAndGetHandle();
-   if (state.pendingHandles.containsKey(checkpointId)) {
+
+   PendingCheckpoint pendingCheckpoint = new 
PendingCheckpoint(
+   checkpointId, subtaskIdx, timestamp, handle);
+
+   if (pendingCheckpoints.contains(pendingCheckpoint)) {
//we already have a checkpoint stored for that 
ID that may have been partially written,
//so we discard this "alternate version" and 
use the stored checkpoint
handle.discardState();
} else {
-   state.pendingHandles.put(checkpointId, new 
Tuple2<>(timestamp, handle));
+   pendingCheckpoints.add(pendingCheckpoint);
}
out = null;
}
}
 
@Override
-   public void snapshotState(FSDataOutputStream out,
-   long checkpointId,
-   long timestamp) throws Exception {
+   public void snapshotState(FSDataOutputStream out, long checkpointId, 
long timestamp) throws Exception {
saveHandleInState(checkpointId, timestamp);
 
-   InstantiationUtil.serializeObject(out, state);
+   DataOutputViewStreamWrapper outStream = new 
DataOutputViewStreamWrapper(out);
+   outStream.writeInt(pendingCheckpoints.size());
+   for (PendingCheckpoint pendingCheckpoint : pendingCheckpoints) {
+   pendingCheckpoint.serialize(outStream);
+   }
}
 
@Override
public void restoreState(FSDataInputStream in) throws Exception {
-   this.state = InstantiationUtil.deserializeObject(in, 
getUserCodeClassloader());
+   final DataInputViewStreamWrapper inStream = new 
DataInputViewStreamWrapper(in);
+   int numPendingHandles = inStream.readInt();
+   for (int i = 0; i < numPendingHandles; i++) {
+   
pendingCheckpoints.add(PendingCheckpoint.restore(inStream, 
getUserCodeClassloader()));
+   }
}
 
-   private void cleanState() throws Exception {
-   synchronized (this.state.pendingHandles) { //remove all handles 
that were already committed
-   Set pastCheckpointIds = 
this.state.pendingHandles.keySet();
-   Set checkpointsToRemove = new HashSet<>();
-   for (Long pastCheckpointId : pastCheckpointIds) {
-   if 
(committer.isCheckpointCommitted(pastCheckpointId)) {
-   
checkpointsToRemove.add(pastCheckpointId);
+   /**
+* Called at {@link #open()} to clean-up the pending handle list.
+* It iterates 

[jira] [Commented] (FLINK-4939) GenericWriteAheadSink: Decouple the creating from the committing subtask for a pending checkpoint

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636586#comment-15636586
 ] 

ASF GitHub Bot commented on FLINK-4939:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2707#discussion_r86563313
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
 ---
@@ -77,111 +84,144 @@ public GenericWriteAheadSink(CheckpointCommitter 
committer, TypeSerializer s
public void open() throws Exception {
super.open();
committer.setOperatorId(id);
-   
committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask());
committer.open();
-   cleanState();
-   checkpointStreamFactory =
-   
getContainingTask().createCheckpointStreamFactory(this);
+
+   checkpointStreamFactory = getContainingTask()
+   .createCheckpointStreamFactory(this);
+
+   cleanRestoredHandles();
}
 
public void close() throws Exception {
committer.close();
}
 
/**
-* Saves a handle in the state.
+* Called when a checkpoint barrier arrives. It closes any open streams 
to the backend
+* and marks them as pending for committing to the external, 
third-party storage system.
 *
-* @param checkpointId
-* @throws IOException
+* @param checkpointId the id of the latest received checkpoint.
+* @throws IOException in case something went wrong when handling the 
stream to the backend.
 */
private void saveHandleInState(final long checkpointId, final long 
timestamp) throws Exception {
+   Preconditions.checkNotNull(this.pendingCheckpoints, "The 
operator has not been properly initialized.");
+
//only add handle if a new OperatorState was created since the 
last snapshot
if (out != null) {
+   int subtaskIdx = 
getRuntimeContext().getIndexOfThisSubtask();
StreamStateHandle handle = out.closeAndGetHandle();
-   if (state.pendingHandles.containsKey(checkpointId)) {
+
+   PendingCheckpoint pendingCheckpoint = new 
PendingCheckpoint(
+   checkpointId, subtaskIdx, timestamp, handle);
+
+   if (pendingCheckpoints.contains(pendingCheckpoint)) {
//we already have a checkpoint stored for that 
ID that may have been partially written,
//so we discard this "alternate version" and 
use the stored checkpoint
handle.discardState();
} else {
-   state.pendingHandles.put(checkpointId, new 
Tuple2<>(timestamp, handle));
+   pendingCheckpoints.add(pendingCheckpoint);
}
out = null;
}
}
 
@Override
-   public void snapshotState(FSDataOutputStream out,
-   long checkpointId,
-   long timestamp) throws Exception {
+   public void snapshotState(FSDataOutputStream out, long checkpointId, 
long timestamp) throws Exception {
saveHandleInState(checkpointId, timestamp);
 
-   InstantiationUtil.serializeObject(out, state);
+   DataOutputViewStreamWrapper outStream = new 
DataOutputViewStreamWrapper(out);
+   outStream.writeInt(pendingCheckpoints.size());
+   for (PendingCheckpoint pendingCheckpoint : pendingCheckpoints) {
+   pendingCheckpoint.serialize(outStream);
+   }
}
 
@Override
public void restoreState(FSDataInputStream in) throws Exception {
-   this.state = InstantiationUtil.deserializeObject(in, 
getUserCodeClassloader());
+   final DataInputViewStreamWrapper inStream = new 
DataInputViewStreamWrapper(in);
+   int numPendingHandles = inStream.readInt();
+   for (int i = 0; i < numPendingHandles; i++) {
+   
pendingCheckpoints.add(PendingCheckpoint.restore(inStream, 
getUserCodeClassloader()));
+   }
}
 
-   private void cleanState() throws Exception {
-   synchronized (this.state.pendingHandles) { //remove all handles 
that were already committed
-   Set pastCheckpointIds = 
this.state.pendingHandles.keySet();
-   Set checkpointsToRemove = new HashSet<>();
-   for (Long pastCheckpointId : pastCheckpointIds) {
-   if 

[GitHub] flink issue #2680: [FLINK-4876] Allow web interface to be bound to a specifi...

2016-11-04 Thread attachmentgenie
Github user attachmentgenie commented on the issue:

https://github.com/apache/flink/pull/2680
  
@uce I made the changes as requested, with a few slight modifications.

1. 
`.defaultValue(null);` throws an error, so I opted to use 
`.noDefaultValue();`, which behaves the same.

2. 
You cannot pass `null` as the first argument of 
io.netty.bootstrap.ServerBootstrap.bind(), as it wants you to be explicit about 
the address (it will also validate it as a valid local address). So I added an 
`if (configuredAddress == null) {` switch.
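
A minimal Java sketch of the null switch described in point 2, using only `java.net.InetSocketAddress`. The names `BindAddressSketch` and `resolveBindAddress` are hypothetical and the pull request may structure this differently; the sketch only shows falling back to the wildcard address when nothing is configured.

```java
import java.net.InetSocketAddress;

/** Hypothetical helper: picks the socket address the web frontend should bind to. */
final class BindAddressSketch {

    static InetSocketAddress resolveBindAddress(String configuredAddress, int port) {
        if (configuredAddress == null) {
            // No address configured: bind to the wildcard address (all interfaces).
            return new InetSocketAddress(port);
        }
        // Explicit address configured: bind only to that host or IP.
        return new InetSocketAddress(configuredAddress, port);
    }
}
```

The resulting address could then be handed to Netty's `ServerBootstrap.bind(SocketAddress)` overload, so a `null` host never reaches Netty.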






[jira] [Commented] (FLINK-4876) Allow web interface to be bound to a specific ip/interface/inetHost

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636570#comment-15636570
 ] 

ASF GitHub Bot commented on FLINK-4876:
---

Github user attachmentgenie commented on the issue:

https://github.com/apache/flink/pull/2680
  
@uce I made the changes as requested, with a few slight modifications.

1. 
`.defaultValue(null);` throws an error, so I opted to use 
`.noDefaultValue();`, which behaves the same.

2. 
You cannot pass `null` as the first argument of 
io.netty.bootstrap.ServerBootstrap.bind(), as it wants you to be explicit about 
the address (it will also validate it as a valid local address). So I added an 
`if (configuredAddress == null) {` switch.




> Allow web interface to be bound to a specific ip/interface/inetHost
> ---
>
> Key: FLINK-4876
> URL: https://issues.apache.org/jira/browse/FLINK-4876
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Affects Versions: 1.2.0, 1.1.2, 1.1.3
>Reporter: Bram Vogelaar
>Assignee: Bram Vogelaar
>Priority: Minor
>
> Currently the web interface automatically binds to all interfaces on 0.0.0.0. 
> IMHO there are some use cases for binding only to a specific IP address (e.g. 
> access through an authenticated proxy, not binding on the management or 
> backup interface).





[GitHub] flink pull request #2707: [FLINK-4939] GenericWriteAheadSink: Decouple the c...

2016-11-04 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2707#discussion_r86561350
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
 ---
@@ -77,111 +84,144 @@ public GenericWriteAheadSink(CheckpointCommitter 
committer, TypeSerializer s
public void open() throws Exception {
super.open();
committer.setOperatorId(id);
-   
committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask());
committer.open();
-   cleanState();
-   checkpointStreamFactory =
-   
getContainingTask().createCheckpointStreamFactory(this);
+
+   checkpointStreamFactory = getContainingTask()
+   .createCheckpointStreamFactory(this);
+
+   cleanRestoredHandles();
}
 
public void close() throws Exception {
committer.close();
}
 
/**
-* Saves a handle in the state.
+* Called when a checkpoint barrier arrives. It closes any open streams 
to the backend
+* and marks them as pending for committing to the external, 
third-party storage system.
 *
-* @param checkpointId
-* @throws IOException
+* @param checkpointId the id of the latest received checkpoint.
+* @throws IOException in case something went wrong when handling the 
stream to the backend.
 */
private void saveHandleInState(final long checkpointId, final long 
timestamp) throws Exception {
+   Preconditions.checkNotNull(this.pendingCheckpoints, "The 
operator has not been properly initialized.");
+
//only add handle if a new OperatorState was created since the 
last snapshot
if (out != null) {
+   int subtaskIdx = 
getRuntimeContext().getIndexOfThisSubtask();
StreamStateHandle handle = out.closeAndGetHandle();
-   if (state.pendingHandles.containsKey(checkpointId)) {
+
+   PendingCheckpoint pendingCheckpoint = new 
PendingCheckpoint(
+   checkpointId, subtaskIdx, timestamp, handle);
+
+   if (pendingCheckpoints.contains(pendingCheckpoint)) {
//we already have a checkpoint stored for that 
ID that may have been partially written,
//so we discard this "alternate version" and 
use the stored checkpoint
handle.discardState();
} else {
-   state.pendingHandles.put(checkpointId, new 
Tuple2<>(timestamp, handle));
+   pendingCheckpoints.add(pendingCheckpoint);
}
out = null;
}
}
 
@Override
-   public void snapshotState(FSDataOutputStream out,
-   long checkpointId,
-   long timestamp) throws Exception {
+   public void snapshotState(FSDataOutputStream out, long checkpointId, 
long timestamp) throws Exception {
saveHandleInState(checkpointId, timestamp);
 
-   InstantiationUtil.serializeObject(out, state);
+   DataOutputViewStreamWrapper outStream = new 
DataOutputViewStreamWrapper(out);
+   outStream.writeInt(pendingCheckpoints.size());
+   for (PendingCheckpoint pendingCheckpoint : pendingCheckpoints) {
+   pendingCheckpoint.serialize(outStream);
+   }
}
 
@Override
public void restoreState(FSDataInputStream in) throws Exception {
-   this.state = InstantiationUtil.deserializeObject(in, 
getUserCodeClassloader());
+   final DataInputViewStreamWrapper inStream = new 
DataInputViewStreamWrapper(in);
+   int numPendingHandles = inStream.readInt();
+   for (int i = 0; i < numPendingHandles; i++) {
+   
pendingCheckpoints.add(PendingCheckpoint.restore(inStream, 
getUserCodeClassloader()));
+   }
}
 
-   private void cleanState() throws Exception {
-   synchronized (this.state.pendingHandles) { //remove all handles 
that were already committed
-   Set pastCheckpointIds = 
this.state.pendingHandles.keySet();
-   Set checkpointsToRemove = new HashSet<>();
-   for (Long pastCheckpointId : pastCheckpointIds) {
-   if 
(committer.isCheckpointCommitted(pastCheckpointId)) {
-   
checkpointsToRemove.add(pastCheckpointId);
+   /**
+* Called at {@link #open()} to clean-up the pending handle list.
+* It iterates 

[jira] [Commented] (FLINK-4939) GenericWriteAheadSink: Decouple the creating from the committing subtask for a pending checkpoint

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636561#comment-15636561
 ] 

ASF GitHub Bot commented on FLINK-4939:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2707#discussion_r86561357
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
 ---
@@ -77,111 +84,144 @@ public GenericWriteAheadSink(CheckpointCommitter 
committer, TypeSerializer s
public void open() throws Exception {
super.open();
committer.setOperatorId(id);
-   
committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask());
committer.open();
-   cleanState();
-   checkpointStreamFactory =
-   
getContainingTask().createCheckpointStreamFactory(this);
+
+   checkpointStreamFactory = getContainingTask()
+   .createCheckpointStreamFactory(this);
+
+   cleanRestoredHandles();
}
 
public void close() throws Exception {
committer.close();
}
 
/**
-* Saves a handle in the state.
+* Called when a checkpoint barrier arrives. It closes any open streams 
to the backend
+* and marks them as pending for committing to the external, 
third-party storage system.
 *
-* @param checkpointId
-* @throws IOException
+* @param checkpointId the id of the latest received checkpoint.
+* @throws IOException in case something went wrong when handling the 
stream to the backend.
 */
private void saveHandleInState(final long checkpointId, final long 
timestamp) throws Exception {
+   Preconditions.checkNotNull(this.pendingCheckpoints, "The 
operator has not been properly initialized.");
+
//only add handle if a new OperatorState was created since the 
last snapshot
if (out != null) {
+   int subtaskIdx = 
getRuntimeContext().getIndexOfThisSubtask();
StreamStateHandle handle = out.closeAndGetHandle();
-   if (state.pendingHandles.containsKey(checkpointId)) {
+
+   PendingCheckpoint pendingCheckpoint = new 
PendingCheckpoint(
+   checkpointId, subtaskIdx, timestamp, handle);
+
+   if (pendingCheckpoints.contains(pendingCheckpoint)) {
//we already have a checkpoint stored for that 
ID that may have been partially written,
//so we discard this "alternate version" and 
use the stored checkpoint
handle.discardState();
} else {
-   state.pendingHandles.put(checkpointId, new 
Tuple2<>(timestamp, handle));
+   pendingCheckpoints.add(pendingCheckpoint);
}
out = null;
}
}
 
@Override
-   public void snapshotState(FSDataOutputStream out,
-   long checkpointId,
-   long timestamp) throws Exception {
+   public void snapshotState(FSDataOutputStream out, long checkpointId, 
long timestamp) throws Exception {
saveHandleInState(checkpointId, timestamp);
 
-   InstantiationUtil.serializeObject(out, state);
+   DataOutputViewStreamWrapper outStream = new 
DataOutputViewStreamWrapper(out);
+   outStream.writeInt(pendingCheckpoints.size());
+   for (PendingCheckpoint pendingCheckpoint : pendingCheckpoints) {
+   pendingCheckpoint.serialize(outStream);
+   }
}
 
@Override
public void restoreState(FSDataInputStream in) throws Exception {
-   this.state = InstantiationUtil.deserializeObject(in, 
getUserCodeClassloader());
+   final DataInputViewStreamWrapper inStream = new 
DataInputViewStreamWrapper(in);
+   int numPendingHandles = inStream.readInt();
+   for (int i = 0; i < numPendingHandles; i++) {
+   
pendingCheckpoints.add(PendingCheckpoint.restore(inStream, 
getUserCodeClassloader()));
+   }
}
 
-   private void cleanState() throws Exception {
-   synchronized (this.state.pendingHandles) { //remove all handles 
that were already committed
-   Set pastCheckpointIds = 
this.state.pendingHandles.keySet();
-   Set checkpointsToRemove = new HashSet<>();
-   for (Long pastCheckpointId : pastCheckpointIds) {
-   if 

[jira] [Commented] (FLINK-4939) GenericWriteAheadSink: Decouple the creating from the committing subtask for a pending checkpoint

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636562#comment-15636562
 ] 

ASF GitHub Bot commented on FLINK-4939:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2707#discussion_r86561350
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
 ---
@@ -77,111 +84,144 @@ public GenericWriteAheadSink(CheckpointCommitter 
committer, TypeSerializer s
public void open() throws Exception {
super.open();
committer.setOperatorId(id);
-   
committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask());
committer.open();
-   cleanState();
-   checkpointStreamFactory =
-   
getContainingTask().createCheckpointStreamFactory(this);
+
+   checkpointStreamFactory = getContainingTask()
+   .createCheckpointStreamFactory(this);
+
+   cleanRestoredHandles();
}
 
public void close() throws Exception {
committer.close();
}
 
/**
-* Saves a handle in the state.
+* Called when a checkpoint barrier arrives. It closes any open streams 
to the backend
+* and marks them as pending for committing to the external, 
third-party storage system.
 *
-* @param checkpointId
-* @throws IOException
+* @param checkpointId the id of the latest received checkpoint.
+* @throws IOException in case something went wrong when handling the 
stream to the backend.
 */
private void saveHandleInState(final long checkpointId, final long 
timestamp) throws Exception {
+   Preconditions.checkNotNull(this.pendingCheckpoints, "The 
operator has not been properly initialized.");
+
//only add handle if a new OperatorState was created since the 
last snapshot
if (out != null) {
+   int subtaskIdx = 
getRuntimeContext().getIndexOfThisSubtask();
StreamStateHandle handle = out.closeAndGetHandle();
-   if (state.pendingHandles.containsKey(checkpointId)) {
+
+   PendingCheckpoint pendingCheckpoint = new 
PendingCheckpoint(
+   checkpointId, subtaskIdx, timestamp, handle);
+
+   if (pendingCheckpoints.contains(pendingCheckpoint)) {
//we already have a checkpoint stored for that 
ID that may have been partially written,
//so we discard this "alternate version" and 
use the stored checkpoint
handle.discardState();
} else {
-   state.pendingHandles.put(checkpointId, new 
Tuple2<>(timestamp, handle));
+   pendingCheckpoints.add(pendingCheckpoint);
}
out = null;
}
}
 
@Override
-   public void snapshotState(FSDataOutputStream out,
-   long checkpointId,
-   long timestamp) throws Exception {
+   public void snapshotState(FSDataOutputStream out, long checkpointId, 
long timestamp) throws Exception {
saveHandleInState(checkpointId, timestamp);
 
-   InstantiationUtil.serializeObject(out, state);
+   DataOutputViewStreamWrapper outStream = new 
DataOutputViewStreamWrapper(out);
+   outStream.writeInt(pendingCheckpoints.size());
+   for (PendingCheckpoint pendingCheckpoint : pendingCheckpoints) {
+   pendingCheckpoint.serialize(outStream);
+   }
}
 
@Override
public void restoreState(FSDataInputStream in) throws Exception {
-   this.state = InstantiationUtil.deserializeObject(in, 
getUserCodeClassloader());
+   final DataInputViewStreamWrapper inStream = new 
DataInputViewStreamWrapper(in);
+   int numPendingHandles = inStream.readInt();
+   for (int i = 0; i < numPendingHandles; i++) {
+   
pendingCheckpoints.add(PendingCheckpoint.restore(inStream, 
getUserCodeClassloader()));
+   }
}
 
-   private void cleanState() throws Exception {
-   synchronized (this.state.pendingHandles) { //remove all handles 
that were already committed
-   Set pastCheckpointIds = 
this.state.pendingHandles.keySet();
-   Set checkpointsToRemove = new HashSet<>();
-   for (Long pastCheckpointId : pastCheckpointIds) {
-   if 

[GitHub] flink pull request #2707: [FLINK-4939] GenericWriteAheadSink: Decouple the c...

2016-11-04 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2707#discussion_r86561357
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
 ---
@@ -77,111 +84,144 @@ public GenericWriteAheadSink(CheckpointCommitter 
committer, TypeSerializer s
public void open() throws Exception {
super.open();
committer.setOperatorId(id);
-   
committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask());
committer.open();
-   cleanState();
-   checkpointStreamFactory =
-   
getContainingTask().createCheckpointStreamFactory(this);
+
+   checkpointStreamFactory = getContainingTask()
+   .createCheckpointStreamFactory(this);
+
+   cleanRestoredHandles();
}
 
public void close() throws Exception {
committer.close();
}
 
/**
-* Saves a handle in the state.
+* Called when a checkpoint barrier arrives. It closes any open streams 
to the backend
+* and marks them as pending for committing to the external, 
third-party storage system.
 *
-* @param checkpointId
-* @throws IOException
+* @param checkpointId the id of the latest received checkpoint.
+* @throws IOException in case something went wrong when handling the 
stream to the backend.
 */
private void saveHandleInState(final long checkpointId, final long 
timestamp) throws Exception {
+   Preconditions.checkNotNull(this.pendingCheckpoints, "The 
operator has not been properly initialized.");
+
//only add handle if a new OperatorState was created since the 
last snapshot
if (out != null) {
+   int subtaskIdx = 
getRuntimeContext().getIndexOfThisSubtask();
StreamStateHandle handle = out.closeAndGetHandle();
-   if (state.pendingHandles.containsKey(checkpointId)) {
+
+   PendingCheckpoint pendingCheckpoint = new 
PendingCheckpoint(
+   checkpointId, subtaskIdx, timestamp, handle);
+
+   if (pendingCheckpoints.contains(pendingCheckpoint)) {
//we already have a checkpoint stored for that 
ID that may have been partially written,
//so we discard this "alternate version" and 
use the stored checkpoint
handle.discardState();
} else {
-   state.pendingHandles.put(checkpointId, new 
Tuple2<>(timestamp, handle));
+   pendingCheckpoints.add(pendingCheckpoint);
}
out = null;
}
}
 
@Override
-   public void snapshotState(FSDataOutputStream out,
-   long checkpointId,
-   long timestamp) throws Exception {
+   public void snapshotState(FSDataOutputStream out, long checkpointId, 
long timestamp) throws Exception {
saveHandleInState(checkpointId, timestamp);
 
-   InstantiationUtil.serializeObject(out, state);
+   DataOutputViewStreamWrapper outStream = new 
DataOutputViewStreamWrapper(out);
+   outStream.writeInt(pendingCheckpoints.size());
+   for (PendingCheckpoint pendingCheckpoint : pendingCheckpoints) {
+   pendingCheckpoint.serialize(outStream);
+   }
}
 
@Override
public void restoreState(FSDataInputStream in) throws Exception {
-   this.state = InstantiationUtil.deserializeObject(in, 
getUserCodeClassloader());
+   final DataInputViewStreamWrapper inStream = new 
DataInputViewStreamWrapper(in);
+   int numPendingHandles = inStream.readInt();
+   for (int i = 0; i < numPendingHandles; i++) {
+   
pendingCheckpoints.add(PendingCheckpoint.restore(inStream, 
getUserCodeClassloader()));
+   }
}
 
-   private void cleanState() throws Exception {
-   synchronized (this.state.pendingHandles) { //remove all handles 
that were already committed
-   Set pastCheckpointIds = 
this.state.pendingHandles.keySet();
-   Set checkpointsToRemove = new HashSet<>();
-   for (Long pastCheckpointId : pastCheckpointIds) {
-   if 
(committer.isCheckpointCommitted(pastCheckpointId)) {
-   
checkpointsToRemove.add(pastCheckpointId);
+   /**
+* Called at {@link #open()} to clean-up the pending handle list.
+* It iterates 

[GitHub] flink pull request #2707: [FLINK-4939] GenericWriteAheadSink: Decouple the c...

2016-11-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2707#discussion_r86557631
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
 ---
@@ -77,111 +84,144 @@ public GenericWriteAheadSink(CheckpointCommitter 
committer, TypeSerializer s
public void open() throws Exception {
super.open();
committer.setOperatorId(id);
-   
committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask());
committer.open();
-   cleanState();
-   checkpointStreamFactory =
-   
getContainingTask().createCheckpointStreamFactory(this);
+
+   checkpointStreamFactory = getContainingTask()
+   .createCheckpointStreamFactory(this);
+
+   cleanRestoredHandles();
}
 
public void close() throws Exception {
committer.close();
}
 
/**
-* Saves a handle in the state.
+* Called when a checkpoint barrier arrives. It closes any open streams 
to the backend
+* and marks them as pending for committing to the external, 
third-party storage system.
 *
-* @param checkpointId
-* @throws IOException
+* @param checkpointId the id of the latest received checkpoint.
+* @throws IOException in case something went wrong when handling the 
stream to the backend.
 */
private void saveHandleInState(final long checkpointId, final long 
timestamp) throws Exception {
+   Preconditions.checkNotNull(this.pendingCheckpoints, "The 
operator has not been properly initialized.");
+
//only add handle if a new OperatorState was created since the 
last snapshot
if (out != null) {
+   int subtaskIdx = 
getRuntimeContext().getIndexOfThisSubtask();
StreamStateHandle handle = out.closeAndGetHandle();
-   if (state.pendingHandles.containsKey(checkpointId)) {
+
+   PendingCheckpoint pendingCheckpoint = new 
PendingCheckpoint(
+   checkpointId, subtaskIdx, timestamp, handle);
+
+   if (pendingCheckpoints.contains(pendingCheckpoint)) {
//we already have a checkpoint stored for that 
ID that may have been partially written,
//so we discard this "alternate version" and 
use the stored checkpoint
handle.discardState();
} else {
-   state.pendingHandles.put(checkpointId, new 
Tuple2<>(timestamp, handle));
+   pendingCheckpoints.add(pendingCheckpoint);
}
out = null;
}
}
 
@Override
-   public void snapshotState(FSDataOutputStream out,
-   long checkpointId,
-   long timestamp) throws Exception {
+   public void snapshotState(FSDataOutputStream out, long checkpointId, 
long timestamp) throws Exception {
saveHandleInState(checkpointId, timestamp);
 
-   InstantiationUtil.serializeObject(out, state);
+   DataOutputViewStreamWrapper outStream = new 
DataOutputViewStreamWrapper(out);
+   outStream.writeInt(pendingCheckpoints.size());
+   for (PendingCheckpoint pendingCheckpoint : pendingCheckpoints) {
+   pendingCheckpoint.serialize(outStream);
+   }
}
 
@Override
public void restoreState(FSDataInputStream in) throws Exception {
-   this.state = InstantiationUtil.deserializeObject(in, 
getUserCodeClassloader());
+   final DataInputViewStreamWrapper inStream = new 
DataInputViewStreamWrapper(in);
+   int numPendingHandles = inStream.readInt();
+   for (int i = 0; i < numPendingHandles; i++) {
+   
pendingCheckpoints.add(PendingCheckpoint.restore(inStream, 
getUserCodeClassloader()));
+   }
}
 
-   private void cleanState() throws Exception {
-   synchronized (this.state.pendingHandles) { //remove all handles 
that were already committed
-   Set pastCheckpointIds = 
this.state.pendingHandles.keySet();
-   Set checkpointsToRemove = new HashSet<>();
-   for (Long pastCheckpointId : pastCheckpointIds) {
-   if 
(committer.isCheckpointCommitted(pastCheckpointId)) {
-   
checkpointsToRemove.add(pastCheckpointId);
+   /**
+* Called at {@link #open()} to clean-up the pending handle list.
+* It iterates 

[GitHub] flink pull request #2755: [hotfix][docs] Stream joins don't support tuple po...

2016-11-04 Thread rmetzger
GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/2755

[hotfix][docs] Stream joins don't support tuple position keys



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink docs_window_hoxfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2755.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2755


commit 9c8068bce6b0027a53552b4cb5bc9875aa752d07
Author: Robert Metzger 
Date:   2016-11-04T14:41:06Z

[hotfix][docs] Stream joins don't support tuple position keys




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4939) GenericWriteAheadSink: Decouple the creating from the committing subtask for a pending checkpoint

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636530#comment-15636530
 ] 

ASF GitHub Bot commented on FLINK-4939:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2707#discussion_r86557736
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
 ---
@@ -77,111 +84,144 @@ public GenericWriteAheadSink(CheckpointCommitter 
committer, TypeSerializer s
public void open() throws Exception {
super.open();
committer.setOperatorId(id);
-   
committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask());
committer.open();
-   cleanState();
-   checkpointStreamFactory =
-   
getContainingTask().createCheckpointStreamFactory(this);
+
+   checkpointStreamFactory = getContainingTask()
+   .createCheckpointStreamFactory(this);
+
+   cleanRestoredHandles();
}
 
public void close() throws Exception {
committer.close();
}
 
/**
-* Saves a handle in the state.
+* Called when a checkpoint barrier arrives. It closes any open streams 
to the backend
+* and marks them as pending for committing to the external, 
third-party storage system.
 *
-* @param checkpointId
-* @throws IOException
+* @param checkpointId the id of the latest received checkpoint.
+* @throws IOException in case something went wrong when handling the 
stream to the backend.
 */
private void saveHandleInState(final long checkpointId, final long 
timestamp) throws Exception {
+   Preconditions.checkNotNull(this.pendingCheckpoints, "The 
operator has not been properly initialized.");
+
//only add handle if a new OperatorState was created since the 
last snapshot
if (out != null) {
+   int subtaskIdx = 
getRuntimeContext().getIndexOfThisSubtask();
StreamStateHandle handle = out.closeAndGetHandle();
-   if (state.pendingHandles.containsKey(checkpointId)) {
+
+   PendingCheckpoint pendingCheckpoint = new 
PendingCheckpoint(
+   checkpointId, subtaskIdx, timestamp, handle);
+
+   if (pendingCheckpoints.contains(pendingCheckpoint)) {
//we already have a checkpoint stored for that 
ID that may have been partially written,
//so we discard this "alternate version" and 
use the stored checkpoint
handle.discardState();
} else {
-   state.pendingHandles.put(checkpointId, new 
Tuple2<>(timestamp, handle));
+   pendingCheckpoints.add(pendingCheckpoint);
}
out = null;
}
}
 
@Override
-   public void snapshotState(FSDataOutputStream out,
-   long checkpointId,
-   long timestamp) throws Exception {
+   public void snapshotState(FSDataOutputStream out, long checkpointId, 
long timestamp) throws Exception {
saveHandleInState(checkpointId, timestamp);
 
-   InstantiationUtil.serializeObject(out, state);
+   DataOutputViewStreamWrapper outStream = new 
DataOutputViewStreamWrapper(out);
+   outStream.writeInt(pendingCheckpoints.size());
+   for (PendingCheckpoint pendingCheckpoint : pendingCheckpoints) {
+   pendingCheckpoint.serialize(outStream);
+   }
}
 
@Override
public void restoreState(FSDataInputStream in) throws Exception {
-   this.state = InstantiationUtil.deserializeObject(in, 
getUserCodeClassloader());
+   final DataInputViewStreamWrapper inStream = new 
DataInputViewStreamWrapper(in);
+   int numPendingHandles = inStream.readInt();
+   for (int i = 0; i < numPendingHandles; i++) {
+   
pendingCheckpoints.add(PendingCheckpoint.restore(inStream, 
getUserCodeClassloader()));
+   }
}
 
-   private void cleanState() throws Exception {
-   synchronized (this.state.pendingHandles) { //remove all handles 
that were already committed
-   Set pastCheckpointIds = 
this.state.pendingHandles.keySet();
-   Set checkpointsToRemove = new HashSet<>();
-   for (Long pastCheckpointId : pastCheckpointIds) {
-   if 

[jira] [Commented] (FLINK-4939) GenericWriteAheadSink: Decouple the creating from the committing subtask for a pending checkpoint

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636529#comment-15636529
 ] 

ASF GitHub Bot commented on FLINK-4939:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2707#discussion_r86557631
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
 ---
@@ -77,111 +84,144 @@ public GenericWriteAheadSink(CheckpointCommitter 
committer, TypeSerializer s
public void open() throws Exception {
super.open();
committer.setOperatorId(id);
-   
committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask());
committer.open();
-   cleanState();
-   checkpointStreamFactory =
-   
getContainingTask().createCheckpointStreamFactory(this);
+
+   checkpointStreamFactory = getContainingTask()
+   .createCheckpointStreamFactory(this);
+
+   cleanRestoredHandles();
}
 
public void close() throws Exception {
committer.close();
}
 
/**
-* Saves a handle in the state.
+* Called when a checkpoint barrier arrives. It closes any open streams 
to the backend
+* and marks them as pending for committing to the external, 
third-party storage system.
 *
-* @param checkpointId
-* @throws IOException
+* @param checkpointId the id of the latest received checkpoint.
+* @throws IOException in case something went wrong when handling the 
stream to the backend.
 */
private void saveHandleInState(final long checkpointId, final long 
timestamp) throws Exception {
+   Preconditions.checkNotNull(this.pendingCheckpoints, "The 
operator has not been properly initialized.");
+
//only add handle if a new OperatorState was created since the 
last snapshot
if (out != null) {
+   int subtaskIdx = 
getRuntimeContext().getIndexOfThisSubtask();
StreamStateHandle handle = out.closeAndGetHandle();
-   if (state.pendingHandles.containsKey(checkpointId)) {
+
+   PendingCheckpoint pendingCheckpoint = new 
PendingCheckpoint(
+   checkpointId, subtaskIdx, timestamp, handle);
+
+   if (pendingCheckpoints.contains(pendingCheckpoint)) {
//we already have a checkpoint stored for that 
ID that may have been partially written,
//so we discard this "alternate version" and 
use the stored checkpoint
handle.discardState();
} else {
-   state.pendingHandles.put(checkpointId, new 
Tuple2<>(timestamp, handle));
+   pendingCheckpoints.add(pendingCheckpoint);
}
out = null;
}
}
 
@Override
-   public void snapshotState(FSDataOutputStream out,
-   long checkpointId,
-   long timestamp) throws Exception {
+   public void snapshotState(FSDataOutputStream out, long checkpointId, 
long timestamp) throws Exception {
saveHandleInState(checkpointId, timestamp);
 
-   InstantiationUtil.serializeObject(out, state);
+   DataOutputViewStreamWrapper outStream = new 
DataOutputViewStreamWrapper(out);
+   outStream.writeInt(pendingCheckpoints.size());
+   for (PendingCheckpoint pendingCheckpoint : pendingCheckpoints) {
+   pendingCheckpoint.serialize(outStream);
+   }
}
 
@Override
public void restoreState(FSDataInputStream in) throws Exception {
-   this.state = InstantiationUtil.deserializeObject(in, 
getUserCodeClassloader());
+   final DataInputViewStreamWrapper inStream = new 
DataInputViewStreamWrapper(in);
+   int numPendingHandles = inStream.readInt();
+   for (int i = 0; i < numPendingHandles; i++) {
+   
pendingCheckpoints.add(PendingCheckpoint.restore(inStream, 
getUserCodeClassloader()));
+   }
}
 
-   private void cleanState() throws Exception {
-   synchronized (this.state.pendingHandles) { //remove all handles 
that were already committed
-   Set pastCheckpointIds = 
this.state.pendingHandles.keySet();
-   Set checkpointsToRemove = new HashSet<>();
-   for (Long pastCheckpointId : pastCheckpointIds) {
-   if 

[jira] [Commented] (FLINK-4939) GenericWriteAheadSink: Decouple the creating from the committing subtask for a pending checkpoint

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636528#comment-15636528
 ] 

ASF GitHub Bot commented on FLINK-4939:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2707#discussion_r86557000
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
 ---
@@ -77,111 +84,144 @@ public GenericWriteAheadSink(CheckpointCommitter 
committer, TypeSerializer s
public void open() throws Exception {
super.open();
committer.setOperatorId(id);
-   
committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask());
committer.open();
-   cleanState();
-   checkpointStreamFactory =
-   
getContainingTask().createCheckpointStreamFactory(this);
+
+   checkpointStreamFactory = getContainingTask()
+   .createCheckpointStreamFactory(this);
+
+   cleanRestoredHandles();
}
 
public void close() throws Exception {
committer.close();
}
 
/**
-* Saves a handle in the state.
+* Called when a checkpoint barrier arrives. It closes any open streams 
to the backend
+* and marks them as pending for committing to the external, 
third-party storage system.
 *
-* @param checkpointId
-* @throws IOException
+* @param checkpointId the id of the latest received checkpoint.
+* @throws IOException in case something went wrong when handling the 
stream to the backend.
 */
private void saveHandleInState(final long checkpointId, final long 
timestamp) throws Exception {
+   Preconditions.checkNotNull(this.pendingCheckpoints, "The 
operator has not been properly initialized.");
--- End diff --

This can't happen and is thus unnecessary.
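
To illustrate the point, here is a minimal sketch. It assumes the collection is created eagerly and never reassigned; the field declaration is not part of the quoted hunk, so that is an assumption, although restoreState() above already adds to it without any prior assignment. SinkSketch is a hypothetical stand-in, and Objects.requireNonNull stands in for Preconditions.checkNotNull.

```java
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for the sink; not the actual Flink class.
final class SinkSketch {
    // Assumption: the set is created at declaration and never reassigned.
    private final Set<Long> pendingCheckpoints = new TreeSet<>();

    // Mirrors restoreState(): adds to the set without any null check.
    void restore(long checkpointId) {
        pendingCheckpoints.add(checkpointId);
    }

    // Mirrors saveHandleInState(): the check below can never fail,
    // because a final field initialized at declaration is never null.
    int save(long checkpointId) {
        Objects.requireNonNull(pendingCheckpoints, "The operator has not been properly initialized.");
        pendingCheckpoints.add(checkpointId);
        return pendingCheckpoints.size();
    }
}
```

Under that assumption the check is dead code and removing it changes nothing.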


> GenericWriteAheadSink: Decouple the creating from the committing subtask for 
> a pending checkpoint
> -
>
> Key: FLINK-4939
> URL: https://issues.apache.org/jira/browse/FLINK-4939
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> So far the GenericWriteAheadSink has assumed that
> the subtask that wrote a pending checkpoint to the
> state backend is also the one that commits it to
> the third-party storage system.
> This issue aims to remove this assumption. To do this,
> the CheckpointCommitter has to accept the subtaskIdx
> as a parameter when asked whether a checkpoint was
> committed, and the state kept by the GenericWriteAheadSink
> has to include the index of the subtask that wrote
> the pending checkpoint.
> This change is also necessary for making the operator rescalable.
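
The idea described above could be sketched roughly as follows. This is not the code from the pull request: SubtaskAwareCommitter, PendingEntry and PendingCleanup are hypothetical names, the state handle is left out, and treating two entries as equal when checkpoint id and subtask index match is an assumption made for the illustration.

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;

// Hypothetical committer: the subtask index is passed per call
// instead of being fixed once when the operator is opened.
interface SubtaskAwareCommitter {
    boolean isCheckpointCommitted(int subtaskIdx, long checkpointId);
}

// Hypothetical pending entry that remembers which subtask wrote it.
final class PendingEntry {
    final long checkpointId;
    final int subtaskIdx;
    final long timestamp;

    PendingEntry(long checkpointId, int subtaskIdx, long timestamp) {
        this.checkpointId = checkpointId;
        this.subtaskIdx = subtaskIdx;
        this.timestamp = timestamp;
    }

    // Assumption: identity is the (checkpointId, subtaskIdx) pair.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof PendingEntry)) {
            return false;
        }
        PendingEntry other = (PendingEntry) o;
        return checkpointId == other.checkpointId && subtaskIdx == other.subtaskIdx;
    }

    @Override
    public int hashCode() {
        return Objects.hash(checkpointId, subtaskIdx);
    }
}

final class PendingCleanup {
    // Drops every entry whose writer already committed it; returns the number dropped.
    static int removeCommitted(Set<PendingEntry> pending, SubtaskAwareCommitter committer) {
        int removed = 0;
        Iterator<PendingEntry> it = pending.iterator();
        while (it.hasNext()) {
            PendingEntry entry = it.next();
            if (committer.isCheckpointCommitted(entry.subtaskIdx, entry.checkpointId)) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        Set<PendingEntry> pending = new HashSet<>();
        pending.add(new PendingEntry(1L, 0, 100L));
        pending.add(new PendingEntry(1L, 1, 100L));
        // Only subtask 0 has committed checkpoint 1 in this toy setup.
        int removed = removeCommitted(pending, (subtask, id) -> subtask == 0 && id == 1L);
        System.out.println(removed + " removed, " + pending.size() + " still pending");
    }
}
```

Because each entry carries the index of the subtask that wrote it, any subtask that restores the entry can ask the committer about it, which is what rescaling needs.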





[GitHub] flink pull request #2707: [FLINK-4939] GenericWriteAheadSink: Decouple the c...

2016-11-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2707#discussion_r86557736
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
 ---
@@ -77,111 +84,144 @@ public GenericWriteAheadSink(CheckpointCommitter 
committer, TypeSerializer s
public void open() throws Exception {
super.open();
committer.setOperatorId(id);
-   
committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask());
committer.open();
-   cleanState();
-   checkpointStreamFactory =
-   
getContainingTask().createCheckpointStreamFactory(this);
+
+   checkpointStreamFactory = getContainingTask()
+   .createCheckpointStreamFactory(this);
+
+   cleanRestoredHandles();
}
 
public void close() throws Exception {
committer.close();
}
 
/**
-* Saves a handle in the state.
+* Called when a checkpoint barrier arrives. It closes any open streams 
to the backend
+* and marks them as pending for committing to the external, 
third-party storage system.
 *
-* @param checkpointId
-* @throws IOException
+* @param checkpointId the id of the latest received checkpoint.
+* @throws IOException in case something went wrong when handling the 
stream to the backend.
 */
private void saveHandleInState(final long checkpointId, final long 
timestamp) throws Exception {
+   Preconditions.checkNotNull(this.pendingCheckpoints, "The 
operator has not been properly initialized.");
+
//only add handle if a new OperatorState was created since the 
last snapshot
if (out != null) {
+   int subtaskIdx = 
getRuntimeContext().getIndexOfThisSubtask();
StreamStateHandle handle = out.closeAndGetHandle();
-   if (state.pendingHandles.containsKey(checkpointId)) {
+
+   PendingCheckpoint pendingCheckpoint = new 
PendingCheckpoint(
+   checkpointId, subtaskIdx, timestamp, handle);
+
+   if (pendingCheckpoints.contains(pendingCheckpoint)) {
//we already have a checkpoint stored for that 
ID that may have been partially written,
//so we discard this "alternate version" and 
use the stored checkpoint
handle.discardState();
} else {
-   state.pendingHandles.put(checkpointId, new 
Tuple2<>(timestamp, handle));
+   pendingCheckpoints.add(pendingCheckpoint);
}
out = null;
}
}
 
@Override
-   public void snapshotState(FSDataOutputStream out,
-   long checkpointId,
-   long timestamp) throws Exception {
+   public void snapshotState(FSDataOutputStream out, long checkpointId, 
long timestamp) throws Exception {
saveHandleInState(checkpointId, timestamp);
 
-   InstantiationUtil.serializeObject(out, state);
+   DataOutputViewStreamWrapper outStream = new 
DataOutputViewStreamWrapper(out);
+   outStream.writeInt(pendingCheckpoints.size());
+   for (PendingCheckpoint pendingCheckpoint : pendingCheckpoints) {
+   pendingCheckpoint.serialize(outStream);
+   }
}
 
@Override
public void restoreState(FSDataInputStream in) throws Exception {
-   this.state = InstantiationUtil.deserializeObject(in, 
getUserCodeClassloader());
+   final DataInputViewStreamWrapper inStream = new 
DataInputViewStreamWrapper(in);
+   int numPendingHandles = inStream.readInt();
+   for (int i = 0; i < numPendingHandles; i++) {
+   
pendingCheckpoints.add(PendingCheckpoint.restore(inStream, 
getUserCodeClassloader()));
+   }
}
 
-   private void cleanState() throws Exception {
-   synchronized (this.state.pendingHandles) { //remove all handles 
that were already committed
-   Set pastCheckpointIds = 
this.state.pendingHandles.keySet();
-   Set checkpointsToRemove = new HashSet<>();
-   for (Long pastCheckpointId : pastCheckpointIds) {
-   if 
(committer.isCheckpointCommitted(pastCheckpointId)) {
-   
checkpointsToRemove.add(pastCheckpointId);
+   /**
+* Called at {@link #open()} to clean-up the pending handle list.
+* It iterates 

[GitHub] flink pull request #2707: [FLINK-4939] GenericWriteAheadSink: Decouple the c...

2016-11-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2707#discussion_r86557000
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
 ---
@@ -77,111 +84,144 @@ public GenericWriteAheadSink(CheckpointCommitter 
committer, TypeSerializer s
public void open() throws Exception {
super.open();
committer.setOperatorId(id);
-   
committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask());
committer.open();
-   cleanState();
-   checkpointStreamFactory =
-   
getContainingTask().createCheckpointStreamFactory(this);
+
+   checkpointStreamFactory = getContainingTask()
+   .createCheckpointStreamFactory(this);
+
+   cleanRestoredHandles();
}
 
public void close() throws Exception {
committer.close();
}
 
/**
-* Saves a handle in the state.
+* Called when a checkpoint barrier arrives. It closes any open streams 
to the backend
+* and marks them as pending for committing to the external, 
third-party storage system.
 *
-* @param checkpointId
-* @throws IOException
+* @param checkpointId the id of the latest received checkpoint.
+* @throws IOException in case something went wrong when handling the 
stream to the backend.
 */
private void saveHandleInState(final long checkpointId, final long 
timestamp) throws Exception {
+   Preconditions.checkNotNull(this.pendingCheckpoints, "The 
operator has not been properly initialized.");
--- End diff --

This can't happen and is thus unnecessary.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5012) Provide Timestamp in TimelyFlatMapFunction

2016-11-04 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636525#comment-15636525
 ] 

Aljoscha Krettek commented on FLINK-5012:
-

Yep, but it's different from normal {{FlatMap}} and now you have to do an extra 
hop, i.e. {{ctx.collector().collect(myElement);}}

[~StephanEwen] What do you think? You were against removing {{Collector}} in 
the updated {{ProcessWindowFunction}} in 
[FLIP-2|https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata].

> Provide Timestamp in TimelyFlatMapFunction
> --
>
> Key: FLINK-5012
> URL: https://issues.apache.org/jira/browse/FLINK-5012
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>
> Right now, {{TimelyFlatMapFunction}} does not give the timestamp of the 
> element in {{flatMap()}}.
> The signature is currently this:
> {code}
> void flatMap(I value, TimerService timerService, Collector out) throws 
> Exception;
> {code}
> If we add the timestamp, it would become this:
> {code}
> void flatMap(I value, Long timestamp, TimerService timerService, Collector 
> out) throws Exception;
> {code}
> The reason why it's a {{Long}} and not a {{long}} is that an element might 
> not have a timestamp; in that case we should hand in {{null}} here.
> This is becoming quite long, so we could add a {{Context}} parameter that 
> provides access to the timestamp and timer service.
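
For illustration only, the {{Context}} idea from the description could look roughly like the sketch below. The Context, TimerService, Collector and TimelyFlatMapFunction interfaces here are hypothetical stand-ins written for this example, not Flink's actual classes, and the API that is eventually chosen may differ.

```java
import java.util.ArrayList;
import java.util.List;

public class TimelyContextSketch {

    /** Hypothetical stand-in for a timer service; not Flink's interface. */
    public interface TimerService {
        long currentProcessingTime();
    }

    /** Hypothetical stand-in for a collector; not Flink's interface. */
    public interface Collector<T> {
        void collect(T record);
    }

    /** Hypothetical context that bundles the timestamp and the timer service. */
    public interface Context {
        /** Timestamp of the current element, or null if it has none. */
        Long timestamp();

        TimerService timerService();
    }

    /** Assumed shape of the function once the extra parameters move into Context. */
    public interface TimelyFlatMapFunction<I, O> {
        void flatMap(I value, Context ctx, Collector<O> out) throws Exception;
    }

    /** Runs a small example function on one element and returns what it emitted. */
    public static List<String> run(String value, final Long timestamp) {
        TimelyFlatMapFunction<String, String> fn = (v, c, out) -> {
            Long ts = c.timestamp();
            out.collect(ts == null ? v + "@no-timestamp" : v + "@" + ts);
        };

        Context context = new Context() {
            @Override
            public Long timestamp() {
                return timestamp;
            }

            @Override
            public TimerService timerService() {
                return System::currentTimeMillis;
            }
        };

        final List<String> emitted = new ArrayList<>();
        try {
            fn.flatMap(value, context, emitted::add);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run("a", 42L));
        System.out.println(run("b", null));
    }
}
```

Keeping the timestamp as a nullable {{Long}} inside the context preserves the "no timestamp" case from the description while keeping the method signature at three parameters.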





[jira] [Commented] (FLINK-5019) Proper isRestored result for tasks that did not write state

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636501#comment-15636501
 ] 

ASF GitHub Bot commented on FLINK-5019:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2746#discussion_r86555735
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
 ---
@@ -63,164 +63,170 @@ public StateAssignmentOperation(
public boolean assignStates() throws Exception {
 
for (Map.Entry taskGroupStateEntry : 
latest.getTaskStates().entrySet()) {
+
TaskState taskState = taskGroupStateEntry.getValue();
ExecutionJobVertex executionJobVertex = 
tasks.get(taskGroupStateEntry.getKey());
 
-   if (executionJobVertex != null) {
-   // check that the number of key groups have not 
changed
-   if (taskState.getMaxParallelism() != 
executionJobVertex.getMaxParallelism()) {
-   throw new IllegalStateException("The 
maximum parallelism (" +
-   
taskState.getMaxParallelism() + ") with which the latest " +
-   "checkpoint of the 
execution job vertex " + executionJobVertex +
-   " has been taken and 
the current maximum parallelism (" +
-   
executionJobVertex.getMaxParallelism() + ") changed. This " +
-   "is currently not 
supported.");
+   if (executionJobVertex == null) {
--- End diff --

That's what I thought. I just wanted to double check because Github makes 
it look very scary.  


> Proper isRestored result for tasks that did not write state
> ---
>
> Key: FLINK-5019
> URL: https://issues.apache.org/jira/browse/FLINK-5019
> Project: Flink
>  Issue Type: Bug
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> When a subtask is restored from a checkpoint that does not contain any state 
> (e.g. because the subtask did not write state in the previous run), the 
> result of {{StateInitializationContext::isRestored}} will incorrectly return 
> false.
> We should ensure that empty state is somehow reflected in a checkpoint and 
> return true on restore, independent from the presence of state.





[GitHub] flink pull request #2746: [FLINK-5019] Proper isRestored result for tasks th...

2016-11-04 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2746#discussion_r86555735
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
 ---
@@ -63,164 +63,170 @@ public StateAssignmentOperation(
public boolean assignStates() throws Exception {
 
for (Map.Entry taskGroupStateEntry : 
latest.getTaskStates().entrySet()) {
+
TaskState taskState = taskGroupStateEntry.getValue();
ExecutionJobVertex executionJobVertex = 
tasks.get(taskGroupStateEntry.getKey());
 
-   if (executionJobVertex != null) {
-   // check that the number of key groups have not 
changed
-   if (taskState.getMaxParallelism() != 
executionJobVertex.getMaxParallelism()) {
-   throw new IllegalStateException("The 
maximum parallelism (" +
-   
taskState.getMaxParallelism() + ") with which the latest " +
-   "checkpoint of the 
execution job vertex " + executionJobVertex +
-   " has been taken and 
the current maximum parallelism (" +
-   
executionJobVertex.getMaxParallelism() + ") changed. This " +
-   "is currently not 
supported.");
+   if (executionJobVertex == null) {
--- End diff --

That's what I thought. I just wanted to double check because Github makes 
it look very scary. 😃 




[jira] [Commented] (FLINK-4221) Show metrics in WebFrontend

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636487#comment-15636487
 ] 

ASF GitHub Bot commented on FLINK-4221:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2724
  
I have no issue with removing the properties tab. @iampeter the new UI 
features look very nice!


> Show metrics in WebFrontend
> ---
>
> Key: FLINK-4221
> URL: https://issues.apache.org/jira/browse/FLINK-4221
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics, Webfrontend
>Affects Versions: 1.0.0
>Reporter: Chesnay Schepler
>Assignee: Robert Metzger
> Fix For: pre-apache
>
>






[GitHub] flink issue #2724: [FLINK-4221] Show metrics in WebFrontend + general improv...

2016-11-04 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2724
  
I have no issue with removing the properties tab. @iampeter the new UI 
features look very nice!




[jira] [Commented] (FLINK-4221) Show metrics in WebFrontend

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636481#comment-15636481
 ] 

ASF GitHub Bot commented on FLINK-4221:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2724#discussion_r86554520
  
--- Diff: flink-runtime-web/web-dashboard/app/partials/jobs/job.jade ---
@@ -50,19 +50,15 @@ 
nav.navbar.navbar-default.navbar-fixed-top.navbar-main(ng-if="job")
 
nav.navbar.navbar-default.navbar-fixed-top.navbar-main-additional(ng-if="job")
   ul.nav.nav-tabs
 li(ui-sref-active='active')
-  a(ui-sref=".plan") Plan
+  a(ui-sref=".plan") Overview
 
-//- li(ui-sref-active='active' ng-if="job['end-time'] > -1")
 li(ui-sref-active='active')
   a(ui-sref=".timeline") Timeline
 
 li(ui-sref-active='active')
   a(ui-sref=".exceptions") Exceptions
 
 li(ui-sref-active='active')
-  a(ui-sref=".properties") Properties
-
-li(ui-sref-active='active')
   a(ui-sref=".config") Configuration
--- End diff --

I see it now. You have to click on an operator. I think I had filed this in 
a general UI ticket a while back but never got around to working on it.


> Show metrics in WebFrontend
> ---
>
> Key: FLINK-4221
> URL: https://issues.apache.org/jira/browse/FLINK-4221
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics, Webfrontend
>Affects Versions: 1.0.0
>Reporter: Chesnay Schepler
>Assignee: Robert Metzger
> Fix For: pre-apache
>
>






[GitHub] flink pull request #2724: [FLINK-4221] Show metrics in WebFrontend + general...

2016-11-04 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2724#discussion_r86554520
  
--- Diff: flink-runtime-web/web-dashboard/app/partials/jobs/job.jade ---
@@ -50,19 +50,15 @@ 
nav.navbar.navbar-default.navbar-fixed-top.navbar-main(ng-if="job")
 
nav.navbar.navbar-default.navbar-fixed-top.navbar-main-additional(ng-if="job")
   ul.nav.nav-tabs
 li(ui-sref-active='active')
-  a(ui-sref=".plan") Plan
+  a(ui-sref=".plan") Overview
 
-//- li(ui-sref-active='active' ng-if="job['end-time'] > -1")
 li(ui-sref-active='active')
   a(ui-sref=".timeline") Timeline
 
 li(ui-sref-active='active')
   a(ui-sref=".exceptions") Exceptions
 
 li(ui-sref-active='active')
-  a(ui-sref=".properties") Properties
-
-li(ui-sref-active='active')
   a(ui-sref=".config") Configuration
--- End diff --

I see it now. You have to click on an operator. I think I had filed this in 
a general UI ticket a while back but never got around to working on it.




[jira] [Commented] (FLINK-5019) Proper isRestored result for tasks that did not write state

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636459#comment-15636459
 ] 

ASF GitHub Bot commented on FLINK-5019:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2746#discussion_r86552553
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
 ---
@@ -339,9 +339,12 @@ protected void run() {
SubtaskState subtaskState = 
taskState.getState(tdd.getIndexInSubtaskGroup());
 
assertNotNull(subtaskState);
-   errMsg = "Initial operator state 
mismatch.";
-   assertEquals(errMsg, 
subtaskState.getLegacyOperatorState(),
-   
tdd.getTaskStateHandles().getLegacyOperatorState());
+
+   if (subtaskState.hasState()) {
--- End diff --

Didn't run a find-usage, but if that is true then I agree to your proposal 
:)


> Proper isRestored result for tasks that did not write state
> ---
>
> Key: FLINK-5019
> URL: https://issues.apache.org/jira/browse/FLINK-5019
> Project: Flink
>  Issue Type: Bug
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> When a subtask is restored from a checkpoint that does not contain any state 
> (e.g. because the subtask did not write state in the previous run), the 
> result of {{StateInitializationContext::isRestored}} will incorrectly return 
> false.
> We should ensure that empty state is somehow reflected in a checkpoint and 
> return true on restore, independent from the presence of state.





[jira] [Closed] (FLINK-4977) Enum serialization does not work in all cases

2016-11-04 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-4977.
---

> Enum serialization does not work in all cases
> -
>
> Key: FLINK-4977
> URL: https://issues.apache.org/jira/browse/FLINK-4977
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.1.3
> Environment: Java SE 1.8.0_91
> Ubuntu 14.04.4 LTS (trusty)
>Reporter: Sean Winard
>Assignee: Stephan Ewen
>Priority: Minor
> Fix For: 1.2.0, 1.1.4
>
>
> Enums produce serialization failures whether they are by themselves or part 
> of a POJO in the stream. I've tried running in IntelliJ IDEA and also via 
> {{flink run}}. Here is a small program to reproduce:
> {code:java}
> package org.apache.flink.testenum;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> public class TestEnumStream {
> private enum MyEnum {
> NONE, SOMETHING, EVERYTHING
> }
> public static void main(String[] args) throws Exception {
> final StreamExecutionEnvironment environment = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> environment.setParallelism(1);
> environment.fromElements(MyEnum.NONE, MyEnum.SOMETHING, 
> MyEnum.EVERYTHING)
> .addSink(x -> System.err.println(x));
> environment.execute("TestEnumStream");
> }
> }
> {code}
> {noformat}
> Exception in thread "main" java.lang.RuntimeException: Cannot access the 
> constants of the enum org.apache.flink.testenum.TestEnumStream$MyEnum
>   at 
> org.apache.flink.api.common.typeutils.base.EnumSerializer.createValues(EnumSerializer.java:132)
>   at 
> org.apache.flink.api.common.typeutils.base.EnumSerializer.<init>(EnumSerializer.java:43)
>   at 
> org.apache.flink.api.java.typeutils.EnumTypeInfo.createSerializer(EnumTypeInfo.java:101)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:773)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromElements(StreamExecutionEnvironment.java:674)
> {noformat}
> I took a look at that line in EnumSerializer.java and swapped out the 
> reflection on the "values" method for the simpler 
> `enumClass.getEnumConstants()`, and that seems to work after I install my 
> custom flink-core jar. I believe this is because 
> [http://docs.oracle.com/javase/tutorial/reflect/special/enumMembers.html] 
> specifically states you cannot reflect on the "values" method since it is 
> implicitly generated at compile time.
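
The workaround mentioned in the description, reading the constants through {{Class#getEnumConstants()}}, can be tried in isolation. The sketch below is not the EnumSerializer code: the class name and the constantsOf and describeMyEnum helpers are hypothetical, and only the enum mirrors the one in the report.

```java
import java.util.Arrays;

public class EnumConstantsSketch {

    // Same shape as the private nested enum in the report.
    private enum MyEnum {
        NONE, SOMETHING, EVERYTHING
    }

    /**
     * Returns the constants via Class#getEnumConstants() rather than a
     * hand-written reflective lookup of the generated values() method.
     */
    public static <T extends Enum<T>> T[] constantsOf(Class<T> enumClass) {
        return enumClass.getEnumConstants();
    }

    /** Small helper so callers outside this class can see the result. */
    public static String describeMyEnum() {
        return Arrays.toString(constantsOf(MyEnum.class));
    }

    public static void main(String[] args) {
        System.out.println(describeMyEnum());
    }
}
```

The constants come back in declaration order, so an ordinal-based encoding remains possible with this approach.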





[GitHub] flink pull request #2746: [FLINK-5019] Proper isRestored result for tasks th...

2016-11-04 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2746#discussion_r86552553
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
 ---
@@ -339,9 +339,12 @@ protected void run() {
SubtaskState subtaskState = 
taskState.getState(tdd.getIndexInSubtaskGroup());
 
assertNotNull(subtaskState);
-   errMsg = "Initial operator state 
mismatch.";
-   assertEquals(errMsg, 
subtaskState.getLegacyOperatorState(),
-   
tdd.getTaskStateHandles().getLegacyOperatorState());
+
+   if (subtaskState.hasState()) {
--- End diff --

Didn't run a find-usage, but if that is true then I agree to your proposal 
:)




[GitHub] flink pull request #2746: [FLINK-5019] Proper isRestored result for tasks th...

2016-11-04 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2746#discussion_r86552393
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
 ---
@@ -63,164 +63,170 @@ public StateAssignmentOperation(
public boolean assignStates() throws Exception {
 
for (Map.Entry taskGroupStateEntry : 
latest.getTaskStates().entrySet()) {
+
TaskState taskState = taskGroupStateEntry.getValue();
ExecutionJobVertex executionJobVertex = 
tasks.get(taskGroupStateEntry.getKey());
 
-   if (executionJobVertex != null) {
-   // check that the number of key groups have not 
changed
-   if (taskState.getMaxParallelism() != 
executionJobVertex.getMaxParallelism()) {
-   throw new IllegalStateException("The 
maximum parallelism (" +
-   
taskState.getMaxParallelism() + ") with which the latest " +
-   "checkpoint of the 
execution job vertex " + executionJobVertex +
-   " has been taken and 
the current maximum parallelism (" +
-   
executionJobVertex.getMaxParallelism() + ") changed. This " +
-   "is currently not 
supported.");
+   if (executionJobVertex == null) {
--- End diff --

The code should do the same, I just found the if-nesting was a little deep 
and checking the preconditions first makes it more readable imo.
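
The refactoring described in this comment, checking preconditions first instead of nesting, can be sketched in isolation. The example below is not the Flink code: the maps from vertex name to maximum parallelism and both method names are hypothetical, and skipping a missing vertex with continue is an assumption made only to keep the sketch small.

```java
import java.util.Map;

public class GuardClauseSketch {

    /** Nested form: the main work sits inside the null check. */
    public static int assignNested(Map<String, Integer> checkpointed,
                                   Map<String, Integer> current) {
        int assigned = 0;
        for (Map.Entry<String, Integer> entry : checkpointed.entrySet()) {
            Integer currentMax = current.get(entry.getKey());
            if (currentMax != null) {
                if (!entry.getValue().equals(currentMax)) {
                    throw new IllegalStateException(
                        "Maximum parallelism changed for " + entry.getKey());
                }
                assigned++;
            }
        }
        return assigned;
    }

    /** Guard-clause form: preconditions are handled first, then the main work. */
    public static int assignWithGuards(Map<String, Integer> checkpointed,
                                       Map<String, Integer> current) {
        int assigned = 0;
        for (Map.Entry<String, Integer> entry : checkpointed.entrySet()) {
            Integer currentMax = current.get(entry.getKey());
            if (currentMax == null) {
                continue; // assumption: a missing vertex is simply skipped
            }
            if (!entry.getValue().equals(currentMax)) {
                throw new IllegalStateException(
                    "Maximum parallelism changed for " + entry.getKey());
            }
            assigned++;
        }
        return assigned;
    }
}
```

Both methods behave the same for every input; the second only reduces the nesting depth of the main path.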




[jira] [Commented] (FLINK-5019) Proper isRestored result for tasks that did not write state

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636456#comment-15636456
 ] 

ASF GitHub Bot commented on FLINK-5019:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2746#discussion_r86552393
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
 ---
@@ -63,164 +63,170 @@ public StateAssignmentOperation(
public boolean assignStates() throws Exception {
 
for (Map.Entry taskGroupStateEntry : 
latest.getTaskStates().entrySet()) {
+
TaskState taskState = taskGroupStateEntry.getValue();
ExecutionJobVertex executionJobVertex = 
tasks.get(taskGroupStateEntry.getKey());
 
-   if (executionJobVertex != null) {
-   // check that the number of key groups have not 
changed
-   if (taskState.getMaxParallelism() != 
executionJobVertex.getMaxParallelism()) {
-   throw new IllegalStateException("The 
maximum parallelism (" +
-   
taskState.getMaxParallelism() + ") with which the latest " +
-   "checkpoint of the 
execution job vertex " + executionJobVertex +
-   " has been taken and 
the current maximum parallelism (" +
-   
executionJobVertex.getMaxParallelism() + ") changed. This " +
-   "is currently not 
supported.");
+   if (executionJobVertex == null) {
--- End diff --

The code should do the same, I just found the if-nesting was a little deep 
and checking the preconditions first makes it more readable imo.


> Proper isRestored result for tasks that did not write state
> ---
>
> Key: FLINK-5019
> URL: https://issues.apache.org/jira/browse/FLINK-5019
> Project: Flink
>  Issue Type: Bug
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> When a subtask is restored from a checkpoint that does not contain any state 
> (e.g. because the subtask did not write state in the previous run), the 
> result of {{StateInitializationContext::isRestored}} will incorrectly return 
> false.
> We should ensure that empty state is somehow reflected in a checkpoint and 
> return true on restore, independent from the presence of state.





[jira] [Commented] (FLINK-4900) Implement Docker image support

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636435#comment-15636435
 ] 

ASF GitHub Bot commented on FLINK-4900:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2703
  
Thanks for the PR! Jenkins CI is currently broken but Travis CI passed. 
Looks good to me but would be great if you could take a look @EronWright.


> Implement Docker image support
> --
>
> Key: FLINK-4900
> URL: https://issues.apache.org/jira/browse/FLINK-4900
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>
> Support the use of a docker image, with both the unified containerizer and 
> the Docker containerizer.
> Use a configuration setting to explicitly configure which image and 
> containerizer to use.





[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636444#comment-15636444
 ] 

ASF GitHub Bot commented on FLINK-3930:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r86548851
  
--- Diff: docs/setup/yarn_setup.md ---
@@ -134,6 +140,14 @@ Flink on YARN will only start all requested containers 
if enough resources are a
 some account also for the number of vcores. By default, the number of 
vcores is equal to the processing slots (`-s`) argument. The 
`yarn.containers.vcores` allows overwriting the
 number of vcores with a custom value.
 
+### Service Authorization using Secure Cookie
+
+If service authorization for the cluster components (Akka, Blob Service, 
Web UI) is enabled, you could pass the secure cookie value as command line 
argument (-k or --cookie) instead of hardcoding the value in Flink 
configuration file.
--- End diff --

I would link to the main security docs from here.

A crucial thing to point out here is that when users use this with YARN 
sessions, all jobs running in that session will use the same cookie. The cookie 
is a "per-cluster" or "per-processes" parameter.

Please add that for proper security between jobs, jobs should be submitted 
individually, not via a Flink Yarn Session.


> Implement Service-Level Authorization
> -
>
> Key: FLINK-3930
> URL: https://issues.apache.org/jira/browse/FLINK-3930
> Project: Flink
>  Issue Type: New Feature
>  Components: Security
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Service-level authorization is the initial authorization mechanism to ensure 
> clients (or servers) connecting to the Flink cluster are authorized to do so. 
>   The purpose is to prevent a cluster from being used by an unauthorized 
> user, whether to execute jobs, disrupt cluster functionality, or gain access 
> to secrets stored within the cluster.
> Implement service-level authorization as described in the design doc.
> - Introduce a shared secret cookie
> - Enable Akka security cookie
> - Implement data transfer authentication
> - Secure the web dashboard




