[jira] [Commented] (FLINK-5698) Add NestedFieldsProjectableTableSource interface

2017-03-24 Thread ramkrishna.s.vasudevan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941590#comment-15941590
 ] 

ramkrishna.s.vasudevan commented on FLINK-5698:
---

Once this is integrated, I will work on making the HBase table source work with 
NestedFieldsProjectableTableSource. Thanks [~tonycox].

> Add NestedFieldsProjectableTableSource interface
> 
>
> Key: FLINK-5698
> URL: https://issues.apache.org/jira/browse/FLINK-5698
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Anton Solovev
>Assignee: Anton Solovev
>
> Add a NestedFieldsProjectableTableSource interface for TableSource 
> implementations that support nested projection push-down.
> The interface could look as follows:
> {code}
> trait NestedFieldsProjectableTableSource[T] {
>   def projectNestedFields(fields: Array[String]): 
> NestedFieldsProjectableTableSource[T]
> }
> {code}
> This interface works together with ProjectableTableSource.
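
For illustration, a minimal sketch of a source implementing the proposed trait could look like the following (the PersonTableSource name and its pruning strategy are hypothetical; only the trait itself comes from this issue, and a real source would also implement BatchTableSource or StreamTableSource):

{code}
import org.apache.flink.types.Row

// Hypothetical table source; everything except the trait is illustrative.
class PersonTableSource(projectedFields: Array[String])
  extends NestedFieldsProjectableTableSource[Row] {

  // Return a copy of the source that only reads the requested (possibly nested)
  // fields, e.g. Array("name", "address.city"); the scan can then skip all
  // other fields of the record.
  override def projectNestedFields(fields: Array[String]): NestedFieldsProjectableTableSource[Row] =
    new PersonTableSource(fields)
}
{code}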



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6186) Remove unused import

2017-03-24 Thread CanBin Zheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

CanBin Zheng updated FLINK-6186:

Description: Remove unused import 
org.apache.flink.api.java.ExecutionEnvironment in 
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scala

> Remove unused import
> 
>
> Key: FLINK-6186
> URL: https://issues.apache.org/jira/browse/FLINK-6186
> Project: Flink
>  Issue Type: Wish
>Reporter: CanBin Zheng
>Assignee: CanBin Zheng
>Priority: Trivial
>
> Remove unused import org.apache.flink.api.java.ExecutionEnvironment in 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scala



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6186) Remove unused import

2017-03-24 Thread CanBin Zheng (JIRA)
CanBin Zheng created FLINK-6186:
---

 Summary: Remove unused import
 Key: FLINK-6186
 URL: https://issues.apache.org/jira/browse/FLINK-6186
 Project: Flink
  Issue Type: Wish
Reporter: CanBin Zheng
Assignee: CanBin Zheng
Priority: Trivial






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6117) 'zookeeper.sasl.disable' does not take effect when starting CuratorFramework

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941547#comment-15941547
 ] 

ASF GitHub Bot commented on FLINK-6117:
---

Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/3600
  
@Rucongzhang Please review whether it meets your requirements.



> 'zookeeper.sasl.disable' does not take effect when starting CuratorFramework
> 
>
> Key: FLINK-6117
> URL: https://issues.apache.org/jira/browse/FLINK-6117
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client, JobManager
>Affects Versions: 1.2.0
> Environment: Ubuntu, non-secured
>Reporter: CanBin Zheng
>Assignee: CanBin Zheng
>  Labels: security
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> The value of 'zookeeper.sasl.disable' is not used correctly when starting 
> CuratorFramework.
> Here are all the settings relevant to high-availability in my flink-conf.yaml:
>   high-availability: zookeeper
>   high-availability.zookeeper.quorum: localhost:2181
>   high-availability.zookeeper.storageDir: hdfs:///flink/ha/
> No explicit value is set for 'zookeeper.sasl.disable', so the default value of 
> 'true' (ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE) should be applied. 
> But when FlinkYarnSessionCli & FlinkApplicationMasterRunner start, both logs 
> show that they attempt to connect to ZooKeeper in SASL mode.
> The logs look like this:
> 2017-03-18 23:53:10,498 INFO  org.apache.zookeeper.ZooKeeper  
>   - Initiating client connection, connectString=localhost:2181 
> sessionTimeout=6 
> watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@5949eba8
> 2017-03-18 23:53:10,498 INFO  org.apache.zookeeper.ZooKeeper  
>   - Initiating client connection, connectString=localhost:2181 
> sessionTimeout=6 
> watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@5949eba8
> 2017-03-18 23:53:10,522 WARN  org.apache.zookeeper.ClientCnxn 
>   - SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/tmp/jaas-3047036396963510842.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it.
> 2017-03-18 23:53:10,522 WARN  org.apache.zookeeper.ClientCnxn 
>   - SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/tmp/jaas-3047036396963510842.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it.
> 2017-03-18 23:53:10,530 INFO  org.apache.zookeeper.ClientCnxn 
>   - Opening socket connection to server localhost/127.0.0.1:2181
> 2017-03-18 23:53:10,530 INFO  org.apache.zookeeper.ClientCnxn 
>   - Opening socket connection to server localhost/127.0.0.1:2181
> 2017-03-18 23:53:10,534 ERROR 
> org.apache.flink.shaded.org.apache.curator.ConnectionState- 
> Authentication failed



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6117) 'zookeeper.sasl.disable' does not take effect when starting CuratorFramework

2017-03-24 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui updated FLINK-6117:
-
Issue Type: Sub-task  (was: Bug)
Parent: FLINK-5839

> 'zookeeper.sasl.disable' does not take effect when starting CuratorFramework
> 
>
> Key: FLINK-6117
> URL: https://issues.apache.org/jira/browse/FLINK-6117
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client, JobManager
>Affects Versions: 1.2.0
> Environment: Ubuntu, non-secured
>Reporter: CanBin Zheng
>Assignee: CanBin Zheng
>  Labels: security
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> The value of 'zookeeper.sasl.disable' is not used correctly when starting 
> CuratorFramework.
> Here are all the settings relevant to high-availability in my flink-conf.yaml:
>   high-availability: zookeeper
>   high-availability.zookeeper.quorum: localhost:2181
>   high-availability.zookeeper.storageDir: hdfs:///flink/ha/
> No explicit value is set for 'zookeeper.sasl.disable', so the default value of 
> 'true' (ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE) should be applied. 
> But when FlinkYarnSessionCli & FlinkApplicationMasterRunner start, both logs 
> show that they attempt to connect to ZooKeeper in SASL mode.
> The logs look like this:
> 2017-03-18 23:53:10,498 INFO  org.apache.zookeeper.ZooKeeper  
>   - Initiating client connection, connectString=localhost:2181 
> sessionTimeout=6 
> watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@5949eba8
> 2017-03-18 23:53:10,498 INFO  org.apache.zookeeper.ZooKeeper  
>   - Initiating client connection, connectString=localhost:2181 
> sessionTimeout=6 
> watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@5949eba8
> 2017-03-18 23:53:10,522 WARN  org.apache.zookeeper.ClientCnxn 
>   - SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/tmp/jaas-3047036396963510842.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it.
> 2017-03-18 23:53:10,522 WARN  org.apache.zookeeper.ClientCnxn 
>   - SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/tmp/jaas-3047036396963510842.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it.
> 2017-03-18 23:53:10,530 INFO  org.apache.zookeeper.ClientCnxn 
>   - Opening socket connection to server localhost/127.0.0.1:2181
> 2017-03-18 23:53:10,530 INFO  org.apache.zookeeper.ClientCnxn 
>   - Opening socket connection to server localhost/127.0.0.1:2181
> 2017-03-18 23:53:10,534 ERROR 
> org.apache.flink.shaded.org.apache.curator.ConnectionState- 
> Authentication failed



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6148) The ZooKeeper client reports a SASL error when SASL is disabled

2017-03-24 Thread CanBin Zheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941537#comment-15941537
 ] 

CanBin Zheng commented on FLINK-6148:
-

This is a duplicate issue; I reported this problem before and am fixing it.

https://issues.apache.org/jira/browse/FLINK-6117




> The ZooKeeper client reports a SASL error when SASL is disabled
> --
>
> Key: FLINK-6148
> URL: https://issues.apache.org/jira/browse/FLINK-6148
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.2.0
>Reporter: zhangrucong1982
>
> I use Flink 1.2.0 in a YARN cluster. HA is configured in 
> flink-conf.yaml, but SASL is disabled. The configuration is:
> high-availability: zookeeper
> high-availability.zookeeper.quorum: 
> 100.106.40.102:2181,100.106.57.136:2181,100.106.41.233:2181
> high-availability.zookeeper.storageDir: hdfs:/flink
> high-availability.zookeeper.client.acl: open
> high-availability.zookeeper.path.root:  flink0308
> zookeeper.sasl.disable: true
> The client log, JobManager log, and TaskManager log all contain the following error 
> information:
> 2017-03-22 11:18:24,662 WARN  org.apache.zookeeper.ClientCnxn 
>   - SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/tmp/jaas-441937039502263015.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it.
> 2017-03-22 11:18:24,663 ERROR 
> org.apache.flink.shaded.org.apache.curator.ConnectionState- 
> Authentication failed



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941535#comment-15941535
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r108026230
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -112,7 +113,12 @@ class DataStreamOverAggregate(
   "condition.")
 }
   case _: RowTimeType =>
-throw new TableException("OVER Window of the EventTime type is not 
currently supported.")
+if (overWindow.lowerBound.isUnbounded && 
overWindow.upperBound.isCurrentRow) {
+  createUnboundedAndCurrentRowEventTimeOverWindow(inputDS)
--- End diff --

Hi @sunjincheng121, thanks for the reminder; I am glad to add it.


> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
> Fix For: 1.3.0
>
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates event time mode.
> - bounded PRECEDING is not supported (see FLINK-5655)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6117) 'zookeeper.sasl.disable' does not take effect when starting CuratorFramework

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941530#comment-15941530
 ] 

ASF GitHub Bot commented on FLINK-6117:
---

Github user zhengcanbin commented on the issue:

https://github.com/apache/flink/pull/3600
  
@EronWright  @StephanEwen
In ZookeeperSaslClient.java, 'zookeeper.sasl.client' is true by default, so 
I agree to set 'zookeeper.sasl.disable' to false by default, for consistency.
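
For illustration, here is a minimal sketch of one way the flag could be made to take effect, by translating it into ZooKeeper's `zookeeper.sasl.client` system property before the CuratorFramework is built (this is an assumption about the approach, not the actual change in the PR; the `ZooKeeperSaslSetup` helper is made up):

```
import org.apache.flink.configuration.{ConfigConstants, Configuration}

object ZooKeeperSaslSetup {

  // Hypothetical helper: apply Flink's 'zookeeper.sasl.disable' setting to the
  // ZooKeeper client before any Curator/ZooKeeper connection is created.
  def configure(flinkConfig: Configuration): Unit = {
    val saslDisabled = flinkConfig.getBoolean(
      "zookeeper.sasl.disable",
      ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE)

    if (saslDisabled) {
      // With 'zookeeper.sasl.client' set to "false", the ZooKeeper client skips
      // SASL entirely, so the JAAS warnings from the issue description disappear.
      System.setProperty("zookeeper.sasl.client", "false")
    }
  }
}
```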


> 'zookeeper.sasl.disable' does not take effect when starting CuratorFramework
> 
>
> Key: FLINK-6117
> URL: https://issues.apache.org/jira/browse/FLINK-6117
> Project: Flink
>  Issue Type: Bug
>  Components: Client, JobManager
>Affects Versions: 1.2.0
> Environment: Ubuntu, non-secured
>Reporter: CanBin Zheng
>Assignee: CanBin Zheng
>  Labels: security
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> The value of 'zookeeper.sasl.disable' is not used correctly when starting 
> CuratorFramework.
> Here are all the settings relevant to high-availability in my flink-conf.yaml:
>   high-availability: zookeeper
>   high-availability.zookeeper.quorum: localhost:2181
>   high-availability.zookeeper.storageDir: hdfs:///flink/ha/
> No explicit value is set for 'zookeeper.sasl.disable', so the default value of 
> 'true' (ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE) should be applied. 
> But when FlinkYarnSessionCli & FlinkApplicationMasterRunner start, both logs 
> show that they attempt to connect to ZooKeeper in SASL mode.
> The logs look like this:
> 2017-03-18 23:53:10,498 INFO  org.apache.zookeeper.ZooKeeper  
>   - Initiating client connection, connectString=localhost:2181 
> sessionTimeout=6 
> watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@5949eba8
> 2017-03-18 23:53:10,498 INFO  org.apache.zookeeper.ZooKeeper  
>   - Initiating client connection, connectString=localhost:2181 
> sessionTimeout=6 
> watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@5949eba8
> 2017-03-18 23:53:10,522 WARN  org.apache.zookeeper.ClientCnxn 
>   - SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/tmp/jaas-3047036396963510842.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it.
> 2017-03-18 23:53:10,522 WARN  org.apache.zookeeper.ClientCnxn 
>   - SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/tmp/jaas-3047036396963510842.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it.
> 2017-03-18 23:53:10,530 INFO  org.apache.zookeeper.ClientCnxn 
>   - Opening socket connection to server localhost/127.0.0.1:2181
> 2017-03-18 23:53:10,530 INFO  org.apache.zookeeper.ClientCnxn 
>   - Opening socket connection to server localhost/127.0.0.1:2181
> 2017-03-18 23:53:10,534 ERROR 
> org.apache.flink.shaded.org.apache.curator.ConnectionState- 
> Authentication failed



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3600: [FLINK-6117]Make setting of 'zookeeper.sasl.disable' work...

2017-03-24 Thread zhengcanbin
Github user zhengcanbin commented on the issue:

https://github.com/apache/flink/pull/3600
  
@EronWright  @StephanEwen
In ZookeeperSaslClient.java, 'zookeeper.sasl.client' is true by default, so 
I agree to set 'zookeeper.sasl.disable' to false by default, for consistency.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-6148) The ZooKeeper client reports a SASL error when SASL is disabled

2017-03-24 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui reassigned FLINK-6148:


Assignee: (was: shijinkui)

> The ZooKeeper client reports a SASL error when SASL is disabled
> --
>
> Key: FLINK-6148
> URL: https://issues.apache.org/jira/browse/FLINK-6148
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.2.0
>Reporter: zhangrucong1982
>
> I use Flink 1.2.0 in a YARN cluster. HA is configured in 
> flink-conf.yaml, but SASL is disabled. The configuration is:
> high-availability: zookeeper
> high-availability.zookeeper.quorum: 
> 100.106.40.102:2181,100.106.57.136:2181,100.106.41.233:2181
> high-availability.zookeeper.storageDir: hdfs:/flink
> high-availability.zookeeper.client.acl: open
> high-availability.zookeeper.path.root:  flink0308
> zookeeper.sasl.disable: true
> The client log, JobManager log, and TaskManager log all contain the following error 
> information:
> 2017-03-22 11:18:24,662 WARN  org.apache.zookeeper.ClientCnxn 
>   - SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/tmp/jaas-441937039502263015.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it.
> 2017-03-22 11:18:24,663 ERROR 
> org.apache.flink.shaded.org.apache.curator.ConnectionState- 
> Authentication failed



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (FLINK-6148) The ZooKeeper client reports a SASL error when SASL is disabled

2017-03-24 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui reassigned FLINK-6148:


Assignee: shijinkui

> The ZooKeeper client reports a SASL error when SASL is disabled
> --
>
> Key: FLINK-6148
> URL: https://issues.apache.org/jira/browse/FLINK-6148
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.2.0
>Reporter: zhangrucong1982
>Assignee: shijinkui
>
> I use Flink 1.2.0 in a YARN cluster. HA is configured in 
> flink-conf.yaml, but SASL is disabled. The configuration is:
> high-availability: zookeeper
> high-availability.zookeeper.quorum: 
> 100.106.40.102:2181,100.106.57.136:2181,100.106.41.233:2181
> high-availability.zookeeper.storageDir: hdfs:/flink
> high-availability.zookeeper.client.acl: open
> high-availability.zookeeper.path.root:  flink0308
> zookeeper.sasl.disable: true
> The client log, JobManager log, and TaskManager log all contain the following error 
> information:
> 2017-03-22 11:18:24,662 WARN  org.apache.zookeeper.ClientCnxn 
>   - SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/tmp/jaas-441937039502263015.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it.
> 2017-03-22 11:18:24,663 ERROR 
> org.apache.flink.shaded.org.apache.curator.ConnectionState- 
> Authentication failed



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5217) Deprecated interface Checkpointed should make a clear suggestion

2017-03-24 Thread shijinkui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941522#comment-15941522
 ] 

shijinkui commented on FLINK-5217:
--

ping [~srichter]

> Deprecated interface Checkpointed should make a clear suggestion
> ---
>
> Key: FLINK-5217
> URL: https://issues.apache.org/jira/browse/FLINK-5217
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: shijinkui
> Fix For: 1.2.1
>
>
> package org.apache.flink.streaming.api.checkpoint;
> @Deprecated
> @PublicEvolving
> public interface Checkpointed<T extends Serializable> extends 
> CheckpointedRestoring<T>
> This interface should give a clear suggestion in which version it will be 
> removed, and which interface should be used instead of it.
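
For reference, a minimal sketch of what the suggested migration could look like, assuming ListCheckpointed (or CheckpointedFunction) is the intended replacement; the counting mapper is made up:

{code}
import java.util.Collections

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed

// Illustrative only: state handled via ListCheckpointed instead of the
// deprecated Checkpointed interface.
class CountingMapper extends MapFunction[String, String]
  with ListCheckpointed[java.lang.Long] {

  private var count: Long = 0L

  override def map(value: String): String = {
    count += 1
    s"$count: $value"
  }

  // Return the state to store for this checkpoint (as a list, so it can be
  // redistributed when the parallelism changes).
  override def snapshotState(checkpointId: Long, timestamp: Long): java.util.List[java.lang.Long] =
    Collections.singletonList(java.lang.Long.valueOf(count))

  // Re-aggregate the (possibly redistributed) state on restore.
  override def restoreState(state: java.util.List[java.lang.Long]): Unit = {
    count = 0L
    val it = state.iterator()
    while (it.hasNext) {
      count += it.next()
    }
  }
}
{code}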



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3599: [FLINK-6174][HA]introduce a new election service to make ...

2017-03-24 Thread WangTaoTheTonic
Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3599
  
I don't think it's a good idea, as it cannot solve the "split brain" issue 
either.

The key problem is that `LeaderLatch` in Curator is too sensitive to the 
connection state to ZooKeeper (it will revoke leadership when the connection to 
ZooKeeper is temporarily broken), and probably the best way is offering a 
"duller" LeaderLatch, which could also be used in a standalone cluster.

I did the same work in our own private Spark release; let me see if it can be 
reused.
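
For illustration, a rough sketch of the "duller" behaviour as a Curator ConnectionStateListener (the class and callback are made up; only ZooKeeper session loss is treated as losing leadership, while a temporarily suspended connection is tolerated):

```
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.state.{ConnectionState, ConnectionStateListener}

// Illustrative sketch: only a LOST session revokes leadership; SUSPENDED
// (a temporarily broken connection) is tolerated until it either recovers
// or degrades to LOST.
class TolerantConnectionListener(onLeadershipLost: () => Unit)
  extends ConnectionStateListener {

  override def stateChanged(client: CuratorFramework, newState: ConnectionState): Unit =
    newState match {
      case ConnectionState.LOST =>
        onLeadershipLost()
      case _ =>
        // CONNECTED / RECONNECTED / SUSPENDED / READ_ONLY: keep current leadership.
    }
}
```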


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5860) Replace all file creation from java.io.tmpdir with TemporaryFolder

2017-03-24 Thread shijinkui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941520#comment-15941520
 ] 

shijinkui commented on FLINK-5860:
--

ping [~yaroslav.mykhaylov] 

> Replace all file creation from java.io.tmpdir with TemporaryFolder
> --
>
> Key: FLINK-5860
> URL: https://issues.apache.org/jira/browse/FLINK-5860
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Reporter: shijinkui
>Assignee: Yaroslav Mykhaylov
>  Labels: starter
>
> Search for `System.getProperty("java.io.tmpdir")` in the whole Flink project. It 
> yields a list of unit tests. Replace all file creation based on `java.io.tmpdir` 
> with TemporaryFolder.
> Who can fix this problem thoroughly?
> ```
> $ grep -ri 'System.getProperty("java.io.tmpdir")' .
> ./flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java:
>   env.setStateBackend(new FsStateBackend("file:///" + 
> System.getProperty("java.io.tmpdir") + "/flink/backend"));
> ./flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
> File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
>  File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
>  File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java:
>  return getMockEnvironment(new File[] { new 
> File(System.getProperty("java.io.tmpdir")) });
> ./flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java:
>public static final String DEFAULT_TASK_MANAGER_TMP_PATH = 
> System.getProperty("java.io.tmpdir");
> ./flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java:
>   final String tempPath = System.getProperty("java.io.tmpdir");
> ./flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java:   
> final File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: 
> File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: 
> File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java:
>   final String outDir = params.get("output", 
> System.getProperty("java.io.tmpdir"));
> ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java:
> final String tmpDir = System.getProperty("java.io.tmpdir");
> ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java:
>   final String outPath = System.getProperty("java.io.tmpdir");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java:
>public static final String FLINK_PYTHON_FILE_PATH = 
> System.getProperty("java.io.tmpdir") + File.separator + "flink_plan";
> ./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java:
>public static final String FLINK_TMP_DATA_DIR = 
> System.getProperty("java.io.tmpdir") + File.separator + "flink_data";
> ./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java:
>FLINK_HDFS_PATH = "file:" + 
> 
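
For illustration, a minimal sketch of the replacement pattern in a test (the class, rule wiring, and paths are made up; it only shows the JUnit TemporaryFolder rule replacing the java.io.tmpdir lookup):

```
import java.io.File

import org.junit.rules.TemporaryFolder
import org.junit.{Rule, Test}

class TempDirExampleTest {

  // JUnit rule: a fresh directory per test, deleted automatically afterwards,
  // instead of writing into the shared System.getProperty("java.io.tmpdir").
  val _tempFolder = new TemporaryFolder()

  @Rule
  def tempFolder: TemporaryFolder = _tempFolder

  @Test
  def writesIntoManagedTempDir(): Unit = {
    val dir: File = tempFolder.newFolder("flink-backend")
    // e.g. new FsStateBackend("file://" + dir.getAbsolutePath) instead of java.io.tmpdir
    assert(dir.exists())
  }
}
```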

[jira] [Commented] (FLINK-6174) Introduce a leader election service in yarn mode to make JobManager always available

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941521#comment-15941521
 ] 

ASF GitHub Bot commented on FLINK-6174:
---

Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3599
  
I don't think it's a good idea, as it cannot solve the "split brain" issue 
either.

The key problem is that `LeaderLatch` in Curator is too sensitive to the 
connection state to ZooKeeper (it will revoke leadership when the connection to 
ZooKeeper is temporarily broken), and probably the best way is offering a 
"duller" LeaderLatch, which could also be used in a standalone cluster.

I did the same work in our own private Spark release; let me see if it can be 
reused.


> Introduce a leader election service in yarn mode to make JobManager always 
> available
> 
>
> Key: FLINK-6174
> URL: https://issues.apache.org/jira/browse/FLINK-6174
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Reporter: Tao Wang
>Assignee: Tao Wang
>
> Now in YARN mode, if we use ZooKeeper as the high availability choice, Flink will 
> create an election service that obtains a leader through ZooKeeper election.
> When the ZooKeeper leader crashes or the connection between the JobManager and 
> the ZooKeeper instance is broken, the JobManager's leadership will be revoked and 
> a Disconnect message sent to the TaskManagers, which will cancel all running tasks 
> and make them wait for the connection between JM and ZK to be rebuilt.
> In YARN mode, we have one and only one JobManager (AM) at a time, and it should 
> always be the leader instead of being elected through ZooKeeper. We can introduce 
> a new leader election service in YARN mode to achieve that.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6060) reference nonexistent class in the scaladoc

2017-03-24 Thread shijinkui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941519#comment-15941519
 ] 

shijinkui commented on FLINK-6060:
--

[~aljoscha] Sorry for my unclear description.

For example, take the class TaskOperationResult in the scaladoc below. 
TaskOperationResult does not exist anymore, or its file has been renamed. In such 
scaladoc, we should correct the referenced class.

  /**
   * Submits a task to the task manager. The result is to this message is a
   * [[TaskOperationResult]] message.
   *
   * @param tasks Descriptor which contains the information to start the task.
   */
  case class SubmitTask(tasks: TaskDeploymentDescriptor)
extends TaskMessage with RequiresLeaderSessionID


> reference nonexistent class in the scaladoc
> ---
>
> Key: FLINK-6060
> URL: https://issues.apache.org/jira/browse/FLINK-6060
> Project: Flink
>  Issue Type: Wish
>  Components: Scala API
>Reporter: shijinkui
>
> TaskMessages.scala
> ConnectedStreams.scala
> DataStream.scala
> Who can fix it?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6060) reference nonexistent class in the scaladoc

2017-03-24 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui updated FLINK-6060:
-
Summary: reference nonexistent class in the scaladoc  (was: not exist class 
referance in the scala function annotation)

> reference nonexistent class in the scaladoc
> ---
>
> Key: FLINK-6060
> URL: https://issues.apache.org/jira/browse/FLINK-6060
> Project: Flink
>  Issue Type: Wish
>  Components: Scala API
>Reporter: shijinkui
>
> TaskMessages.scala
> ConnectedStreams.scala
> DataStream.scala
> Who can fix it?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5754) released tag missing .gitignore .travis.yml .gitattributes

2017-03-24 Thread shijinkui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941516#comment-15941516
 ] 

shijinkui commented on FLINK-5754:
--

[~greghogan] At first, I assumed that the Flink tags are the same as in other open 
source projects, so I checked out a branch from a tag. We had to reset it; we 
had gone forward too much.

If we have no special reason, can we avoid deleting anything in the release tag for 
the next milestone, following the common tag/release rule?
If so, it will be very convenient to develop a private Flink version, and it will 
then be easy to merge back into the Flink community code base.

Thanks



> released tag missing .gitignore .travis.yml .gitattributes
> 
>
> Key: FLINK-5754
> URL: https://issues.apache.org/jira/browse/FLINK-5754
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: shijinkui
>
> The released tag is missing .gitignore, .travis.yml and .gitattributes.
> When making a release version, only the version should be replaced.
> For example: https://github.com/apache/spark/tree/v2.1.0



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941507#comment-15941507
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r108025485
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -112,7 +113,12 @@ class DataStreamOverAggregate(
   "condition.")
 }
   case _: RowTimeType =>
-throw new TableException("OVER Window of the EventTime type is not 
currently supported.")
+if (overWindow.lowerBound.isUnbounded && 
overWindow.upperBound.isCurrentRow) {
+  createUnboundedAndCurrentRowEventTimeOverWindow(inputDS)
--- End diff --

No problem. So, can I change this JIRA's title and open a new JIRA to 
address the `RANGE` case? @fhueske 
Thanks,
SunJincheng


> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
> Fix For: 1.3.0
>
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates event time mode.
> - bounded PRECEDING is not supported (see FLINK-5655)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-24 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r108025485
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -112,7 +113,12 @@ class DataStreamOverAggregate(
   "condition.")
 }
   case _: RowTimeType =>
-throw new TableException("OVER Window of the EventTime type is not 
currently supported.")
+if (overWindow.lowerBound.isUnbounded && 
overWindow.upperBound.isCurrentRow) {
+  createUnboundedAndCurrentRowEventTimeOverWindow(inputDS)
--- End diff --

No problem. So, can I change this JIRA's title and open a new JIRA to 
address the `RANGE` case? @fhueske 
Thanks,
SunJincheng


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941412#comment-15941412
 ] 

ASF GitHub Bot commented on FLINK-5654:
---

Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/3607
  
Hi @rtudoran, thanks for this PR. It looks very promising. Please rebase the 
code on master first; then I will be glad to take a look at the changes.
Best,
SunJincheng


> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> -
>
> Key: FLINK-5654
> URL: https://issues.apache.org/jira/browse/FLINK-5654
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: radu
>
> The goal of this issue is to add support for OVER RANGE aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5657)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3607: [FLINK-5654] - Add processing time OVER RANGE BETWEEN x P...

2017-03-24 Thread sunjincheng121
Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/3607
  
Hi @rtudoran, thanks for this PR. It looks very promising. Please rebase the 
code on master first; then I will be glad to take a look at the changes.
Best,
SunJincheng


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-6185) Input readers and output writers/formats need to support gzip

2017-03-24 Thread Luke Hutchison (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Hutchison updated FLINK-6185:
--
Description: 
File sources (such as {{ExecutionEnvironment#readCsvFile()}}) and sinks (such 
as {{FileOutputFormat}} and its subclasses, and methods such as 
{{DataSet#writeAsText()}}) need the ability to transparently decompress and 
compress files. Primarily gzip would be useful, but it would be nice if this 
were pluggable to support bzip2, xz, etc.

There could be options for autodetect (based on file extension and/or file 
content), which could be the default, as well as no compression or a selected 
compression method.

  was:
File sources (such as {{env#readCsvFile()}}) and sinks (such as 
{{FileOutputFormat}} and its subclasses, and methods such as 
{{DataSet#writeAsText()}}) need the ability to transparently decompress and 
compress files. Primarily gzip would be useful, but it would be nice if this 
were pluggable to support bzip2, xz, etc.

There could be options for autodetect (based on file extension and/or file 
content), which could be the default, as well as no compression or a selected 
compression method.


> Input readers and output writers/formats need to support gzip
> -
>
> Key: FLINK-6185
> URL: https://issues.apache.org/jira/browse/FLINK-6185
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Luke Hutchison
>Priority: Minor
>
> File sources (such as {{ExecutionEnvironment#readCsvFile()}}) and sinks (such 
> as {{FileOutputFormat}} and its subclasses, and methods such as 
> {{DataSet#writeAsText()}}) need the ability to transparently decompress and 
> compress files. Primarily gzip would be useful, but it would be nice if this 
> were pluggable to support bzip2, xz, etc.
> There could be options for autodetect (based on file extension and/or file 
> content), which could be the default, as well as no compression or a selected 
> compression method.
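
For illustration, a small sketch of the extension-based autodetection idea using plain java.util.zip (this is not existing Flink API; a real implementation would plug into the input/output formats and also cover codecs such as bzip2 or xz):

{code}
import java.io._
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

// Illustrative helper: choose a (de)compressing stream based on the file
// extension, which is roughly what "autodetect" would mean for readers/writers.
object CompressionStreams {

  def openForRead(path: String): InputStream = {
    val raw = new BufferedInputStream(new FileInputStream(path))
    if (path.endsWith(".gz")) new GZIPInputStream(raw) else raw
  }

  def openForWrite(path: String): OutputStream = {
    val raw = new BufferedOutputStream(new FileOutputStream(path))
    if (path.endsWith(".gz")) new GZIPOutputStream(raw) else raw
  }
}
{code}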



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6185) Input readers and output writers/formats need to support gzip

2017-03-24 Thread Luke Hutchison (JIRA)
Luke Hutchison created FLINK-6185:
-

 Summary: Input readers and output writers/formats need to support 
gzip
 Key: FLINK-6185
 URL: https://issues.apache.org/jira/browse/FLINK-6185
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.2.0
Reporter: Luke Hutchison
Priority: Minor


File sources (such as {{env#readCsvFile()}}) and sinks (such as 
FileOutputFormat and its subclasses, and methods such as 
{{DataSet#writeAsText()}}) need the ability to transparently decompress and 
compress files. Primarily gzip would be useful, but it would be nice if this 
were pluggable to support bzip2, xz, etc.

There could be options for autodetect (based on file extension and/or file 
content), which could be the default, as well as no compression or a selected 
compression method.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6185) Input readers and output writers/formats need to support gzip

2017-03-24 Thread Luke Hutchison (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Hutchison updated FLINK-6185:
--
Description: 
File sources (such as {{env#readCsvFile()}}) and sinks (such as 
{{FileOutputFormat}} and its subclasses, and methods such as 
{{DataSet#writeAsText()}}) need the ability to transparently decompress and 
compress files. Primarily gzip would be useful, but it would be nice if this 
were pluggable to support bzip2, xz, etc.

There could be options for autodetect (based on file extension and/or file 
content), which could be the default, as well as no compression or a selected 
compression method.

  was:
File sources (such as {{env#readCsvFile()}}) and sinks (such as 
FileOutputFormat and its subclasses, and methods such as 
{{DataSet#writeAsText()}}) need the ability to transparently decompress and 
compress files. Primarily gzip would be useful, but it would be nice if this 
were pluggable to support bzip2, xz, etc.

There could be options for autodetect (based on file extension and/or file 
content), which could be the default, as well as no compression or a selected 
compression method.


> Input readers and output writers/formats need to support gzip
> -
>
> Key: FLINK-6185
> URL: https://issues.apache.org/jira/browse/FLINK-6185
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Luke Hutchison
>Priority: Minor
>
> File sources (such as {{env#readCsvFile()}}) and sinks (such as 
> {{FileOutputFormat}} and its subclasses, and methods such as 
> {{DataSet#writeAsText()}}) need the ability to transparently decompress and 
> compress files. Primarily gzip would be useful, but it would be nice if this 
> were pluggable to support bzip2, xz, etc.
> There could be options for autodetect (based on file extension and/or file 
> content), which could be the default, as well as no compression or a selected 
> compression method.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3269: [FLINK-5698] Add NestedFieldsProjectableTableSource trait

2017-03-24 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3269
  
Hi @tonycox, thanks for the update!

I'll do some minor improvements and will merge the PR.

Thank you,
Fabian


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5698) Add NestedFieldsProjectableTableSource interface

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941330#comment-15941330
 ] 

ASF GitHub Bot commented on FLINK-5698:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3269
  
Hi @tonycox, thanks for the update!

I'll do some minor improvements and will merge the PR.

Thank you,
Fabian


> Add NestedFieldsProjectableTableSource interface
> 
>
> Key: FLINK-5698
> URL: https://issues.apache.org/jira/browse/FLINK-5698
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Anton Solovev
>Assignee: Anton Solovev
>
> Add a NestedFieldsProjectableTableSource interface for TableSource 
> implementations that support nested projection push-down.
> The interface could look as follows:
> {code}
> trait NestedFieldsProjectableTableSource[T] {
>   def projectNestedFields(fields: Array[String]): 
> NestedFieldsProjectableTableSource[T]
> }
> {code}
> This interface works together with ProjectableTableSource.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3605: [FLINK-6181][Start scripts] Fix regex in start scr...

2017-03-24 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3605#discussion_r108001505
  
--- Diff: tools/travis_mvn_watchdog.sh ---
@@ -164,7 +164,7 @@ watchdog () {
 
 # Check the final fat jar for illegal artifacts
 check_shaded_artifacts() {
-   jar tf build-target/lib/flink-dist-*.jar > allClasses
--- End diff --

The problem is that our dist jar is called 
`flink-dist_2.10-1.3-SNAPSHOT.jar`, so the old pattern didn't match the jar.
This leads to the following error in all Travis builds:
```
java.io.FileNotFoundException: build-target/lib/flink-dist-*.jar (No such 
file or directory)
at java.util.zip.ZipFile.open(Native Method)
at java.util.zip.ZipFile.<init>(ZipFile.java:220)
at java.util.zip.ZipFile.<init>(ZipFile.java:150)
at java.util.zip.ZipFile.<init>(ZipFile.java:121)
at sun.tools.jar.Main.list(Main.java:1060)
at sun.tools.jar.Main.run(Main.java:291)
at sun.tools.jar.Main.main(Main.java:1233)
``` 
I'm doing `jar tf` here to check that Guava and other libraries are not part 
of the fat jar.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6181) Zookeeper scripts use invalid regex

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941153#comment-15941153
 ] 

ASF GitHub Bot commented on FLINK-6181:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3605#discussion_r108001505
  
--- Diff: tools/travis_mvn_watchdog.sh ---
@@ -164,7 +164,7 @@ watchdog () {
 
 # Check the final fat jar for illegal artifacts
 check_shaded_artifacts() {
-   jar tf build-target/lib/flink-dist-*.jar > allClasses
--- End diff --

The problem is that our dist jar is called 
`flink-dist_2.10-1.3-SNAPSHOT.jar`, so the old pattern didn't match the jar.
This leads to the following error in all Travis builds:
```
java.io.FileNotFoundException: build-target/lib/flink-dist-*.jar (No such 
file or directory)
at java.util.zip.ZipFile.open(Native Method)
at java.util.zip.ZipFile.<init>(ZipFile.java:220)
at java.util.zip.ZipFile.<init>(ZipFile.java:150)
at java.util.zip.ZipFile.<init>(ZipFile.java:121)
at sun.tools.jar.Main.list(Main.java:1060)
at sun.tools.jar.Main.run(Main.java:291)
at sun.tools.jar.Main.main(Main.java:1233)
``` 
I'm doing `jar tf` here to check that Guava and other libraries are not part 
of the fat jar.


> Zookeeper scripts use invalid regex
> ---
>
> Key: FLINK-6181
> URL: https://issues.apache.org/jira/browse/FLINK-6181
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, Startup Shell Scripts
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>
> This issue has been reported by a user: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/unable-to-add-more-servers-in-zookeeper-quorum-peers-in-flink-1-2-td12321.html



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-6085) flink as micro service

2017-03-24 Thread Chen Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15939209#comment-15939209
 ] 

Chen Qin edited comment on FLINK-6085 at 3/24/17 8:32 PM:
--

I would like to see if we can agree on the high level first. A service is primarily 
RPC interaction with horizontal scalability and latency requirements.

The current way of bridging a service with a streaming pipeline via a distributed 
queue provides the benefits of failure resilience and topic reuse, at the cost of 
extra hardware/software and latency, and with no callback support.

[~till.rohrmann]

Updates:

Briefly chatted offline with Maxim; it seems a bit hard to work around the 
distributed queue, considering that the pipeline can restart and rewind offsets at 
any time, and loss of insertion events is not acceptable (queries might be fine, 
but Flink already addresses that with queryable state).

To echo Till's comments, yes, custom code could track those requests. A follow-up 
question is whether we can have a specific sink implementation that can reroute 
results to specific RPC hosts (e.g. an HTTP response or callback). 




was (Author: foxss):
I would like to see if we can agree on high level first. Service is primarily 
rpc interaction with horizontal scalability and latency requirements.

Current way of bridge service with streaming pipeline via distributed Queue 
provides benefit of failure resilience and topic reuse at cost of extra 
hardware/software and latency, also no callback support.

[~till.rohrmann]


> flink as micro service
> --
>
> Key: FLINK-6085
> URL: https://issues.apache.org/jira/browse/FLINK-6085
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, JobManager
>Reporter: Chen Qin
>Priority: Minor
> Attachments: Untitled document.jpg
>
>
> Track the discussion around running Flink as a micro service, including but not 
> limited to:
> - an RPC (web service endpoint) source: the web service endpoint accepts RPC 
> calls and ingests them into the streaming job (only one)
> - a callback mechanism
> - task assignment should honor deployment groups (web tier hosts should be 
> isolated from the rest of task assignment)
> https://docs.google.com/document/d/1MSsTOz7xUu50dAf_8v3gsQFfJFFy9LKnULdIl26yj0o/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3564: [FLINK-6089] [table] Implement decoration phase for rewri...

2017-03-24 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3564
  
I accidentally merged this to `master` not `table-retraction`.
Since the changes do not break anything, this is not a problem. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941043#comment-15941043
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r107987832
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -112,7 +113,12 @@ class DataStreamOverAggregate(
   "condition.")
 }
   case _: RowTimeType =>
-throw new TableException("OVER Window of the EventTime type is not 
currently supported.")
+if (overWindow.lowerBound.isUnbounded && 
overWindow.upperBound.isCurrentRow) {
+  createUnboundedAndCurrentRowEventTimeOverWindow(inputDS)
--- End diff --

I see, so this PR addresses the `ROW` case.
I'll push out a hotfix. 

Thanks for the notification @sunjincheng121!


> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
> Fix For: 1.3.0
>
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates event time mode.
> - bounded PRECEDING is not supported (see FLINK-5655)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r107987832
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -112,7 +113,12 @@ class DataStreamOverAggregate(
   "condition.")
 }
   case _: RowTimeType =>
-throw new TableException("OVER Window of the EventTime type is not 
currently supported.")
+if (overWindow.lowerBound.isUnbounded && 
overWindow.upperBound.isCurrentRow) {
+  createUnboundedAndCurrentRowEventTimeOverWindow(inputDS)
--- End diff --

I see, so this PR addresses the `ROW` case.
I'll push out a hotfix. 

Thanks for the notification @sunjincheng121!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-5990) Add [partitioned] event time OVER ROWS BETWEEN x PRECEDING aggregation to SQL

2017-03-24 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-5990.

   Resolution: Implemented
Fix Version/s: 1.3.0

Implemented with 7a9d39fe9f659d43bf4719a2981f6c4771ffbe48

> Add [partitioned] event time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
> -
>
> Key: FLINK-5990
> URL: https://issues.apache.org/jira/browse/FLINK-5990
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
> Fix For: 1.3.0
>
>
> The goal of this issue is to add support for OVER ROWS aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND 
> CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND 
> CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is required
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates event time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5803)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6145) Add [non-partitioned] event time OVER ROWS BETWEEN x PRECEDING aggregation to SQL

2017-03-24 Thread sunjincheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941007#comment-15941007
 ] 

sunjincheng commented on FLINK-6145:


Fixed by 
[7a9d39f|https://github.com/apache/flink/commit/7a9d39fe9f659d43bf4719a2981f6c4771ffbe48]

> Add [non-partitioned] event time OVER ROWS BETWEEN x PRECEDING aggregation to 
> SQL
> -
>
> Key: FLINK-6145
> URL: https://issues.apache.org/jira/browse/FLINK-6145
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> The goal of this issue is to add support for OVER ROWS aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) 
> AS sumB,
>   MIN(b) OVER (ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) 
> AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates event time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5804)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5829) Bump Calcite version to 1.12 once available

2017-03-24 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941021#comment-15941021
 ] 

Fabian Hueske commented on FLINK-5829:
--

Calcite 1.12 has been released today: 
http://calcite.apache.org/news/2017/03/24/release-1.12.0/

We can upgrade and address the related issues.

> Bump Calcite version to 1.12 once available
> ---
>
> Key: FLINK-5829
> URL: https://issues.apache.org/jira/browse/FLINK-5829
> Project: Flink
>  Issue Type: Task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Haohui Mai
>
> Once Calcite 1.12 is released we should update to remove some copied classes. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-5570) Support register external catalog to table environment

2017-03-24 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-5570.

   Resolution: Implemented
Fix Version/s: 1.3.0

Implemented with 135a57c4bb37eaa9cb85faaff1cc694f9448fabd

> Support register external catalog to table environment
> --
>
> Key: FLINK-5570
> URL: https://issues.apache.org/jira/browse/FLINK-5570
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: jingzhang
> Fix For: 1.3.0
>
>
> This issue aims to support registering one or more {{ExternalCatalog}} (which is 
> referred to in https://issues.apache.org/jira/browse/FLINK-5568) to the 
> {{TableEnvironment}}. After registration, SQL and TableAPI queries can access 
> tables in the external catalogs without registering those tables one by one 
> to the {{TableEnvironment}} beforehand.
> We plan to add two APIs in {{TableEnvironment}}:
> 1. register externalCatalog
> {code}
> def registerExternalCatalog(name: String, externalCatalog: ExternalCatalog): 
> Unit
> {code}
> 2. scan a table from a registered catalog and return the resulting {{Table}}; 
> this API is very useful in TableAPI queries.
> {code}
> def scan(catalogName: String, tableIdentifier: TableIdentifier): Table
> {code}
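
A rough usage sketch of the two proposed APIs (only a sketch: the catalog construction helper and the {{TableIdentifier}} usage below are illustrative, and the concrete {{ExternalCatalog}} implementation comes from FLINK-5568):

{code}
// assumed: some ExternalCatalog implementation from FLINK-5568, e.g. an in-memory one
val catalog: ExternalCatalog = createInMemoryCatalog()   // hypothetical helper

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 1. make all databases/tables of the catalog visible to SQL and Table API queries
tableEnv.registerExternalCatalog("externalCatalog1", catalog)

// 2. fetch a single table of the registered catalog as a Table
val orders = tableEnv.scan("externalCatalog1", TableIdentifier("db1", "orders"))  // hypothetical identifier construction
{code}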



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6089) Implement decoration phase for rewriting predicated logical plan after volcano optimization phase

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941013#comment-15941013
 ] 

ASF GitHub Bot commented on FLINK-6089:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3564
  
I accidentally merged this to `master` not `table-retraction`.
Since the changes do not break anything, this is not a problem. 


> Implement decoration phase for rewriting predicated logical plan after 
> volcano optimization phase
> -
>
> Key: FLINK-6089
> URL: https://issues.apache.org/jira/browse/FLINK-6089
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Hequn Cheng
> Fix For: 1.3.0
>
>
> At present, there is no chance to modify the DataStreamRel tree after the 
> volcano optimization. We consider adding a decoration phase after the volcano 
> optimization phase. The decoration phase is dedicated to rewriting the predicated 
> logical plan and is independent of the cost module. Once the decoration phase is 
> added, we get the chance to apply retraction rules in this phase.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-24 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r107982106
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -112,7 +113,12 @@ class DataStreamOverAggregate(
   "condition.")
 }
   case _: RowTimeType =>
-throw new TableException("OVER Window of the EventTime type is not 
currently supported.")
+if (overWindow.lowerBound.isUnbounded && 
overWindow.upperBound.isCurrentRow) {
+  createUnboundedAndCurrentRowEventTimeOverWindow(inputDS)
--- End diff --

EVENT-TIME OVER needs to treat the "rows" and "range" clauses separately, 
because 
-  ROWS specifies the window in physical units (rows).
-  RANGE specifies the window as a logical offset.

They have different semantics, for example:
DATA:
```
(long, int, String)
(1L, 1, "H")
(2L, 2, "H")
(2L, 3,"H")
```
ROWS sum(b) result: `1,3,6`
RANGE sum(b) result: `1,6,6`
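
The difference comes from how the upper frame bound is interpreted: with RANGE, the two records that share timestamp 2L are peers, so both of them see the frame ending at timestamp 2L and get the full sum 6. A small plain-Scala sketch (no Flink code involved, just to reproduce the numbers above):

```scala
// Sample data: (event timestamp, b, c)
val data = Seq((1L, 1, "H"), (2L, 2, "H"), (2L, 3, "H"))

// ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:
// the frame ends at the current physical row.
val rowsSums = data.indices.map(i => data.take(i + 1).map(_._2).sum)

// RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:
// the frame ends at the current timestamp, so peer rows with the same
// timestamp are included as well.
val rangeSums = data.map { case (ts, _, _) => data.filter(_._1 <= ts).map(_._2).sum }

println(rowsSums.mkString(","))   // 1,3,6
println(rangeSums.mkString(","))  // 1,6,6
```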



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-6089) Implement decoration phase for rewriting predicated logical plan after volcano optimization phase

2017-03-24 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-6089.

   Resolution: Implemented
Fix Version/s: 1.3.0

Implemented with 6949c8c79c41344023df08dde2936f06daa00e0d

> Implement decoration phase for rewriting predicated logical plan after 
> volcano optimization phase
> -
>
> Key: FLINK-6089
> URL: https://issues.apache.org/jira/browse/FLINK-6089
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Hequn Cheng
> Fix For: 1.3.0
>
>
> At present, there is no chance to modify the DataStreamRel tree after the 
> volcano optimization. We consider adding a decoration phase after the volcano 
> optimization phase. The decoration phase is dedicated to rewriting the predicated 
> logical plan and is independent of the cost module. Once the decoration phase is 
> added, we get the chance to apply retraction rules in this phase.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-6145) Add [non-partitioned] event time OVER ROWS BETWEEN x PRECEDING aggregation to SQL

2017-03-24 Thread sunjincheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng closed FLINK-6145.
--
Resolution: Fixed

> Add [non-partitioned] event time OVER ROWS BETWEEN x PRECEDING aggregation to 
> SQL
> -
>
> Key: FLINK-6145
> URL: https://issues.apache.org/jira/browse/FLINK-6145
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> The goal of this issue is to add support for OVER ROWS aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) 
> AS sumB,
>   MIN(b) OVER (ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) 
> AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates event time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5804)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-24 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-5658.

   Resolution: Implemented
Fix Version/s: 1.3.0

Implemented with fe2c61a28e6a5300b2cf4c1e50ee884b51ef42c9

> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
> Fix For: 1.3.0
>
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates event time mode.
> - bounded PRECEDING is not supported (see FLINK-5655)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941000#comment-15941000
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r107982106
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -112,7 +113,12 @@ class DataStreamOverAggregate(
   "condition.")
 }
   case _: RowTimeType =>
-throw new TableException("OVER Window of the EventTime type is not 
currently supported.")
+if (overWindow.lowerBound.isUnbounded && 
overWindow.upperBound.isCurrentRow) {
+  createUnboundedAndCurrentRowEventTimeOverWindow(inputDS)
--- End diff --

EVENT-TIME OVER needs to treat the "rows" and "range" clauses separately, 
because 
-  ROWS specifies the window in physical units (rows).
-  RANGE specifies the window as a logical offset.

They have different semantics, for example:
DATA:
```
(long, int, String)
(1L, 1, "H")
(2L, 2, "H")
(2L, 3,"H")
```
ROWS sum(b) result: `1,3,6`
RANGE sum(b) result: `1,6,6`



> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates event time mode.
> - bounded PRECEDING is not supported (see FLINK-5655)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5990) Add [partitioned] event time OVER ROWS BETWEEN x PRECEDING aggregation to SQL

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940991#comment-15940991
 ] 

ASF GitHub Bot commented on FLINK-5990:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3585


> Add [partitioned] event time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
> -
>
> Key: FLINK-5990
> URL: https://issues.apache.org/jira/browse/FLINK-5990
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> The goal of this issue is to add support for OVER ROWS aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND 
> CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND 
> CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is required
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates event time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5803)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940992#comment-15940992
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3386


> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates event time mode.
> - bounded PRECEDING is not supported (see FLINK-5655)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3409


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6089) Implement decoration phase for rewriting predicated logical plan after volcano optimization phase

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940989#comment-15940989
 ] 

ASF GitHub Bot commented on FLINK-6089:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3564


> Implement decoration phase for rewriting predicated logical plan after 
> volcano optimization phase
> -
>
> Key: FLINK-6089
> URL: https://issues.apache.org/jira/browse/FLINK-6089
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Hequn Cheng
>
> At present, there is no chance to modify the DataStreamRel tree after the 
> volcano optimization. We consider adding a decoration phase after the volcano 
> optimization phase. The decoration phase is dedicated to rewriting the predicated 
> logical plan and is independent of the cost module. Once the decoration phase is 
> added, we get the chance to apply retraction rules in this phase.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5568) Introduce interface for catalog, and provide an in-memory implementation, and integrate with calcite schema

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940990#comment-15940990
 ] 

ASF GitHub Bot commented on FLINK-5568:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3409


> Introduce interface for catalog, and provide an in-memory implementation, and 
> integrate with calcite schema
> ---
>
> Key: FLINK-5568
> URL: https://issues.apache.org/jira/browse/FLINK-5568
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: jingzhang
>
> The {{TableEnvironment}} now provides a mechanism to register temporary 
> tables. It registers the temp table in the calcite catalog, so SQL and TableAPI 
> queries can access those temp tables. Currently DatasetTable, DataStreamTable 
> and TableSourceTable can be registered with the {{TableEnvironment}} as temporary 
> tables.
> This issue wants to provide a mechanism to connect external catalogs such as 
> HCatalog to the {{TableEnvironment}}, so SQL and TableAPI queries can 
> access tables in the external catalogs without registering those tables to the 
> {{TableEnvironment}} beforehand.
> First, we should point out that there are actually two kinds of catalog in 
> Flink. 
> The first one is the external catalog mentioned before; it provides CRUD 
> operations on databases/tables.
> The second one is the calcite catalog; it defines the namespace that can be accessed 
> in Calcite queries. It depends on the Calcite Schema/Table abstraction. 
> The SqlValidator and SqlConverter depend on the calcite catalog to fetch the 
> tables in SQL or TableAPI.
> So we need to do the following things:
> 1. introduce an interface for the external catalog, and maybe provide an in-memory 
> implementation first for test and development environments.
> 2. introduce a mechanism to connect the external catalog with the Calcite catalog so 
> the tables/databases in the external catalog can be accessed in the Calcite catalog. 
> This includes converting databases of the externalCatalog to Calcite sub-schemas and 
> converting tables in a database of the externalCatalog to Calcite tables (only 
> {{TableSourceTable}} is supported).
> 3. register the external catalog to the {{TableEnvironment}}.
> Here is the design model of ExternalCatalogTable.
> | identifier | TableIdentifier | dbName and tableName of table |
> | tableType | String | type of external catalog table, e.g. csv, hbase, kafka |
> | schema | DataSchema | schema of table data, including column names and column types |
> | partitionColumnNames | List | names of partition columns |
> | properties | Map | properties of external catalog table |
> | stats | TableStats | statistics of external catalog table |
> | comment | String | |
> | create time | long | |
> There is still one detail that needs to be taken into consideration, that is, 
> how to convert an {{ExternalCatalogTable}} to a {{TableSourceTable}}. The 
> question is equivalent to converting an {{ExternalCatalogTable}} to a {{TableSource}}, 
> because we can easily get a {{TableSourceTable}} from a {{TableSource}}.
> Different {{TableSource}} implementations often need different fields to create 
> an instance. E.g. {{CsvTableSource}} needs path, fieldName, fieldTypes, 
> fieldDelim, rowDelim and so on to create a new instance, while 
> {{KafkaTableSource}} needs a configuration and a tableName to create a new 
> instance. So it is not a good idea to let the Flink framework be responsible for 
> translating an {{ExternalCatalogTable}} into the different kinds of 
> {{TableSourceTable}}. 
> Here is one solution. Let {{TableSource}} specify a converter.
> 1. provide an annotation named {{ExternalCatalogCompatible}}. A 
> {{TableSource}} with this annotation is compatible with the external 
> catalog, that is, it can be converted to or from an ExternalCatalogTable. The 
> annotation specifies the tableType and converter of the tableSource. For 
> example, for {{CsvTableSource}}, it specifies that the tableType is csv and 
> the converter class is CsvTableSourceConverter.
> {code}
> @ExternalCatalogCompatible(tableType = "csv", converter = 
> classOf[CsvTableSourceConverter])
> class CsvTableSource(...) {
> ...}
> {code}
> 2. Scan all TableSources with the ExternalCatalogCompatible annotation and save 
> the tableType and converter in a Map.
> 3. When an {{ExternalCatalogTable}} needs to be converted to a {{TableSource}}, get the 
> converter based on its tableType and let the converter do the conversion.
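
A rough sketch of the lookup described in steps 2 and 3 (plain Scala with stub stand-in types; the real annotation scanning and the actual Flink classes would look different):

{code}
// Stub stand-ins for the types mentioned above, just for illustration.
case class ExternalCatalogTable(tableType: String, properties: Map[String, String])
trait TableSource

trait TableSourceConverter {
  def fromExternalCatalogTable(table: ExternalCatalogTable): TableSource
}

object TableSourceConverters {
  // step 2: in the real design this map would be filled by scanning all TableSource
  // classes annotated with @ExternalCatalogCompatible(tableType, converter)
  private val converters = scala.collection.mutable.Map.empty[String, TableSourceConverter]

  def register(tableType: String, converter: TableSourceConverter): Unit =
    converters += tableType -> converter

  // step 3: look up the converter by tableType and let it do the conversion
  def toTableSource(table: ExternalCatalogTable): TableSource =
    converters.getOrElse(
      table.tableType,
      throw new IllegalArgumentException(s"no converter registered for ${table.tableType}")
    ).fromExternalCatalogTable(table)
}
{code}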



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3585: [FLINK-5990][table]Add event time OVER ROWS BETWEE...

2017-03-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3585


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3564: [FLINK-6089] [table] Implement decoration phase fo...

2017-03-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3564


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3386


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3611: [backport] [FLINK-6183]/[FLINK-6184] Prevent some ...

2017-03-24 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/3611

[backport] [FLINK-6183]/[FLINK-6184] Prevent some NPE and unclosed metric 
groups

Backport of #3610 for 1.2 .

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 6183_6184_metric_task_backport

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3611.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3611


commit 790b3ce444e10191731850bad71c35fe050d9af3
Author: zentol 
Date:   2017-03-24T18:11:58Z

[FLINK-6184] Prevent NPE in buffer metrics

commit 13e40466ffe63783c59cc979900ba7af2d693576
Author: zentol 
Date:   2017-03-24T18:39:31Z

[FLINK-6183] [metrics] Prevent some cases of TaskMG not being closed




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6183) TaskMetricGroup may not be cleanup when Task.run() is never called or exits early

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940943#comment-15940943
 ] 

ASF GitHub Bot commented on FLINK-6183:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/3611

[backport] [FLINK-6183]/[FLINK-6184] Prevent some NPE and unclosed metric 
groups

Backport of #3610 for 1.2 .

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 6183_6184_metric_task_backport

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3611.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3611


commit 790b3ce444e10191731850bad71c35fe050d9af3
Author: zentol 
Date:   2017-03-24T18:11:58Z

[FLINK-6184] Prevent NPE in buffer metrics

commit 13e40466ffe63783c59cc979900ba7af2d693576
Author: zentol 
Date:   2017-03-24T18:39:31Z

[FLINK-6183] [metrics] Prevent some cases of TaskMG not being closed




> TaskMetricGroup may not be cleanup when Task.run() is never called or exits 
> early
> -
>
> Key: FLINK-6183
> URL: https://issues.apache.org/jira/browse/FLINK-6183
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
>
> The TaskMetricGroup is created when a Task is created. It is cleaned up at 
> the end of Task.run() in the finally block. If, however, run() is never called 
> due to some failure between the creation and the call to run(), the metric group is 
> never closed. This also means that the JobMetricGroup is never closed.
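
To make the described lifecycle concrete, here is a small sketch (plain Scala, not the actual Task/TaskMetricGroup code, which is Java): the group is created eagerly together with the task and closed in the finally block of run(), so any failure path that prevents run() from being invoked also needs to close it.

{code}
class MetricGroupStub(name: String) {
  @volatile private var closed = false
  // idempotent close, safe to call from both run() and failure paths
  def close(): Unit = if (!closed) { closed = true; println(s"closed metric group $name") }
}

class TaskStub(name: String) {
  // created eagerly, like TaskMetricGroup in the Task constructor
  private val metrics = new MetricGroupStub(name)

  def run(): Unit =
    try {
      // ... actual task work ...
    } finally {
      metrics.close() // only reached if run() is actually invoked
    }

  // hypothetical hook for the case described above: the task is discarded
  // (e.g. a failure during setup) before run() was ever called
  def failBeforeRun(cause: Throwable): Unit = metrics.close()
}
{code}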



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6183) TaskMetricGroup may not be cleanup when Task.run() is never called or exits early

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940910#comment-15940910
 ] 

ASF GitHub Bot commented on FLINK-6183:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/3610

[FLINK-6183]/[FLINK-6184] Prevent some NPE and unclosed metric groups

This PR fixes 2 issues:

1) It prevents some NPEs in the buffer metrics by instantiating them after 
the task has been registered in the NetworkEnvironment.

2) It prevents some cases where the TaskMetricGroup would never be closed. 
These cases include an early exit in `Task#run()` and when two tasks with an 
identical ExecutionAttemptID are run on the same TM.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 6183_6184_metric_task

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3610.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3610






> TaskMetricGroup may not be cleanup when Task.run() is never called or exits 
> early
> -
>
> Key: FLINK-6183
> URL: https://issues.apache.org/jira/browse/FLINK-6183
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
>
> The TaskMetricGroup is created when a Task is created. It is cleaned up at 
> the end of Task.run() in the finally block. If, however, run() is never called 
> due to some failure between the creation and the call to run(), the metric group is 
> never closed. This also means that the JobMetricGroup is never closed.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3610: [FLINK-6183]/[FLINK-6184] Prevent some NPE and unc...

2017-03-24 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/3610

[FLINK-6183]/[FLINK-6184] Prevent some NPE and unclosed metric groups

This PR fixes 2 issues:

1) It prevents some NPEs in the buffer metrics by instantiating them after 
the task has been registered in the NetworkEnvironment.

2) It prevents some cases where the TaskMetricGroup would never be closed. 
These cases include an early exit in `Task#run()` and when two tasks with an 
identical ExecutionAttemptID are run on the same TM.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 6183_6184_metric_task

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3610.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3610






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-6183) TaskMetricGroup may not be cleanup when Task.run() is never called or exits early

2017-03-24 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-6183:
---

Assignee: Chesnay Schepler

> TaskMetricGroup may not be cleanup when Task.run() is never called or exits 
> early
> -
>
> Key: FLINK-6183
> URL: https://issues.apache.org/jira/browse/FLINK-6183
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
>
> The TaskMetricGroup is created when a Task is created. It is cleaned up at 
> the end of Task.run() in the finally block. If, however, run() is never called 
> due to some failure between the creation and the call to run(), the metric group is 
> never closed. This also means that the JobMetricGroup is never closed.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #1668: [FLINK-3257] Add Exactly-Once Processing Guarantee...

2017-03-24 Thread gyfora
Github user gyfora commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r107968436
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
 ---
@@ -119,13 +183,154 @@ protected void cleanup() throws Exception {
 * Creates the identification string with which head and tail task find 
the shared blocking
 * queue for the back channel. The identification string is unique per 
parallel head/tail pair
 * per iteration per job.
-* 
-* @param jid The job ID.
-* @param iterationID The id of the iteration in the job.
+*
+* @param jid  The job ID.
+* @param iterationID  The id of the iteration in the job.
 * @param subtaskIndex The parallel subtask number
 * @return The identification string.
 */
public static String createBrokerIdString(JobID jid, String 
iterationID, int subtaskIndex) {
return jid + "-" + iterationID + "-" + subtaskIndex;
}
+
+   /**
+* An internal operator that solely serves as a state logging facility 
for persisting,
+* partitioning and restoring output logs for dataflow cycles 
consistently. To support concurrency,
+* logs are being sliced proportionally to the number of concurrent 
snapshots. This allows committed
+* output logs to be uniquely identified and cleared after each 
complete checkpoint.
+* 
+* The design is based on the following assumptions:
+* 
+* - A slice is named after a checkpoint ID. Checkpoint IDs are 
numerically ordered within an execution.
+* - Each checkpoint barrier arrives back in FIFO order, thus we 
discard log slices in respective FIFO order.
+* - Upon restoration the logger sorts sliced logs in the same FIFO 
order and returns an Iterable that
+* gives a singular view of the log.
+* 
+* TODO it seems that ListState.clear does not unregister state. We 
need to put a hook for that.
+*
+* @param 
+*/
+   public static class UpstreamLogger extends 
AbstractStreamOperator implements OneInputStreamOperator {
+
+   private final StreamConfig config;
+
+   private LinkedList> slicedLog = new 
LinkedList<>();
+
+   private UpstreamLogger(StreamConfig config) {
+   this.config = config;
+   }
+
+   public void logRecord(StreamRecord record) throws Exception 
{
+   if (!slicedLog.isEmpty()) {
+   slicedLog.getLast().add(record);
+   }
+   }
+
+   public void createSlice(String sliceID) throws Exception {
+   ListState nextSlice =
+   getOperatorStateBackend().getOperatorState(new 
ListStateDescriptor<>(sliceID,
+   
config.getTypeSerializerOut(getUserCodeClassloader(;
+   slicedLog.addLast(nextSlice);
+   }
+
+   public void discardSlice() {
+   ListState logToEvict = 
slicedLog.pollFirst();
+   logToEvict.clear();
+   }
+
+   public Iterable getReplayLog() throws 
Exception {
+   final List logSlices = new 
ArrayList<>(getOperatorStateBackend().getRegisteredStateNames());
+   Collections.sort(logSlices, new Comparator() {
+   @Override
+   public int compare(String o1, String o2) {
+   return 
Long.valueOf(o1).compareTo(Long.valueOf(o2));
+   }
+   });
+
+   final List> wrappedIterators 
= new ArrayList<>();
+   for (String splitID : logSlices) {
+   wrappedIterators.add(getOperatorStateBackend()
+   .getOperatorState(new 
ListStateDescriptor<>(splitID,
+   
config.getTypeSerializerOut(getUserCodeClassloader(.get().iterator());
+   }
+
+   if (wrappedIterators.size() == 0) {
+   return new Iterable() {
+   @Override
+   public Iterator 
iterator() {
+   return 
Collections.emptyListIterator();
+   }
+   };
+   }
+
+   return new 

[jira] [Commented] (FLINK-6073) Support for SQL inner queries for proctime

2017-03-24 Thread radu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940880#comment-15940880
 ] 

radu commented on FLINK-6073:
-

The join window can be a single element, as we emit for every incoming event 
from the main stream (left). For the incoming events from the right stream 
(inner stream) we can cache the latest data in a ValueState.
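
A rough sketch of that idea (my own illustration, not the code from the PR; it assumes both inputs are keyed, e.g. by a constant key, so keyed ValueState can be used):

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

// Emits one joined result per element of the main (left) stream, enriched with
// the latest value seen so far on the inner (right) stream.
class LatestInnerValueJoin
  extends CoProcessFunction[String, String, (String, String)] {

  private var lastInner: ValueState[String] = _

  override def open(parameters: Configuration): Unit = {
    lastInner = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("lastInner", classOf[String]))
  }

  // main stream: emit immediately, joined with the cached inner value (may be null)
  override def processElement1(
      left: String,
      ctx: CoProcessFunction[String, String, (String, String)]#Context,
      out: Collector[(String, String)]): Unit =
    out.collect((left, lastInner.value()))

  // inner stream: only refresh the cache, emit nothing
  override def processElement2(
      right: String,
      ctx: CoProcessFunction[String, String, (String, String)]#Context,
      out: Collector[(String, String)]): Unit =
    lastInner.update(right)
}
```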

> Support for SQL inner queries for proctime
> --
>
> Key: FLINK-6073
> URL: https://issues.apache.org/jira/browse/FLINK-6073
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: radu
>Assignee: radu
>Priority: Critical
>  Labels: features
> Attachments: innerquery.png
>
>
> Time target: Proc Time
> **SQL targeted query examples:**
>  
> Q1) `Select  item, (select item2 from stream2 ) as itemExtern from stream1;`
> Comments: This is the main functionality targeted by this JIRA: enabling the 
> main query to combine its result with the result of an inner query.
> Q2) `Select  s1.item, (Select a2 from table as t2 where table.id = s1.id  
> limit 1) from s1;`
> Comments:
> Another equivalent way to write the first example of inner query is with 
> limit 1. This ensures the equivalency with the SingleElementAggregation used 
> when translating the main target syntax for the inner query. We must ensure that 
> the 2 syntaxes are supported and implemented with the same functionality. 
> There is the option also to select elements in the inner query from a table 
> not just from a different stream. This should be a sub-JIRA issue to implement 
> this support.
> **Description:**
> The SQL inner query, parsed via calcite, is translated to a join function 
> (left join with always true condition) between the output of the query on the 
> main stream and the output of a single output aggregation operation on the 
> inner query. The translation logic is shown below
> ```
> LogicalJoin [condition=true;type=LEFT]
>   LogicalSingleValue[type=aggregation]
>   …logic of inner query (LogicalProject, LogicalScan…)
>   …logical of main,external query (LogicalProject, LogicalScan…))
> ```
> `LogicalJoin[condition=true;type=LEFT] `– it can be considered as a special 
> case operation rather than a proper join to be implemented between 
> stream-to-stream. The implementation behavior should attach to the main 
> stream output a value from a different query. 
> `LogicalSingleValue[type=aggregation]` – it can be interpreted as the holder 
> of the single value that results from the inner query. As this operator is 
> the guarantee that the inner query will bring to the join no more than one 
> value, there are several options on how to consider its functionality in the 
> streaming context:
> 1. Throw an error if the inner query returns more than one result. This 
> would be a typical behavior in the case of standard SQL over DB. However, it 
> is very unlikely that a stream would only emit a single value. Therefore, 
> such a behavior would be very limited for streams in the inner query. 
> However, such a behavior might be more useful and common if the inner query 
> is over a table. 
> 2. We can interpret the usage of this parameter as the guarantee that at 
> one moment only one value is selected. Therefore the behavior would rather be 
> as a filter to select one value. This brings the option that the output of 
> this operator evolves in time with the second stream that drives the inner 
> query. The decision on when to evolve the stream should depend on what marks 
> the evolution of the stream (processing time, watermarks/event time, 
> ingestion time, window time partitions…).
>  In this JIRA issue the evolution would be marked by the processing time. For 
> this implementation the operator would work based on option 2. Hence at every 
> moment the state of the operator that holds one value can evolve with the 
> last elements. In this way the logic of the inner query is to select always 
> the last element (fields, or other query related transformations based on the 
> last value). This behavior is needed in many scenarios: (e.g., the typical 
> problem of computing the total income, when incomes are in multiple 
> currencies and the total needs to be computed in one currency by using always 
> the last exchange rate).
> This behavior is motivated also by the functionality of the 3rd SQL query 
> example – Q3  (using inner query as the input source for FROM ). In such 
> scenarios, the selection in the main query would need to be done based on 
> latest elements. Therefore with such a behavior the 2 types of queries (Q1 
> and Q3) would provide the same, intuitive result.
> **Functionality example**
> Based on the logical translation plan, we exemplify next the behavior of the 
> inner query applied on 2 

[jira] [Commented] (FLINK-6073) Support for SQL inner queries for proctime

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940872#comment-15940872
 ] 

ASF GitHub Bot commented on FLINK-6073:
---

Github user rtudoran commented on the issue:

https://github.com/apache/flink/pull/3609
  
@fhueske @twalthr @sunjincheng121 @shijinkui @stefanobortoli @hongyuhong

I have made a first implementation draft for supporting inner queries 
mainly when operating on processing time. I would highly appreciate some 
feedback from you to further enhance the approach.

The idea of the implementation is described in 
https://issues.apache.org/jira/browse/FLINK-6073?filter=-2



> Support for SQL inner queries for proctime
> --
>
> Key: FLINK-6073
> URL: https://issues.apache.org/jira/browse/FLINK-6073
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: radu
>Assignee: radu
>Priority: Critical
>  Labels: features
> Attachments: innerquery.png
>
>
> Time target: Proc Time
> **SQL targeted query examples:**
>  
> Q1) `Select  item, (select item2 from stream2 ) as itemExtern from stream1;`
> Comments: This is the main functionality targeted by this JIRA: enabling the 
> main query to combine its result with the result of an inner query.
> Q2) `Select  s1.item, (Select a2 from table as t2 where table.id = s1.id  
> limit 1) from s1;`
> Comments:
> Another equivalent way to write the first example of inner query is with 
> limit 1. This ensures the equivalency with the SingleElementAggregation used 
> when translating the main target syntax for the inner query. We must ensure that 
> the 2 syntaxes are supported and implemented with the same functionality. 
> There is the option also to select elements in the inner query from a table 
> not just from a different stream. This should be a sub-JIRA issue to implement 
> this support.
> **Description:**
> The SQL inner query, parsed via calcite, is translated to a join function 
> (left join with always true condition) between the output of the query on the 
> main stream and the output of a single output aggregation operation on the 
> inner query. The translation logic is shown below
> ```
> LogicalJoin [condition=true;type=LEFT]
>   LogicalSingleValue[type=aggregation]
>   …logic of inner query (LogicalProject, LogicalScan…)
>   …logical of main,external query (LogicalProject, LogicalScan…))
> ```
> `LogicalJoin[condition=true;type=LEFT] `– it can be considered as a special 
> case operation rather than a proper join to be implemented between 
> stream-to-stream. The implementation behavior should attach to the main 
> stream output a value from a different query. 
> `LogicalSingleValue[type=aggregation]` – it can be interpreted as the holder 
> of the single value that results from the inner query. As this operator is 
> the guarantee that the inner query will bring to the join no more than one 
> value, there are several options on how to consider its functionality in the 
> streaming context:
> 1. Throw an error if the inner query returns more than one result. This 
> would be a typical behavior in the case of standard SQL over DB. However, it 
> is very unlikely that a stream would only emit a single value. Therefore, 
> such a behavior would be very limited for streams in the inner query. 
> However, such a behavior might be more useful and common if the inner query 
> is over a table. 
> 2. We can interpret the usage of this parameter as the guarantee that at 
> one moment only one value is selected. Therefore the behavior would rather be 
> as a filter to select one value. This brings the option that the output of 
> this operator evolves in time with the second stream that drives the inner 
> query. The decision on when to evolve the stream should depend on what marks 
> the evolution of the stream (processing time, watermarks/event time, 
> ingestion time, window time partitions…).
>  In this JIRA issue the evolution would be marked by the processing time. For 
> this implementation the operator would work based on option 2. Hence at every 
> moment the state of the operator that holds one value can evolve with the 
> last elements. In this way the logic of the inner query is to select always 
> the last element (fields, or other query related transformations based on the 
> last value). This behavior is needed in many scenarios: (e.g., the typical 
> problem of computing the total income, when incomes are in multiple 
> currencies and the total needs to be computed in one currency by using always 
> the last exchange rate).
> This behavior is motivated also by the functionality of the 3rd SQL query 
> example – Q3  (using inner query as the input source for FROM ). In such 
> scenarios, the selection in the main query 

[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940873#comment-15940873
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user gyfora commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r107967886
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
 ---
@@ -119,13 +183,154 @@ protected void cleanup() throws Exception {
 * Creates the identification string with which head and tail task find 
the shared blocking
 * queue for the back channel. The identification string is unique per 
parallel head/tail pair
 * per iteration per job.
-* 
-* @param jid The job ID.
-* @param iterationID The id of the iteration in the job.
+*
+* @param jid  The job ID.
+* @param iterationID  The id of the iteration in the job.
 * @param subtaskIndex The parallel subtask number
 * @return The identification string.
 */
public static String createBrokerIdString(JobID jid, String 
iterationID, int subtaskIndex) {
return jid + "-" + iterationID + "-" + subtaskIndex;
}
+
+   /**
+* An internal operator that solely serves as a state logging facility 
for persisting,
+* partitioning and restoring output logs for dataflow cycles 
consistently. To support concurrency,
+* logs are being sliced proportionally to the number of concurrent 
snapshots. This allows committed
+* output logs to be uniquely identified and cleared after each 
complete checkpoint.
+* 
+* The design is based on the following assumptions:
+* 
+* - A slice is named after a checkpoint ID. Checkpoint IDs are 
numerically ordered within an execution.
+* - Each checkpoint barrier arrives back in FIFO order, thus we 
discard log slices in respective FIFO order.
+* - Upon restoration the logger sorts sliced logs in the same FIFO 
order and returns an Iterable that
+* gives a singular view of the log.
+* 
+* TODO it seems that ListState.clear does not unregister state. We 
need to put a hook for that.
+*
+* @param 
+*/
+   public static class UpstreamLogger extends 
AbstractStreamOperator implements OneInputStreamOperator {
+
+   private final StreamConfig config;
+
+   private LinkedList> slicedLog = new 
LinkedList<>();
+
+   private UpstreamLogger(StreamConfig config) {
+   this.config = config;
+   }
+
+   public void logRecord(StreamRecord record) throws Exception 
{
+   if (!slicedLog.isEmpty()) {
+   slicedLog.getLast().add(record);
+   }
+   }
+
+   public void createSlice(String sliceID) throws Exception {
+   ListState nextSlice =
+   getOperatorStateBackend().getOperatorState(new 
ListStateDescriptor<>(sliceID,
+   
config.getTypeSerializerOut(getUserCodeClassloader(;
+   slicedLog.addLast(nextSlice);
+   }
+
+   public void discardSlice() {
+   ListState logToEvict = 
slicedLog.pollFirst();
+   logToEvict.clear();
+   }
+
+   public Iterable getReplayLog() throws 
Exception {
+   final List logSlices = new 
ArrayList<>(getOperatorStateBackend().getRegisteredStateNames());
+   Collections.sort(logSlices, new Comparator() {
+   @Override
+   public int compare(String o1, String o2) {
+   return 
Long.valueOf(o1).compareTo(Long.valueOf(o2));
+   }
+   });
+
+   final List> wrappedIterators 
= new ArrayList<>();
+   for (String splitID : logSlices) {
+   wrappedIterators.add(getOperatorStateBackend()
+   .getOperatorState(new 
ListStateDescriptor<>(splitID,
+   
config.getTypeSerializerOut(getUserCodeClassloader(.get().iterator());
+   }
+
+   if (wrappedIterators.size() == 0) {
+   return new Iterable() {
+   @Override
+   public Iterator 
iterator() 

[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940878#comment-15940878
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user gyfora commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r107969496
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
 ---
@@ -17,100 +17,164 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.types.Either;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * TODO write javadoc
+ * 
+ * - open a list state per snapshot process
+ * - book-keep snapshot logs
+ * - Clean up state when a savepoint is complete - ONLY in-transit records 
who do NOT belong in other snapshots
+ *
+ * @param 
+ */
 @Internal
-public class StreamIterationHead extends OneInputStreamTask 
{
+public class StreamIterationHead extends OneInputStreamTask {
 
private static final Logger LOG = 
LoggerFactory.getLogger(StreamIterationHead.class);
 
private volatile boolean running = true;
 
-   // 

-   
+   private volatile RecordWriterOutput[] outputs;
+
+   private UpstreamLogger upstreamLogger;
+
+   private Object lock;
+
+   @Override
+   public void init() throws Exception {
+   this.lock = getCheckpointLock();
+   getConfiguration().setStreamOperator(new 
UpstreamLogger(getConfiguration()));
+   operatorChain = new OperatorChain<>(this);
+   this.upstreamLogger = (UpstreamLogger) 
operatorChain.getHeadOperator();
+   }
+
@Override
protected void run() throws Exception {
-   
+
final String iterationId = getConfiguration().getIterationId();
if (iterationId == null || iterationId.length() == 0) {
throw new Exception("Missing iteration ID in the task 
configuration");
}
-   
-   final String brokerID = 
createBrokerIdString(getEnvironment().getJobID(), iterationId ,
-   
getEnvironment().getTaskInfo().getIndexOfThisSubtask());
-   
+   final String brokerID = 
createBrokerIdString(getEnvironment().getJobID(), iterationId,
+   getEnvironment().getTaskInfo().getIndexOfThisSubtask());
final long iterationWaitTime = 
getConfiguration().getIterationWaitTime();
final boolean shouldWait = iterationWaitTime > 0;
 
-   final BlockingQueue dataChannel = new 
ArrayBlockingQueue(1);
+   final BlockingQueue> dataChannel
+   = new ArrayBlockingQueue<>(1);
 
// offer the queue for the tail
BlockingQueueBroker.INSTANCE.handIn(brokerID, dataChannel);
LOG.info("Iteration head {} added feedback queue under {}", 
getName(), brokerID);
 
// do the work 
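
The diff above also changes the feedback queue so that it can carry either data records or a checkpoint barrier coming back over the back edge. A hedged, self-contained sketch of that channel shape (class and method names are illustrative, not the PR's API):

{code}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.flink.types.Either;

/** Hedged sketch: a bounded back-edge channel carrying either a record or a barrier id. */
public class FeedbackChannelSketch<T> {

	private final BlockingQueue<Either<T, Long>> channel = new ArrayBlockingQueue<>(1);

	public void emitRecord(T record) throws InterruptedException {
		channel.put(Either.Left(record));
	}

	public void emitBarrier(long checkpointId) throws InterruptedException {
		channel.put(Either.Right(checkpointId));
	}

	/** Polls the back edge; returns null if nothing arrived within the timeout. */
	public Either<T, Long> poll(long timeoutMillis) throws InterruptedException {
		return channel.poll(timeoutMillis, TimeUnit.MILLISECONDS);
	}
}
{code}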
   

[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940875#comment-15940875
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user gyfora commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r107968567
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
 ---
@@ -119,13 +183,154 @@ protected void cleanup() throws Exception {
 * Creates the identification string with which head and tail task find 
the shared blocking
 * queue for the back channel. The identification string is unique per 
parallel head/tail pair
 * per iteration per job.
-* 
-* @param jid The job ID.
-* @param iterationID The id of the iteration in the job.
+*
+* @param jid  The job ID.
+* @param iterationID  The id of the iteration in the job.
 * @param subtaskIndex The parallel subtask number
 * @return The identification string.
 */
public static String createBrokerIdString(JobID jid, String 
iterationID, int subtaskIndex) {
return jid + "-" + iterationID + "-" + subtaskIndex;
}
+
+   /**
+* An internal operator that solely serves as a state logging facility 
for persisting,
+* partitioning and restoring output logs for dataflow cycles 
consistently. To support concurrency,
+* logs are being sliced proportionally to the number of concurrent 
snapshots. This allows committed
+* output logs to be uniquely identified and cleared after each 
complete checkpoint.
+* 
+* The design is based on the following assumptions:
+* 
+* - A slice is named after a checkpoint ID. Checkpoint IDs are 
numerically ordered within an execution.
+* - Each checkpoint barrier arrives back in FIFO order, thus we 
discard log slices in respective FIFO order.
+* - Upon restoration the logger sorts sliced logs in the same FIFO 
order and returns an Iterable that
+* gives a singular view of the log.
+* 
+* TODO it seems that ListState.clear does not unregister state. We 
need to put a hook for that.
+*
+* @param 
+*/
+   public static class UpstreamLogger extends 
AbstractStreamOperator implements OneInputStreamOperator {
+
+   private final StreamConfig config;
+
+   private LinkedList> slicedLog = new 
LinkedList<>();
+
+   private UpstreamLogger(StreamConfig config) {
+   this.config = config;
+   }
+
+   public void logRecord(StreamRecord record) throws Exception 
{
+   if (!slicedLog.isEmpty()) {
+   slicedLog.getLast().add(record);
+   }
+   }
+
+   public void createSlice(String sliceID) throws Exception {
+   ListState nextSlice =
+   getOperatorStateBackend().getOperatorState(new 
ListStateDescriptor<>(sliceID,
+   
config.getTypeSerializerOut(getUserCodeClassloader(;
+   slicedLog.addLast(nextSlice);
+   }
+
+   public void discardSlice() {
+   ListState logToEvict = 
slicedLog.pollFirst();
+   logToEvict.clear();
+   }
+
+   public Iterable getReplayLog() throws 
Exception {
+   final List logSlices = new 
ArrayList<>(getOperatorStateBackend().getRegisteredStateNames());
+   Collections.sort(logSlices, new Comparator() {
+   @Override
+   public int compare(String o1, String o2) {
+   return 
Long.valueOf(o1).compareTo(Long.valueOf(o2));
+   }
+   });
+
+   final List> wrappedIterators 
= new ArrayList<>();
+   for (String splitID : logSlices) {
+   wrappedIterators.add(getOperatorStateBackend()
+   .getOperatorState(new 
ListStateDescriptor<>(splitID,
+   
config.getTypeSerializerOut(getUserCodeClassloader(.get().iterator());
+   }
+
+   if (wrappedIterators.size() == 0) {
+   return new Iterable() {
+   @Override
+   public Iterator 
iterator() 

[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940877#comment-15940877
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user gyfora commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r107967910
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
 ---
@@ -119,13 +183,154 @@ protected void cleanup() throws Exception {
 * Creates the identification string with which head and tail task find 
the shared blocking
 * queue for the back channel. The identification string is unique per 
parallel head/tail pair
 * per iteration per job.
-* 
-* @param jid The job ID.
-* @param iterationID The id of the iteration in the job.
+*
+* @param jid  The job ID.
+* @param iterationID  The id of the iteration in the job.
 * @param subtaskIndex The parallel subtask number
 * @return The identification string.
 */
public static String createBrokerIdString(JobID jid, String 
iterationID, int subtaskIndex) {
return jid + "-" + iterationID + "-" + subtaskIndex;
}
+
+   /**
+* An internal operator that solely serves as a state logging facility 
for persisting,
+* partitioning and restoring output logs for dataflow cycles 
consistently. To support concurrency,
+* logs are being sliced proportionally to the number of concurrent 
snapshots. This allows committed
+* output logs to be uniquely identified and cleared after each 
complete checkpoint.
+* 
+* The design is based on the following assumptions:
+* 
+* - A slice is named after a checkpoint ID. Checkpoint IDs are 
numerically ordered within an execution.
+* - Each checkpoint barrier arrives back in FIFO order, thus we 
discard log slices in respective FIFO order.
+* - Upon restoration the logger sorts sliced logs in the same FIFO 
order and returns an Iterable that
+* gives a singular view of the log.
+* 
+* TODO it seems that ListState.clear does not unregister state. We 
need to put a hook for that.
+*
+* @param 
+*/
+   public static class UpstreamLogger extends 
AbstractStreamOperator implements OneInputStreamOperator {
+
+   private final StreamConfig config;
+
+   private LinkedList> slicedLog = new 
LinkedList<>();
+
+   private UpstreamLogger(StreamConfig config) {
+   this.config = config;
+   }
+
+   public void logRecord(StreamRecord record) throws Exception 
{
+   if (!slicedLog.isEmpty()) {
+   slicedLog.getLast().add(record);
+   }
+   }
+
+   public void createSlice(String sliceID) throws Exception {
+   ListState nextSlice =
+   getOperatorStateBackend().getOperatorState(new 
ListStateDescriptor<>(sliceID,
+   
config.getTypeSerializerOut(getUserCodeClassloader(;
+   slicedLog.addLast(nextSlice);
+   }
+
+   public void discardSlice() {
+   ListState logToEvict = 
slicedLog.pollFirst();
+   logToEvict.clear();
+   }
+
+   public Iterable getReplayLog() throws 
Exception {
+   final List logSlices = new 
ArrayList<>(getOperatorStateBackend().getRegisteredStateNames());
+   Collections.sort(logSlices, new Comparator() {
+   @Override
+   public int compare(String o1, String o2) {
+   return 
Long.valueOf(o1).compareTo(Long.valueOf(o2));
+   }
+   });
+
+   final List> wrappedIterators 
= new ArrayList<>();
+   for (String splitID : logSlices) {
+   wrappedIterators.add(getOperatorStateBackend()
+   .getOperatorState(new 
ListStateDescriptor<>(splitID,
+   
config.getTypeSerializerOut(getUserCodeClassloader(.get().iterator());
+   }
+
+   if (wrappedIterators.size() == 0) {
+   return new Iterable() {
+   @Override
+   public Iterator 
iterator() 

[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940874#comment-15940874
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user gyfora commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r107968436
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
 ---
@@ -119,13 +183,154 @@ protected void cleanup() throws Exception {
 * Creates the identification string with which head and tail task find 
the shared blocking
 * queue for the back channel. The identification string is unique per 
parallel head/tail pair
 * per iteration per job.
-* 
-* @param jid The job ID.
-* @param iterationID The id of the iteration in the job.
+*
+* @param jid  The job ID.
+* @param iterationID  The id of the iteration in the job.
 * @param subtaskIndex The parallel subtask number
 * @return The identification string.
 */
public static String createBrokerIdString(JobID jid, String 
iterationID, int subtaskIndex) {
return jid + "-" + iterationID + "-" + subtaskIndex;
}
+
+   /**
+* An internal operator that solely serves as a state logging facility 
for persisting,
+* partitioning and restoring output logs for dataflow cycles 
consistently. To support concurrency,
+* logs are being sliced proportionally to the number of concurrent 
snapshots. This allows committed
+* output logs to be uniquely identified and cleared after each 
complete checkpoint.
+* 
+* The design is based on the following assumptions:
+* 
+* - A slice is named after a checkpoint ID. Checkpoint IDs are 
numerically ordered within an execution.
+* - Each checkpoint barrier arrives back in FIFO order, thus we 
discard log slices in respective FIFO order.
+* - Upon restoration the logger sorts sliced logs in the same FIFO 
order and returns an Iterable that
+* gives a singular view of the log.
+* 
+* TODO it seems that ListState.clear does not unregister state. We 
need to put a hook for that.
+*
+* @param 
+*/
+   public static class UpstreamLogger extends 
AbstractStreamOperator implements OneInputStreamOperator {
+
+   private final StreamConfig config;
+
+   private LinkedList> slicedLog = new 
LinkedList<>();
+
+   private UpstreamLogger(StreamConfig config) {
+   this.config = config;
+   }
+
+   public void logRecord(StreamRecord record) throws Exception 
{
+   if (!slicedLog.isEmpty()) {
+   slicedLog.getLast().add(record);
+   }
+   }
+
+   public void createSlice(String sliceID) throws Exception {
+   ListState nextSlice =
+   getOperatorStateBackend().getOperatorState(new 
ListStateDescriptor<>(sliceID,
+   
config.getTypeSerializerOut(getUserCodeClassloader(;
+   slicedLog.addLast(nextSlice);
+   }
+
+   public void discardSlice() {
+   ListState logToEvict = 
slicedLog.pollFirst();
+   logToEvict.clear();
+   }
+
+   public Iterable getReplayLog() throws 
Exception {
+   final List logSlices = new 
ArrayList<>(getOperatorStateBackend().getRegisteredStateNames());
+   Collections.sort(logSlices, new Comparator() {
+   @Override
+   public int compare(String o1, String o2) {
+   return 
Long.valueOf(o1).compareTo(Long.valueOf(o2));
+   }
+   });
+
+   final List> wrappedIterators 
= new ArrayList<>();
+   for (String splitID : logSlices) {
+   wrappedIterators.add(getOperatorStateBackend()
+   .getOperatorState(new 
ListStateDescriptor<>(splitID,
+   
config.getTypeSerializerOut(getUserCodeClassloader(.get().iterator());
+   }
+
+   if (wrappedIterators.size() == 0) {
+   return new Iterable() {
+   @Override
+   public Iterator 
iterator() 

[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940876#comment-15940876
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user gyfora commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r107968935
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
 ---
@@ -17,100 +17,164 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.types.Either;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * TODO write javadoc
+ * 
+ * - open a list state per snapshot process
+ * - book-keep snapshot logs
+ * - Clean up state when a savepoint is complete - ONLY in-transit records 
who do NOT belong in other snapshots
+ *
+ * @param 
+ */
 @Internal
-public class StreamIterationHead extends OneInputStreamTask 
{
+public class StreamIterationHead extends OneInputStreamTask {
 
private static final Logger LOG = 
LoggerFactory.getLogger(StreamIterationHead.class);
 
private volatile boolean running = true;
 
-   // 

-   
+   private volatile RecordWriterOutput[] outputs;
+
+   private UpstreamLogger upstreamLogger;
+
+   private Object lock;
+
+   @Override
+   public void init() throws Exception {
+   this.lock = getCheckpointLock();
+   getConfiguration().setStreamOperator(new 
UpstreamLogger(getConfiguration()));
+   operatorChain = new OperatorChain<>(this);
+   this.upstreamLogger = (UpstreamLogger) 
operatorChain.getHeadOperator();
--- End diff --

if this is the same UpstreamLogger instance that you pass 2 lines above 
then why not use that? :)


> Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
> ---
>
> Key: FLINK-3257
> URL: https://issues.apache.org/jira/browse/FLINK-3257
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Paris Carbone
>Assignee: Paris Carbone
>
> The current snapshotting algorithm cannot support cycles in the execution 
> graph. An alternative scheme can potentially include records in-transit 
> through the back-edges of a cyclic execution graph (ABS [1]) to achieve the 
> same guarantees.
> One straightforward implementation of ABS for cyclic graphs can work along 
> the following lines:
> 1) Upon triggering a barrier in an IterationHead from the TaskManager, block 
> output and start upstream backup of all records forwarded from the 
> respective IterationSink.
> 2) The IterationSink should eventually forward the current snapshotting epoch 
> barrier to the IterationSource.
> 3) Upon receiving a barrier from the IterationSink, the IterationSource 
> should finalize the snapshot, unblock its output and emit all records 
> in-transit in FIFO order and continue the usual execution.
> --
> Upon restart the IterationSource should emit all 

[GitHub] flink pull request #1668: [FLINK-3257] Add Exactly-Once Processing Guarantee...

2017-03-24 Thread gyfora
Github user gyfora commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r107968935
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
 ---
@@ -17,100 +17,164 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.types.Either;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * TODO write javadoc
+ * 
+ * - open a list state per snapshot process
+ * - book-keep snapshot logs
+ * - Clean up state when a savepoint is complete - ONLY in-transit records 
who do NOT belong in other snapshots
+ *
+ * @param 
+ */
 @Internal
-public class StreamIterationHead extends OneInputStreamTask 
{
+public class StreamIterationHead extends OneInputStreamTask {
 
private static final Logger LOG = 
LoggerFactory.getLogger(StreamIterationHead.class);
 
private volatile boolean running = true;
 
-   // 

-   
+   private volatile RecordWriterOutput[] outputs;
+
+   private UpstreamLogger upstreamLogger;
+
+   private Object lock;
+
+   @Override
+   public void init() throws Exception {
+   this.lock = getCheckpointLock();
+   getConfiguration().setStreamOperator(new 
UpstreamLogger(getConfiguration()));
+   operatorChain = new OperatorChain<>(this);
+   this.upstreamLogger = (UpstreamLogger) 
operatorChain.getHeadOperator();
--- End diff --

if this is the same UpstreamLogger instance that you pass 2 lines above 
then why not use that? :)




[GitHub] flink pull request #1668: [FLINK-3257] Add Exactly-Once Processing Guarantee...

2017-03-24 Thread gyfora
Github user gyfora commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r107969496
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
 ---
@@ -17,100 +17,164 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.types.Either;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * TODO write javadoc
+ * 
+ * - open a list state per snapshot process
+ * - book-keep snapshot logs
+ * - Clean up state when a savepoint is complete - ONLY in-transit records 
who do NOT belong in other snapshots
+ *
+ * @param 
+ */
 @Internal
-public class StreamIterationHead extends OneInputStreamTask 
{
+public class StreamIterationHead extends OneInputStreamTask {
 
private static final Logger LOG = 
LoggerFactory.getLogger(StreamIterationHead.class);
 
private volatile boolean running = true;
 
-   // 

-   
+   private volatile RecordWriterOutput[] outputs;
+
+   private UpstreamLogger upstreamLogger;
+
+   private Object lock;
+
+   @Override
+   public void init() throws Exception {
+   this.lock = getCheckpointLock();
+   getConfiguration().setStreamOperator(new 
UpstreamLogger(getConfiguration()));
+   operatorChain = new OperatorChain<>(this);
+   this.upstreamLogger = (UpstreamLogger) 
operatorChain.getHeadOperator();
+   }
+
@Override
protected void run() throws Exception {
-   
+
final String iterationId = getConfiguration().getIterationId();
if (iterationId == null || iterationId.length() == 0) {
throw new Exception("Missing iteration ID in the task 
configuration");
}
-   
-   final String brokerID = 
createBrokerIdString(getEnvironment().getJobID(), iterationId ,
-   
getEnvironment().getTaskInfo().getIndexOfThisSubtask());
-   
+   final String brokerID = 
createBrokerIdString(getEnvironment().getJobID(), iterationId,
+   getEnvironment().getTaskInfo().getIndexOfThisSubtask());
final long iterationWaitTime = 
getConfiguration().getIterationWaitTime();
final boolean shouldWait = iterationWaitTime > 0;
 
-   final BlockingQueue dataChannel = new 
ArrayBlockingQueue(1);
+   final BlockingQueue> dataChannel
+   = new ArrayBlockingQueue<>(1);
 
// offer the queue for the tail
BlockingQueueBroker.INSTANCE.handIn(brokerID, dataChannel);
LOG.info("Iteration head {} added feedback queue under {}", 
getName(), brokerID);
 
// do the work 
try {
-   @SuppressWarnings("unchecked")
-   RecordWriterOutput[] outputs = 
(RecordWriterOutput[]) getStreamOutputs();
+   outputs = (RecordWriterOutput[]) 

[GitHub] flink issue #3609: [FLINK-6073] - Support for SQL inner queries for proctime

2017-03-24 Thread rtudoran
Github user rtudoran commented on the issue:

https://github.com/apache/flink/pull/3609
  
@fhueske @twalthr @sunjincheng121 @shijinkui @stefanobortoli @hongyuhong

I have made a first implementation draft for supporting inner queries 
mainly when operating on processing time. I would highly appreciate some 
feedback from you to further enhance the approach.

The idea of the implementation is described in 
https://issues.apache.org/jira/browse/FLINK-6073?filter=-2





[GitHub] flink pull request #1668: [FLINK-3257] Add Exactly-Once Processing Guarantee...

2017-03-24 Thread gyfora
Github user gyfora commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r107967910
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
 ---
@@ -119,13 +183,154 @@ protected void cleanup() throws Exception {
 * Creates the identification string with which head and tail task find 
the shared blocking
 * queue for the back channel. The identification string is unique per 
parallel head/tail pair
 * per iteration per job.
-* 
-* @param jid The job ID.
-* @param iterationID The id of the iteration in the job.
+*
+* @param jid  The job ID.
+* @param iterationID  The id of the iteration in the job.
 * @param subtaskIndex The parallel subtask number
 * @return The identification string.
 */
public static String createBrokerIdString(JobID jid, String 
iterationID, int subtaskIndex) {
return jid + "-" + iterationID + "-" + subtaskIndex;
}
+
+   /**
+* An internal operator that solely serves as a state logging facility 
for persisting,
+* partitioning and restoring output logs for dataflow cycles 
consistently. To support concurrency,
+* logs are being sliced proportionally to the number of concurrent 
snapshots. This allows committed
+* output logs to be uniquely identified and cleared after each 
complete checkpoint.
+* 
+* The design is based on the following assumptions:
+* 
+* - A slice is named after a checkpoint ID. Checkpoint IDs are 
numerically ordered within an execution.
+* - Each checkpoint barrier arrives back in FIFO order, thus we 
discard log slices in respective FIFO order.
+* - Upon restoration the logger sorts sliced logs in the same FIFO 
order and returns an Iterable that
+* gives a singular view of the log.
+* 
+* TODO it seems that ListState.clear does not unregister state. We 
need to put a hook for that.
+*
+* @param 
+*/
+   public static class UpstreamLogger extends 
AbstractStreamOperator implements OneInputStreamOperator {
+
+   private final StreamConfig config;
+
+   private LinkedList> slicedLog = new 
LinkedList<>();
+
+   private UpstreamLogger(StreamConfig config) {
+   this.config = config;
+   }
+
+   public void logRecord(StreamRecord record) throws Exception 
{
+   if (!slicedLog.isEmpty()) {
+   slicedLog.getLast().add(record);
+   }
+   }
+
+   public void createSlice(String sliceID) throws Exception {
+   ListState nextSlice =
+   getOperatorStateBackend().getOperatorState(new 
ListStateDescriptor<>(sliceID,
+   
config.getTypeSerializerOut(getUserCodeClassloader(;
+   slicedLog.addLast(nextSlice);
+   }
+
+   public void discardSlice() {
+   ListState logToEvict = 
slicedLog.pollFirst();
+   logToEvict.clear();
+   }
+
+   public Iterable getReplayLog() throws 
Exception {
+   final List logSlices = new 
ArrayList<>(getOperatorStateBackend().getRegisteredStateNames());
+   Collections.sort(logSlices, new Comparator() {
+   @Override
+   public int compare(String o1, String o2) {
+   return 
Long.valueOf(o1).compareTo(Long.valueOf(o2));
+   }
+   });
+
+   final List> wrappedIterators 
= new ArrayList<>();
+   for (String splitID : logSlices) {
+   wrappedIterators.add(getOperatorStateBackend()
+   .getOperatorState(new 
ListStateDescriptor<>(splitID,
+   
config.getTypeSerializerOut(getUserCodeClassloader(.get().iterator());
+   }
+
+   if (wrappedIterators.size() == 0) {
+   return new Iterable() {
+   @Override
+   public Iterator 
iterator() {
+   return 
Collections.emptyListIterator();
+   }
+   };
+   }
+
+   return new 

[GitHub] flink pull request #1668: [FLINK-3257] Add Exactly-Once Processing Guarantee...

2017-03-24 Thread gyfora
Github user gyfora commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r107968567
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
 ---
@@ -119,13 +183,154 @@ protected void cleanup() throws Exception {
 * Creates the identification string with which head and tail task find 
the shared blocking
 * queue for the back channel. The identification string is unique per 
parallel head/tail pair
 * per iteration per job.
-* 
-* @param jid The job ID.
-* @param iterationID The id of the iteration in the job.
+*
+* @param jid  The job ID.
+* @param iterationID  The id of the iteration in the job.
 * @param subtaskIndex The parallel subtask number
 * @return The identification string.
 */
public static String createBrokerIdString(JobID jid, String 
iterationID, int subtaskIndex) {
return jid + "-" + iterationID + "-" + subtaskIndex;
}
+
+   /**
+* An internal operator that solely serves as a state logging facility 
for persisting,
+* partitioning and restoring output logs for dataflow cycles 
consistently. To support concurrency,
+* logs are being sliced proportionally to the number of concurrent 
snapshots. This allows committed
+* output logs to be uniquely identified and cleared after each 
complete checkpoint.
+* 
+* The design is based on the following assumptions:
+* 
+* - A slice is named after a checkpoint ID. Checkpoint IDs are 
numerically ordered within an execution.
+* - Each checkpoint barrier arrives back in FIFO order, thus we 
discard log slices in respective FIFO order.
+* - Upon restoration the logger sorts sliced logs in the same FIFO 
order and returns an Iterable that
+* gives a singular view of the log.
+* 
+* TODO it seems that ListState.clear does not unregister state. We 
need to put a hook for that.
+*
+* @param 
+*/
+   public static class UpstreamLogger extends 
AbstractStreamOperator implements OneInputStreamOperator {
+
+   private final StreamConfig config;
+
+   private LinkedList> slicedLog = new 
LinkedList<>();
+
+   private UpstreamLogger(StreamConfig config) {
+   this.config = config;
+   }
+
+   public void logRecord(StreamRecord record) throws Exception 
{
+   if (!slicedLog.isEmpty()) {
+   slicedLog.getLast().add(record);
+   }
+   }
+
+   public void createSlice(String sliceID) throws Exception {
+   ListState nextSlice =
+   getOperatorStateBackend().getOperatorState(new 
ListStateDescriptor<>(sliceID,
+   
config.getTypeSerializerOut(getUserCodeClassloader(;
+   slicedLog.addLast(nextSlice);
+   }
+
+   public void discardSlice() {
+   ListState logToEvict = 
slicedLog.pollFirst();
+   logToEvict.clear();
+   }
+
+   public Iterable getReplayLog() throws 
Exception {
+   final List logSlices = new 
ArrayList<>(getOperatorStateBackend().getRegisteredStateNames());
+   Collections.sort(logSlices, new Comparator() {
+   @Override
+   public int compare(String o1, String o2) {
+   return 
Long.valueOf(o1).compareTo(Long.valueOf(o2));
+   }
+   });
+
+   final List> wrappedIterators 
= new ArrayList<>();
+   for (String splitID : logSlices) {
+   wrappedIterators.add(getOperatorStateBackend()
+   .getOperatorState(new 
ListStateDescriptor<>(splitID,
+   
config.getTypeSerializerOut(getUserCodeClassloader(.get().iterator());
+   }
+
+   if (wrappedIterators.size() == 0) {
+   return new Iterable() {
+   @Override
+   public Iterator 
iterator() {
+   return 
Collections.emptyListIterator();
+   }
+   };
+   }
+
+   return new 

[GitHub] flink pull request #1668: [FLINK-3257] Add Exactly-Once Processing Guarantee...

2017-03-24 Thread gyfora
Github user gyfora commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r107967886
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
 ---
@@ -119,13 +183,154 @@ protected void cleanup() throws Exception {
 * Creates the identification string with which head and tail task find 
the shared blocking
 * queue for the back channel. The identification string is unique per 
parallel head/tail pair
 * per iteration per job.
-* 
-* @param jid The job ID.
-* @param iterationID The id of the iteration in the job.
+*
+* @param jid  The job ID.
+* @param iterationID  The id of the iteration in the job.
 * @param subtaskIndex The parallel subtask number
 * @return The identification string.
 */
public static String createBrokerIdString(JobID jid, String 
iterationID, int subtaskIndex) {
return jid + "-" + iterationID + "-" + subtaskIndex;
}
+
+   /**
+* An internal operator that solely serves as a state logging facility 
for persisting,
+* partitioning and restoring output logs for dataflow cycles 
consistently. To support concurrency,
+* logs are being sliced proportionally to the number of concurrent 
snapshots. This allows committed
+* output logs to be uniquely identified and cleared after each 
complete checkpoint.
+* 
+* The design is based on the following assumptions:
+* 
+* - A slice is named after a checkpoint ID. Checkpoint IDs are 
numerically ordered within an execution.
+* - Each checkpoint barrier arrives back in FIFO order, thus we 
discard log slices in respective FIFO order.
+* - Upon restoration the logger sorts sliced logs in the same FIFO 
order and returns an Iterable that
+* gives a singular view of the log.
+* 
+* TODO it seems that ListState.clear does not unregister state. We 
need to put a hook for that.
+*
+* @param 
+*/
+   public static class UpstreamLogger extends 
AbstractStreamOperator implements OneInputStreamOperator {
+
+   private final StreamConfig config;
+
+   private LinkedList> slicedLog = new 
LinkedList<>();
+
+   private UpstreamLogger(StreamConfig config) {
+   this.config = config;
+   }
+
+   public void logRecord(StreamRecord record) throws Exception 
{
+   if (!slicedLog.isEmpty()) {
+   slicedLog.getLast().add(record);
+   }
+   }
+
+   public void createSlice(String sliceID) throws Exception {
+   ListState nextSlice =
+   getOperatorStateBackend().getOperatorState(new 
ListStateDescriptor<>(sliceID,
+   
config.getTypeSerializerOut(getUserCodeClassloader(;
+   slicedLog.addLast(nextSlice);
+   }
+
+   public void discardSlice() {
+   ListState logToEvict = 
slicedLog.pollFirst();
+   logToEvict.clear();
+   }
+
+   public Iterable getReplayLog() throws 
Exception {
+   final List logSlices = new 
ArrayList<>(getOperatorStateBackend().getRegisteredStateNames());
+   Collections.sort(logSlices, new Comparator() {
+   @Override
+   public int compare(String o1, String o2) {
+   return 
Long.valueOf(o1).compareTo(Long.valueOf(o2));
+   }
+   });
+
+   final List> wrappedIterators 
= new ArrayList<>();
+   for (String splitID : logSlices) {
+   wrappedIterators.add(getOperatorStateBackend()
+   .getOperatorState(new 
ListStateDescriptor<>(splitID,
+   
config.getTypeSerializerOut(getUserCodeClassloader(.get().iterator());
+   }
+
+   if (wrappedIterators.size() == 0) {
+   return new Iterable() {
+   @Override
+   public Iterator 
iterator() {
+   return 
Collections.emptyListIterator();
+   }
+   };
+   }
+
+   return new 

[GitHub] flink pull request #3609: Inner query implementation model

2017-03-24 Thread rtudoran
GitHub user rtudoran opened a pull request:

https://github.com/apache/flink/pull/3609

Inner query implementation model

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huawei-flink/flink FLINK-6073

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3609.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3609


commit e2c9bafa1695a9f602fbfed272916abfacfd3cbe
Author: rtudoran 
Date:   2017-03-24T18:22:55Z

Inner query implementation model






[jira] [Assigned] (FLINK-6184) Buffer metrics can cause NPE

2017-03-24 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-6184:
---

Assignee: Chesnay Schepler

> Buffer metrics can cause NPE
> 
>
> Key: FLINK-6184
> URL: https://issues.apache.org/jira/browse/FLINK-6184
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.3.0
>
>
> The Buffer metrics defined in the TaskIOMetricGroup are created when a Task 
> is created. At this time, the bufferPool in the Input gates that the metrics 
> make use of is still null, leading to possible NPEs.
> These metrics should either be created after the required objects are fully 
> initialized, or they should guard against this case with null checks.
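
As a hedged illustration of the second option (a null guard), a minimal sketch built only on the public {{Gauge}} interface; how the pool is obtained and which counter is read here are assumptions, not the actual TaskIOMetricGroup code:

{code}
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.io.network.buffer.BufferPool;

public class GuardedInputBuffersGauge implements Gauge<Integer> {

	/** Supplies the current buffer pool; may return null before the Task is fully initialized. */
	public interface BufferPoolProvider {
		BufferPool get();
	}

	private final BufferPoolProvider provider;

	public GuardedInputBuffersGauge(BufferPoolProvider provider) {
		this.provider = provider;
	}

	@Override
	public Integer getValue() {
		BufferPool pool = provider.get();
		// the pool is assigned only after the input gates are set up, so report 0 until then
		return pool == null ? 0 : pool.bestEffortGetNumOfUsedBuffers();
	}
}
{code}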



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3590: [FLINK-5654] - Add processing time OVER RANGE BETWEEN x P...

2017-03-24 Thread rtudoran
Github user rtudoran commented on the issue:

https://github.com/apache/flink/pull/3590
  
@fhueske @sunjincheng121 @hongyuhong @stefanobortoli 

I have run a test to compare the 3 approaches: 
-windows  based  #3550 
-processfunction based with events managed in ValueState[Queue] - this PR
-processfunction based with events managed in MapState[Long,JList]   #3607 

The simple benchmark that I ran generates events 1 ms apart (a 5-tuple like 
the one we used in the tests). There are 2 scenarios in which I run a simple 
count over the window contents:

Scenario 1)

2 second window (~2000 events in a window) - 100K events  in total generated
Window based solution:  113839 ms
Process based (with Queue): 111792 ms
Process based on MapState: 110533 ms

10 second window (~10000 events in a window) - 200K events in total generated
Window based solution:  218399ms
Process based (with Queue): 217343ms
Process based on MapState: 217657ms

I would say that the approaches are similar in performance (with some small 
advantage for ProcessingFunctions). Regarding the 2 approaches for handling data 
in process windows, I would say that the price to pay for 
serializing/deserializing the whole list of events is matched by 
(serializing/deserializing the timestamp keys + independently deserializing the 
events that need to be removed). Considering that the performances are similar, 
I personally believe that the approach with the Queue is preferred because we can 
actually gain something (i.e., the order of the events), which will be helpful 
in extending the implementation for full SQL. A hedged sketch of the two state 
layouts follows below.
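
A hedged sketch of the two state layouts compared above (descriptor and type names are illustrative, not the code of either PR):

{code}
import java.util.LinkedList;
import java.util.List;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Row;

public class OverWindowStateLayouts {

	// (a) the whole ordered buffer in one ValueState: order comes for free,
	// but the state is read and written as a single blob on every access
	public static ValueStateDescriptor<LinkedList<Row>> queueLayout() {
		return new ValueStateDescriptor<>(
			"overWindowBuffer",
			TypeInformation.of(new TypeHint<LinkedList<Row>>() {}));
	}

	// (b) rows bucketed by timestamp in a MapState: expired buckets can be
	// dropped individually, at the cost of not having a global order for free
	public static MapStateDescriptor<Long, List<Row>> mapLayout() {
		return new MapStateDescriptor<>(
			"overWindowBufferPerTimestamp",
			TypeInformation.of(new TypeHint<Long>() {}),
			TypeInformation.of(new TypeHint<List<Row>>() {}));
	}
}
{code}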





[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940830#comment-15940830
 ] 

ASF GitHub Bot commented on FLINK-5654:
---

Github user rtudoran commented on the issue:

https://github.com/apache/flink/pull/3590
  
@fhueske @sunjincheng121 @hongyuhong @stefanobortoli 

I have run a test to compare the 3 approaches: 
-windows  based  #3550 
-processfunction based with events managed in ValueState[Queue] - this PR
-processfunction based with events managed in MapState[Long,JList]   #3607 

The simple benchmark that I ran generates events 1 ms apart (a 5-tuple like 
the one we used in the tests). There are 2 scenarios in which I run a simple 
count over the window contents:

Scenario 1)

2 second window (~2000 events in a window) - 100K events  in total generated
Window based solution:  113839 ms
Process based (with Queue): 111792 ms
Process based on MapState: 110533 ms

10 second window (~10000 events in a window) - 200K events in total generated
Window based solution:  218399ms
Process based (with Queue): 217343ms
Process based on MapState: 217657ms

I would say that the approaches are similar in performance (with some small 
advantage for ProcessingFunctions). Regarding the 2 approaches for handling data 
in process windows, I would say that the price to pay for 
serializing/deserializing the whole list of events is matched by 
(serializing/deserializing the timestamp keys + independently deserializing the 
events that need to be removed). Considering that the performances are similar, 
I personally believe that the approach with the Queue is preferred because we can 
actually gain something (i.e., the order of the events), which will be helpful 
in extending the implementation for full SQL.



> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> -
>
> Key: FLINK-5654
> URL: https://issues.apache.org/jira/browse/FLINK-5654
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: radu
>
> The goal of this issue is to add support for OVER RANGE aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5657)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5715) Asynchronous snapshotting for HeapKeyedStateBackend

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940819#comment-15940819
 ] 

ASF GitHub Bot commented on FLINK-5715:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3602
  
Merged in c6a80725053c49dd2064405577291bdc86c82003.


> Asynchronous snapshotting for HeapKeyedStateBackend
> ---
>
> Key: FLINK-5715
> URL: https://issues.apache.org/jira/browse/FLINK-5715
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.3.0
>
>
> Blocking snapshots render the HeapKeyedStateBackend practically unusable for 
> many users in production. Their jobs cannot tolerate stopped processing for 
> the time it takes to write gigabytes of data from memory to disk. 
> Asynchronous snapshots would be a solution to this problem. The challenge for 
> the implementation is coming up with a copy-on-write scheme for the in-memory 
> hash maps that build the foundation of this backend. After taking a closer 
> look, this problem is twofold. First, providing CoW semantics for the hashmap 
> itself, as a mutable structure, thereby avoiding costly locking or blocking 
> where possible. Second, CoW for the mutable value objects, e.g. through 
> cloning via serializers.
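
As a hedged illustration of the second point (CoW for mutable values through cloning via serializers), a minimal sketch; this is not the backend's actual implementation:

{code}
import org.apache.flink.api.common.typeutils.TypeSerializer;

public final class SnapshotCopyUtil {

	private SnapshotCopyUtil() {
	}

	/** Returns a copy that the snapshot can hold on to while the original keeps being mutated. */
	public static <V> V copyForSnapshot(TypeSerializer<V> serializer, V value) {
		// TypeSerializer#copy performs a deep copy for mutable types and may
		// simply return the same instance for immutable ones
		return value == null ? null : serializer.copy(value);
	}
}
{code}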



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5715) Asynchronous snapshotting for HeapKeyedStateBackend

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940820#comment-15940820
 ] 

ASF GitHub Bot commented on FLINK-5715:
---

Github user StefanRRichter closed the pull request at:

https://github.com/apache/flink/pull/3602


> Asynchronous snapshotting for HeapKeyedStateBackend
> ---
>
> Key: FLINK-5715
> URL: https://issues.apache.org/jira/browse/FLINK-5715
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.3.0
>
>
> Blocking snapshots render the HeapKeyedStateBackend practically unusable for 
> many users in production. Their jobs cannot tolerate stopped processing for 
> the time it takes to write gigabytes of data from memory to disk. 
> Asynchronous snapshots would be a solution to this problem. The challenge for 
> the implementation is coming up with a copy-on-write scheme for the in-memory 
> hash maps that build the foundation of this backend. After taking a closer 
> look, this problem is twofold. First, providing CoW semantics for the hashmap 
> itself, as a mutable structure, thereby avoiding costly locking or blocking 
> where possible. Second, CoW for the mutable value objects, e.g. through 
> cloning via serializers.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3602: [FLINK-5715] Asynchronous snapshots for heap keyed...

2017-03-24 Thread StefanRRichter
Github user StefanRRichter closed the pull request at:

https://github.com/apache/flink/pull/3602




[GitHub] flink issue #3602: [FLINK-5715] Asynchronous snapshots for heap keyed state ...

2017-03-24 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3602
  
Merged in c6a80725053c49dd2064405577291bdc86c82003.




[jira] [Commented] (FLINK-5715) Asynchronous snapshotting for HeapKeyedStateBackend

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940809#comment-15940809
 ] 

ASF GitHub Bot commented on FLINK-5715:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3602
  
After a discussion with @StephanEwen , we decided to follow my proposal. 
Merging this now.


> Asynchronous snapshotting for HeapKeyedStateBackend
> ---
>
> Key: FLINK-5715
> URL: https://issues.apache.org/jira/browse/FLINK-5715
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.3.0
>
>
> Blocking snapshots render the HeapKeyedStateBackend practically unusable for 
> many users in production. Their jobs cannot tolerate stopped processing for 
> the time it takes to write gigabytes of data from memory to disk. 
> Asynchronous snapshots would be a solution to this problem. The challenge for 
> the implementation is coming up with a copy-on-write scheme for the in-memory 
> hash maps that build the foundation of this backend. After taking a closer 
> look, this problem is twofold. First, providing CoW semantics for the hashmap 
> itself, as a mutable structure, thereby avoiding costly locking or blocking 
> where possible. Second, CoW for the mutable value objects, e.g. through 
> cloning via serializers.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3602: [FLINK-5715] Asynchronous snapshots for heap keyed state ...

2017-03-24 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3602
  
After a discussion with @StephanEwen , we decided to follow my proposal. 
Merging this now.




[jira] [Commented] (FLINK-5498) Add support for left/right outer joins with non-equality predicates (and 1+ equality predicates)

2017-03-24 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940804#comment-15940804
 ] 

Fabian Hueske commented on FLINK-5498:
--

Hi [~lincoln.86xy], I thought about this problem and I think I found a 
memory-safe way to address it, i.e., without a {{CoGroupFunction}}. The idea is 
to filter out invalid {{null}} join results in a {{GroupReduceFunction}}. The 
overhead for this is another sort, but the operator becomes memory-safe.

I think we should prefer a less-efficient memory-safe implementation if 
possible.

I made a prototype implementation for a LEFT OUTER JOIN (see below) but haven't 
thought about whether it would work for FULL OUTER JOINs as well.

What do you think?

Best, Fabian

{code}
public class OuterJoin {
  
  public static void main(String[] args) throws Exception {

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);

Row[] dataOuter = new Row[]{Row.of(1, 100), Row.of(1, 100), Row.of(2, 200), 
Row.of(3, 300), Row.of(4, 400), Row.of(5, 500), Row.of(6, 600), Row.of(6, 600)};
Row[] dataInner = new Row[]{Row.of(1, 10), Row.of(1, 110), Row.of(2, 220), 
Row.of(3, 30), Row.of(4, 40), Row.of(4, 41)};

RowTypeInfo rowType = new RowTypeInfo(
  BasicTypeInfo.INT_TYPE_INFO,
  BasicTypeInfo.INT_TYPE_INFO
);

DataSet<Row> outer = env.fromCollection(Arrays.asList(dataOuter), rowType);
DataSet<Row> inner = env.fromCollection(Arrays.asList(dataInner), rowType);

DataSet<Row> joined = outer
  .leftOuterJoin(inner)
.where(0).equalTo(0) // define join keys
.with(new JoinFunc()) // join function adds flag whether join with null 
or not
  .groupBy(1, 2) // group by all fields of the outer table (partitioning is 
reused)
  .reduceGroup(new NullFilter()); // filter out all null joins if there was 
any matched join

joined.print();
  }

  @FunctionAnnotation.ForwardedFieldsFirst({"f0->f1; f1->f2"})
  @FunctionAnnotation.ForwardedFieldsSecond({"f0->f3"})
  public static class JoinFunc implements JoinFunction<Row, Row, Row>, ResultTypeQueryable<Row> {

@Override
public Row join(Row outer, Row inner) throws Exception {
  if (inner == null) {
return Row.of(true, outer.getField(0), outer.getField(1), null, null);
  } else {
if (((int)outer.getField(1)) > ((int)inner.getField(1))) {
  // remains
  return Row.of(false, outer.getField(0), outer.getField(1), 
inner.getField(0), inner.getField(1));
} else {
  // filtered out
  return Row.of(true, outer.getField(0), outer.getField(1), null, null);
}

  }
}

@Override
public TypeInformation<Row> getProducedType() {
  return new RowTypeInfo(
BasicTypeInfo.BOOLEAN_TYPE_INFO, // flag to indicate null
BasicTypeInfo.INT_TYPE_INFO, // first field of outer table
BasicTypeInfo.INT_TYPE_INFO, // second field of outer table
BasicTypeInfo.INT_TYPE_INFO, // first field of inner table
BasicTypeInfo.INT_TYPE_INFO  // second field of inner table
  );
}
  }

  @FunctionAnnotation.ForwardedFields({"f1->f0; f2->f1"})
  public static class NullFilter implements GroupReduceFunction<Row, Row>, ResultTypeQueryable<Row> {

@Override
public void reduce(Iterable<Row> rows, Collector<Row> out) throws Exception {

  boolean needsNull = true;
  int nullCnt = 0;
  Row r = null;

  Iterator<Row> rowsIt = rows.iterator();
  while (rowsIt.hasNext()) {

r = rowsIt.next();
boolean isNull = (Boolean) r.getField(0);

if (!isNull) {
  // non nulls are directly forwarded
  out.collect(Row.of(r.getField(1), r.getField(2), r.getField(3), 
r.getField(4)));
  needsNull = false;
} else {
  // nulls are not forwarded but counted. Let's see if there were some 
join matches
  nullCnt++;
}
  }

  if (needsNull) {
// no join matches found. Forward null joins
for (int i = 0; i < nullCnt; i++) {
  out.collect(Row.of(r.getField(1), r.getField(2), null, null));
}
  }
}

@Override
public TypeInformation<Row> getProducedType() {
  return new RowTypeInfo(
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO
  );
}
  }
}
{code}
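
If I read the prototype right, the sample data above should produce the following left outer join result (in some order), i.e. the effective join predicate is {{outer.f0 = inner.f0 AND outer.f1 > inner.f1}}:

{code}
1,100,1,10
1,100,1,10
2,200,null,null
3,300,3,30
4,400,4,40
4,400,4,41
5,500,null,null
6,600,null,null
6,600,null,null
{code}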

> Add support for left/right outer joins with non-equality predicates (and 1+ 
> equality predicates)
> 
>
> Key: FLINK-5498
> URL: https://issues.apache.org/jira/browse/FLINK-5498
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: lincoln.lee
>Assignee: 

[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940777#comment-15940777
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
Hey. Any update/opinion/something anyone?
Just a gentle reminder, sorry if this sounds a bit desperate :)



> Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
> ---
>
> Key: FLINK-3257
> URL: https://issues.apache.org/jira/browse/FLINK-3257
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Paris Carbone
>Assignee: Paris Carbone
>
> The current snapshotting algorithm cannot support cycles in the execution 
> graph. An alternative scheme can potentially include records in-transit 
> through the back-edges of a cyclic execution graph (ABS [1]) to achieve the 
> same guarantees.
> One straightforward implementation of ABS for cyclic graphs can work as 
> follows along the lines:
> 1) Upon triggering a barrier in an IterationHead from the TaskManager, block 
> output and start upstream backup of all records forwarded from the 
> respective IterationSink.
> 2) The IterationSink should eventually forward the current snapshotting epoch 
> barrier to the IterationSource.
> 3) Upon receiving a barrier from the IterationSink, the IterationSource 
> should finalize the snapshot, unblock its output and emit all records 
> in-transit in FIFO order and continue the usual execution.
> --
> Upon restart the IterationSource should emit all records from the injected 
> snapshot first and then continue its usual execution.
> Several optimisations and slight variations can be potentially achieved but 
> this can be the initial implementation take.
> [1] http://arxiv.org/abs/1506.08603
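A rough, hypothetical sketch of the upstream-backup scheme described above (illustrative names only, not the actual Flink iteration operators):

{code}
import java.util.ArrayDeque;
import java.util.Queue;

// Models how an IterationHead could log back-edge records between receiving a
// checkpoint barrier and seeing that barrier return via the IterationSink.
class IterationHeadSnapshotSketch<T> {

  private final Queue<T> inFlightLog = new ArrayDeque<>(); // upstream backup of in-transit records
  private boolean logging = false;

  // 1) Barrier triggered from the TaskManager: block forwarding and start logging.
  void onCheckpointBarrier() {
    logging = true;
  }

  // Records arriving over the back-edge from the IterationSink.
  void onBackEdgeRecord(T record) {
    if (logging) {
      inFlightLog.add(record);
    } else {
      emit(record);
    }
  }

  // 3) Barrier returned via the IterationSink: snapshot the log, unblock, replay in FIFO order.
  void onBarrierFromIterationSink() {
    snapshot(inFlightLog);
    logging = false;
    while (!inFlightLog.isEmpty()) {
      emit(inFlightLog.poll());
    }
  }

  private void emit(T record) { /* forward downstream */ }

  private void snapshot(Queue<T> log) { /* persist alongside the operator state */ }
}
{code}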



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...

2017-03-24 Thread senorcarbone
Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
Hey. Any update/opinion/something anyone?
Just a gentle reminder, sorry if this sounds a bit desperate :)



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-6183) TaskMetricGroup may not be cleanup when Task.run() is never called or exits early

2017-03-24 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-6183:

Affects Version/s: 1.2.0

> TaskMetricGroup may not be cleanup when Task.run() is never called or exits 
> early
> -
>
> Key: FLINK-6183
> URL: https://issues.apache.org/jira/browse/FLINK-6183
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Chesnay Schepler
>Priority: Blocker
>
> The TaskMetricGroup is created when a Task is created. It is cleaned up at 
> the end of Task.run() in the finally block. If, however, run() is never called 
> due to some failure between the creation of the task and the call to run(), the 
> metric group is never closed. This also means that the JobMetricGroup is never closed.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6184) Buffer metrics can cause NPE

2017-03-24 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-6184:

Affects Version/s: 1.2.0

> Buffer metrics can cause NPE
> 
>
> Key: FLINK-6184
> URL: https://issues.apache.org/jira/browse/FLINK-6184
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.3.0
>
>
> The Buffer metrics defined in the TaskIOMetricGroup are created when a Task 
> is created. At this time, the bufferPool in the Input gates that the metrics 
> make use of is still null, leading to possible NPEs.
> These metrics should either be created after the required objects are fully 
> initialized or guard this case with null checks.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6183) TaskMetricGroup may not be cleanup when Task.run() is never called or exits early

2017-03-24 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-6183:

Component/s: Metrics

> TaskMetricGroup may not be cleanup when Task.run() is never called or exits 
> early
> -
>
> Key: FLINK-6183
> URL: https://issues.apache.org/jira/browse/FLINK-6183
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.3.0
>Reporter: Chesnay Schepler
>Priority: Blocker
>
> The TaskMetricGroup is created when a Task is created. It is cleaned up at 
> the end of Task.run() in the finally block. If, however, run() is never called 
> due to some failure between the creation of the task and the call to run(), the 
> metric group is never closed. This also means that the JobMetricGroup is never closed.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6184) Buffer metrics can cause NPE

2017-03-24 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-6184:
---

 Summary: Buffer metrics can cause NPE
 Key: FLINK-6184
 URL: https://issues.apache.org/jira/browse/FLINK-6184
 Project: Flink
  Issue Type: Bug
  Components: Metrics
Affects Versions: 1.3.0
Reporter: Chesnay Schepler
Priority: Blocker
 Fix For: 1.3.0


The Buffer metrics defined in the TaskIOMetricGroup are created when a Task is 
created. At this time, the bufferPool in the Input gates that the metrics make 
use of is still null, leading to possible NPEs.

These metrics should either be created after the required objects are fully 
initialized or guard this case with null checks.
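One possible guard, sketched with a hypothetical gauge (the supplier and class name are illustrative, not the actual TaskIOMetricGroup code):

{code}
import org.apache.flink.metrics.Gauge;

import java.util.function.Supplier;

// Null-safe gauge: the supplier stands in for "read the used-buffer count from the
// input gate's buffer pool" and may return null before the task is fully initialized.
public class GuardedBuffersGauge implements Gauge<Integer> {

  private final Supplier<Integer> usedBuffers;

  public GuardedBuffersGauge(Supplier<Integer> usedBuffers) {
    this.usedBuffers = usedBuffers;
  }

  @Override
  public Integer getValue() {
    Integer value = usedBuffers.get();
    return value == null ? 0 : value; // report a default instead of throwing an NPE
  }
}
{code}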



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6183) TaskMetricGroup may not be cleanup when Task.run() is never called or exits early

2017-03-24 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-6183:
---

 Summary: TaskMetricGroup may not be cleanup when Task.run() is 
never called or exits early
 Key: FLINK-6183
 URL: https://issues.apache.org/jira/browse/FLINK-6183
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.3.0
Reporter: Chesnay Schepler
Priority: Blocker


The TaskMetricGroup is created when a Task is created. It is cleaned up at the 
end of Task.run() in the finally block. If, however, run() is never called due 
to some failure between the creation of the task and the call to run(), the metric 
group is never closed. This also means that the JobMetricGroup is never closed.
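A minimal sketch of the suggested direction, with hypothetical helper names rather than the actual Task code: close the metric group on every exit path, including failures before run() is reached.

{code}
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;

// Illustrative only: wrap the whole task body so the metric group is always closed,
// even if the body throws before Task.run() is ever invoked.
public final class TaskMetricCleanup {

  public static void runGuarded(TaskMetricGroup metrics, Runnable taskBody) {
    try {
      taskBody.run();
    } finally {
      metrics.close(); // releases the task group so the parent JobMetricGroup can be cleaned up too
    }
  }
}
{code}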



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-4760) Kafka 09 Consumer failed to initialize state because of corrupted operator state and not able to recover

2017-03-24 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-4760.
-
Resolution: Not A Problem

> Kafka 09 Consumer failed to initialize state because of corrupted operator 
> state and not able to recover
> 
>
> Key: FLINK-4760
> URL: https://issues.apache.org/jira/browse/FLINK-4760
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, State Backends, Checkpointing
>Reporter: Zhenzhong Xu
>Priority: Critical
>
> java.io.StreamCorruptedException: invalid stream header: 0278
>   at 
> java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:806)
>   at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
>   at 
> org.apache.flink.api.java.typeutils.runtime.JavaSerializer.deserialize(JavaSerializer.java:79)
>   at 
> org.apache.flink.api.java.typeutils.runtime.JavaSerializer.deserialize(JavaSerializer.java:31)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getPartitionableState(DefaultOperatorStateBackend.java:107)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:323)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:105)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:396)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
>   at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-6063) HA Configuration doesn't work with Flink 1.2

2017-03-24 Thread Razvan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Razvan closed FLINK-6063.
-
Resolution: Not A Problem

It's not an actual issue with the framework; it just isn't clear that a DFS MUST be used 
for HA.

> HA Configuration doesn't work with Flink 1.2
> 
>
> Key: FLINK-6063
> URL: https://issues.apache.org/jira/browse/FLINK-6063
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.2.0
>Reporter: Razvan
>Priority: Critical
> Attachments: flink-conf.yaml, Logs.tar.gz, masters, slaves, zoo.cfg
>
>
>  I have a setup with flink 1.2 cluster, made up of 3 JobManagers and 2 
> TaskManagers. I start the Zookeeper Quorum from JobManager1, I get 
> confirmation Zookeeper starts on the other 2 JobManagers then I start a Flink 
> job on this JobManager1.   
>  
>  The flink-conf.yaml is the same on all 5 VMs (also everything else related 
> to flink because I copied the folder across all VMs as suggested in 
> tutorials) this means jobmanager.rpc.address: points to JobManager1 
> everywhere.
> If I turn off the VM running JobManager1 I would expect Zookeeper to say one 
> of the remaining JobManagers is the leader and the TaskManagers should 
> reconnect to it. Instead a new leader is elected but the slaves keep 
> connecting to the old master
> 2017-03-15 10:28:28,655 INFO  org.apache.flink.core.fs.FileSystem 
>   - Ensuring all FileSystem streams are closed for Async 
> calls on Source: Custom Source -> Flat Map (1/1)
> 2017-03-15 10:28:38,534 WARN  akka.remote.ReliableDeliverySupervisor  
>   - Association with remote system 
> [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] 
> ms. Reason: [Disassociated] 
> 2017-03-15 10:28:46,606 WARN  akka.remote.ReliableDeliverySupervisor  
>   - Association with remote system 
> [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] 
> ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused 
> by: [Connection refused: /1.2.3.4:44779]
> 2017-03-15 10:28:52,431 WARN  akka.remote.ReliableDeliverySupervisor  
>   - Association with remote system 
> [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] 
> ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused 
> by: [Connection refused: /1.2.3.4:44779]
> 2017-03-15 10:29:02,435 WARN  akka.remote.ReliableDeliverySupervisor  
>   - Association with remote system 
> [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] 
> ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused 
> by: [Connection refused: /1.2.3.4:44779]
> 2017-03-15 10:29:10,489 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - TaskManager 
> akka://flink/user/taskmanager disconnects from JobManager 
> akka.tcp://flink@1.2.3.4:44779/user/jobmanager: Old JobManager lost its 
> leadership.
> 2017-03-15 10:29:10,490 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - Cancelling 
> all computations and discarding all cached data.
> 2017-03-15 10:29:10,491 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Source: Custom Source 
> -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223).
> 2017-03-15 10:29:10,491 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Source: Custom Source -> Flat Map (1/1) 
> (75fd495cc6acfd72fbe957e60e513223) switched from RUNNING to FAILED.
> java.lang.Exception: TaskManager akka://flink/user/taskmanager 
> disconnects from JobManager akka.tcp://flink@1.2.3.4:44779/user/jobmanager: 
> Old JobManager lost its leadership.
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1074)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleJobManagerLeaderAddress(TaskManager.scala:1426)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:286)
>   at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>   at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
>   at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>   at 

[GitHub] flink issue #3502: [FLINK-4565] Support for SQL IN operator

2017-03-24 Thread DmytroShkvyra
Github user DmytroShkvyra commented on the issue:

https://github.com/apache/flink/pull/3502
  
@twalthr I have tried running `SubQueryRemoveRule` (registering it in 
`FlinkRulesSets`) and so on, but it is not called from Flink. I have looked 
through the Calcite docs, and they say that Calcite rules are invoked according to the 
signature of their constructors. So we should register it in `FlinkRulesSets.scala`. But 
Scala has an unpleasant thing - erasure - and I suspect it is the root cause. 
P.S. Sometimes I'm going crazy about the Travis build timeouts :-(


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4565) Support for SQL IN operator

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940716#comment-15940716
 ] 

ASF GitHub Bot commented on FLINK-4565:
---

Github user DmytroShkvyra commented on the issue:

https://github.com/apache/flink/pull/3502
  
@twalthr I have tried running `SubQueryRemoveRule` (registering it in 
`FlinkRulesSets`) and so on, but it is not called from Flink. I have looked 
through the Calcite docs, and they say that Calcite rules are invoked according to the 
signature of their constructors. So we should register it in `FlinkRulesSets.scala`. But 
Scala has an unpleasant thing - erasure - and I suspect it is the root cause. 
P.S. Sometimes I'm going crazy about the Travis build timeouts :-(


> Support for SQL IN operator
> ---
>
> Key: FLINK-4565
> URL: https://issues.apache.org/jira/browse/FLINK-4565
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Dmytro Shkvyra
>
> It seems that Flink SQL supports the uncorrelated sub-query IN operator. But 
> it should also be available in the Table API and tested.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6063) HA Configuration doesn't work with Flink 1.2

2017-03-24 Thread Razvan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940707#comment-15940707
 ] 

Razvan commented on FLINK-6063:
---

Hi, it works with DFS for me, though I'd suggest emphasizing more clearly in 
the documentation that it is required for HA. Thank you for the help!

> HA Configuration doesn't work with Flink 1.2
> 
>
> Key: FLINK-6063
> URL: https://issues.apache.org/jira/browse/FLINK-6063
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.2.0
>Reporter: Razvan
>Priority: Critical
> Attachments: flink-conf.yaml, Logs.tar.gz, masters, slaves, zoo.cfg
>
>
>  I have a setup with flink 1.2 cluster, made up of 3 JobManagers and 2 
> TaskManagers. I start the Zookeeper Quorum from JobManager1, I get 
> confirmation Zookeeper starts on the other 2 JobManagers then I start a Flink 
> job on this JobManager1.   
>  
>  The flink-conf.yaml is the same on all 5 VMs (also everything else related 
> to flink because I copied the folder across all VMs as suggested in 
> tutorials) this means jobmanager.rpc.address: points to JobManager1 
> everywhere.
> If I turn off the VM running JobManager1 I would expect Zookeeper to say one 
> of the remaining JobManagers is the leader and the TaskManagers should 
> reconnect to it. Instead a new leader is elected but the slaves keep 
> connecting to the old master
> 2017-03-15 10:28:28,655 INFO  org.apache.flink.core.fs.FileSystem 
>   - Ensuring all FileSystem streams are closed for Async 
> calls on Source: Custom Source -> Flat Map (1/1)
> 2017-03-15 10:28:38,534 WARN  akka.remote.ReliableDeliverySupervisor  
>   - Association with remote system 
> [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] 
> ms. Reason: [Disassociated] 
> 2017-03-15 10:28:46,606 WARN  akka.remote.ReliableDeliverySupervisor  
>   - Association with remote system 
> [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] 
> ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused 
> by: [Connection refused: /1.2.3.4:44779]
> 2017-03-15 10:28:52,431 WARN  akka.remote.ReliableDeliverySupervisor  
>   - Association with remote system 
> [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] 
> ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused 
> by: [Connection refused: /1.2.3.4:44779]
> 2017-03-15 10:29:02,435 WARN  akka.remote.ReliableDeliverySupervisor  
>   - Association with remote system 
> [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] 
> ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused 
> by: [Connection refused: /1.2.3.4:44779]
> 2017-03-15 10:29:10,489 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - TaskManager 
> akka://flink/user/taskmanager disconnects from JobManager 
> akka.tcp://flink@1.2.3.4:44779/user/jobmanager: Old JobManager lost its 
> leadership.
> 2017-03-15 10:29:10,490 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - Cancelling 
> all computations and discarding all cached data.
> 2017-03-15 10:29:10,491 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Source: Custom Source 
> -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223).
> 2017-03-15 10:29:10,491 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Source: Custom Source -> Flat Map (1/1) 
> (75fd495cc6acfd72fbe957e60e513223) switched from RUNNING to FAILED.
> java.lang.Exception: TaskManager akka://flink/user/taskmanager 
> disconnects from JobManager akka.tcp://flink@1.2.3.4:44779/user/jobmanager: 
> Old JobManager lost its leadership.
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1074)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleJobManagerLeaderAddress(TaskManager.scala:1426)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:286)
>   at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>   at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
>   at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>   at 

[jira] [Commented] (FLINK-6107) Add custom checkstyle for flink-streaming-java

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940678#comment-15940678
 ] 

ASF GitHub Bot commented on FLINK-6107:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3567
  
I have no preference for any style of import order. I just wanted to 
mandate some order so that we don't have edit wars when people use different 
IDE settings.

@greghogan Have you tried setting up import check settings that more 
closely match the current Flink style? If we find something that works I'm happy 
to change that.


> Add custom checkstyle for flink-streaming-java
> --
>
> Key: FLINK-6107
> URL: https://issues.apache.org/jira/browse/FLINK-6107
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> There was some consensus on the ML 
> (https://lists.apache.org/thread.html/94c8c5186b315c58c3f8aaf536501b99e8b92ee97b0034dee295ff6a@%3Cdev.flink.apache.org%3E)
>  that we want to have a more uniform code style. We should start 
> module-by-module and by introducing increasingly stricter rules. We have to 
> be aware of the PR situation and ensure that we have minimal breakage for 
> contributors.
> This issue aims at adding a custom checkstyle.xml for 
> {{flink-streaming-java}} that is based on our current checkstyle.xml but adds 
> these checks for Javadocs:
> {code}
> 
> 
> 
> 
>   
>   
>   
>   
>   
>   
>   
>   
> 
> 
> 
> 
>   
>   
>   
> 
> 
>   
>   
> 
> {code}
> This checks:
>  - Every type has a type-level Javadoc
>  - Proper use of {{}} in Javadocs
>  - First sentence must end with a proper punctuation mark
>  - Proper use (including closing) of HTML tags
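For illustration, a type-level Javadoc that would pass the checks quoted above might look like this (hypothetical example class):

{code}
/**
 * Parses window sizes given in milliseconds. (Hypothetical example class, shown only to
 * illustrate the Javadoc rules listed above.)
 *
 * <p>The first sentence ends with a punctuation mark, inline code is written as
 * {@code parse("5 ms")}, and HTML tags such as <b>this one</b> are properly closed.
 */
public final class WindowSizeParser {

  /** Returns the millisecond value encoded in {@code text}, e.g. {@code "5 ms"}. */
  public static long parse(String text) {
    return Long.parseLong(text.replace("ms", "").trim());
  }
}
{code}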



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3567: [FLINK-6107] Add custom checkstyle for flink-streaming-ja...

2017-03-24 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3567
  
I have no preference for any style of import order. I just wanted to 
mandate some order so that we don't have edit wars when people use different 
IDE settings.

@greghogan Have you tried setting up import check settings that more 
closely match the current Flink style? If we find something that works I'm happy 
to change that. 😃 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3567: [FLINK-6107] Add custom checkstyle for flink-strea...

2017-03-24 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3567#discussion_r107946611
  
--- Diff: tools/maven/strict-checkstyle.xml ---
@@ -0,0 +1,550 @@
+
+
+http://www.puppycrawl.com/dtds/configuration_1_3.dtd;>
+
+
+
+
+
+  
+
+  
+
+  
+
+
+  
+
+  
+
+
+
+
+  
+
+  
+
+
+
+  
+
+  
+
+
+
+  
+
+  
+  
+
+  
+
+  
+  
+
+
+
+  
+  
+
+  
+
+  
+  
+
+  
+
+  
+
+  
+
+  
+  
+
+
+
+
+  
+  
+  
+
+
+
+
+  
+  
+  
--- End diff --

This is the Javadoc for the method that the checks are referring to:
```
/**
 * Returns {@code true} if and only if the system property
 * named by the argument exists and is equal to the string
 * {@code "true"}. (Beginning with version 1.0.2 of the
 * JavaTM platform, the test of
 * this string is case insensitive.) A system property is accessible
 * through {@code getProperty}, a method defined by the
 * {@code System} class.
 * 
 * If there is no property with the specified name, or if the specified
 * name is empty or null, then {@code false} is returned.
 *
 * @param   name   the system property name.
 * @return  the {@code boolean} value of the system property.
 * @throws  SecurityException for the same reasons as
 *  {@link System#getProperty(String) System.getProperty}
 * @see java.lang.System#getProperty(java.lang.String)
 * @see java.lang.System#getProperty(java.lang.String, java.lang.String)
 */
public static boolean getBoolean(String name) {
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6107) Add custom checkstyle for flink-streaming-java

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940675#comment-15940675
 ] 

ASF GitHub Bot commented on FLINK-6107:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3567#discussion_r107946611
  
--- Diff: tools/maven/strict-checkstyle.xml ---
@@ -0,0 +1,550 @@
+
+
+http://www.puppycrawl.com/dtds/configuration_1_3.dtd;>
+
+
+
+
+
+  
+
+  
+
+  
+
+
+  
+
+  
+
+
+
+
+  
+
+  
+
+
+
+  
+
+  
+
+
+
+  
+
+  
+  
+
+  
+
+  
+  
+
+
+
+  
+  
+
+  
+
+  
+  
+
+  
+
+  
+
+  
+
+  
+  
+
+
+
+
+  
+  
+  
+
+
+
+
+  
+  
+  
--- End diff --

This is the Javadoc for the method that the checks are referring to:
```
/**
 * Returns {@code true} if and only if the system property
 * named by the argument exists and is equal to the string
 * {@code "true"}. (Beginning with version 1.0.2 of the
 * JavaTM platform, the test of
 * this string is case insensitive.) A system property is accessible
 * through {@code getProperty}, a method defined by the
 * {@code System} class.
 * 
 * If there is no property with the specified name, or if the specified
 * name is empty or null, then {@code false} is returned.
 *
 * @param   name   the system property name.
 * @return  the {@code boolean} value of the system property.
 * @throws  SecurityException for the same reasons as
 *  {@link System#getProperty(String) System.getProperty}
 * @see java.lang.System#getProperty(java.lang.String)
 * @see java.lang.System#getProperty(java.lang.String, java.lang.String)
 */
public static boolean getBoolean(String name) {
```


> Add custom checkstyle for flink-streaming-java
> --
>
> Key: FLINK-6107
> URL: https://issues.apache.org/jira/browse/FLINK-6107
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> There was some consensus on the ML 
> (https://lists.apache.org/thread.html/94c8c5186b315c58c3f8aaf536501b99e8b92ee97b0034dee295ff6a@%3Cdev.flink.apache.org%3E)
>  that we want to have a more uniform code style. We should start 
> module-by-module and by introducing increasingly stricter rules. We have to 
> be aware of the PR situation and ensure that we have minimal breakage for 
> contributors.
> This issue aims at adding a custom checkstyle.xml for 
> {{flink-streaming-java}} that is based on our current checkstyle.xml but adds 
> these checks for Javadocs:
> {code}
> 
> 
> 
> 
>   
>   
>   
>   
>   
>   
>   
>   
> 
> 
> 
> 
>   
>   
>   
> 
> 
>   
>   
> 
> {code}
> This checks:
>  - Every type has a type-level Javadoc
>  - Proper use of {{}} in Javadocs
>  - First sentence must end with a proper punctuation mark
>  - Proper use (including closing) of HTML tags



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6169) yarnClient should be stopped in AbstractYarnClusterDescriptor in case of error

2017-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940669#comment-15940669
 ] 

ASF GitHub Bot commented on FLINK-6169:
---

GitHub user tedyu opened a pull request:

https://github.com/apache/flink/pull/3608

FLINK-6169 yarnClient should be stopped in AbstractYarnClusterDescriptor in 
case of error

Stop yarnClient before throwing exception

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tedyu/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3608.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3608


commit f0528de9cc03f603b77d6adcd222ff085967b614
Author: tedyu 
Date:   2017-03-24T16:30:31Z

FLINK-6169 yarnClient should be stopped in AbstractYarnClusterDescriptor in 
case of error




> yarnClient should be stopped in AbstractYarnClusterDescriptor in case of error
> --
>
> Key: FLINK-6169
> URL: https://issues.apache.org/jira/browse/FLINK-6169
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Reporter: Ted Yu
>Priority: Minor
>
> Here is one example:
> {code}
> if(jobManagerMemoryMb > maxRes.getMemory() ) {
>   failSessionDuringDeployment(yarnClient, yarnApplication);
>   throw new YarnDeploymentException("The cluster does not have the 
> requested resources for the JobManager available!\n"
> + "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + 
> jobManagerMemoryMb + "MB. " + NOTE);
> }
> {code}
> yarnClient should be stopped when deployment fails.
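A hedged illustration of the proposed change, reusing the names from the snippet above (not necessarily the exact patch): stop the client before the exception leaves the method.

{code}
if (jobManagerMemoryMb > maxRes.getMemory()) {
  failSessionDuringDeployment(yarnClient, yarnApplication);
  yarnClient.stop(); // release the YarnClient before aborting the deployment
  throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n"
    + "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + NOTE);
}
{code}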



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3608: FLINK-6169 yarnClient should be stopped in Abstrac...

2017-03-24 Thread tedyu
GitHub user tedyu opened a pull request:

https://github.com/apache/flink/pull/3608

FLINK-6169 yarnClient should be stopped in AbstractYarnClusterDescriptor in 
case of error

Stop yarnClient before throwing exception

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tedyu/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3608.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3608


commit f0528de9cc03f603b77d6adcd222ff085967b614
Author: tedyu 
Date:   2017-03-24T16:30:31Z

FLINK-6169 yarnClient should be stopped in AbstractYarnClusterDescriptor in 
case of error




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

