[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15946503#comment-15946503
 ] 

ASF GitHub Bot commented on FLINK-5982:
---

Github user tony810430 commented on the issue:

https://github.com/apache/flink/pull/3633
  
@StephanEwen Thanks for the suggestion. I have updated the patch.


> Refactor AbstractInvokable and StatefulTask
> ---
>
> Key: FLINK-5982
> URL: https://issues.apache.org/jira/browse/FLINK-5982
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>
> Currently, running an invokable in {{Task}} requires calling
> {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a
> {{StatefulTask}}, it also needs a call to {{setInitialState(state)}}. This makes
> it difficult to do eager initialization of the invokable during the
> {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to split
> {{invoke()}} into {{open()}} and {{invoke()}}, but that complicates
> running it in {{Task}}.
> This task refactors {{AbstractInvokable}} and {{StatefulTask}} to
> make it easier to construct and run an invokable.
> # Refactor the abstract class to have one default constructor.
> #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}}
> are stateful.
> #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Give
> {{AbstractInvokable}} a two-argument constructor that takes {{Environment}}
> and {{TaskStateHandles}}.
> # Update all subclasses.
> #* Give every subclass of {{AbstractInvokable}} a two-argument constructor
> that calls the constructor in {{AbstractInvokable}}.
> #* Throw an error in {{BatchTask}} if the initial state is not null. (This
> check will be removed once {{BatchTask}} becomes stateful.)
> # Change the creation of the invokable to call that constructor, and update all
> the tests.
> Then, running an invokable reduces to calling the constructor and
> {{invoke()}}. Eager initialization can easily be placed in the
> constructor to fulfill requirements such as FLINK-4714.
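
For readers skimming the thread, the refactoring steps listed above can be sketched roughly as follows. This is only an illustration under stated assumptions: the classes below are simplified stand-ins that borrow the names from the issue, not the real Flink classes, and `BatchTaskSketch` and `InvokableSketch` are hypothetical names.

```java
// Hypothetical stand-ins; these are NOT the real Flink classes.
final class Environment {
    final String taskName;
    Environment(String taskName) { this.taskName = taskName; }
}

final class TaskStateHandles {
    final String description;
    TaskStateHandles(String description) { this.description = description; }
}

abstract class AbstractInvokable {
    private final Environment environment;
    private final TaskStateHandles initialState; // null when there is nothing to restore

    // The two-argument constructor replaces setEnvironment(env) and setInitialState(state).
    protected AbstractInvokable(Environment environment, TaskStateHandles initialState) {
        if (environment == null) {
            throw new NullPointerException("environment");
        }
        this.environment = environment;
        this.initialState = initialState;
    }

    Environment getEnvironment() { return environment; }
    TaskStateHandles getInitialState() { return initialState; }

    // Simplified: the stand-in does not declare checked exceptions.
    abstract void invoke();
}

// Stand-in for a batch task that is not stateful yet: it rejects restored state.
class BatchTaskSketch extends AbstractInvokable {
    boolean invoked = false;

    BatchTaskSketch(Environment env, TaskStateHandles state) {
        super(env, state);
        if (state != null) {
            throw new IllegalStateException("BatchTask does not support initial state yet");
        }
    }

    @Override
    void invoke() { invoked = true; }
}

class InvokableSketch {
    // Mirrors step 3: the invokable is created through the two-argument constructor.
    static AbstractInvokable create(Environment env, TaskStateHandles state) {
        return new BatchTaskSketch(env, state);
    }

    public static void main(String[] args) {
        AbstractInvokable task = create(new Environment("demo"), null);
        task.invoke();
        System.out.println("invoked " + task.getEnvironment().taskName);
    }
}
```

With this shape, any eager initialization can live in the constructor, and the caller only needs the constructor plus `invoke()`.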



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3633: [FLINK-5982] [runtime] Refactor AbstractInvokable and Sta...

2017-03-28 Thread tony810430
Github user tony810430 commented on the issue:

https://github.com/apache/flink/pull/3633
  
@StephanEwen Thanks for the suggestion. I have updated the patch.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15946496#comment-15946496
 ] 

ASF GitHub Bot commented on FLINK-5654:
---

Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/3607
  
Hi @rtudoran, when we open a PR we should keep our commits based on the
latest master branch, so that the PR contains only the changes for this issue
and is clear for others to review. I checked your `Commits` list and
changes; something went wrong when you rebased the code. When I rebase,
I usually use the following git commands:
```
  git checkout myBranch
  git remote add upstream https://github.com/apache/flink.git
  git fetch upstream
  git rebase upstream/master
```
Everyone's local environment is different, so the commands above are for
reference only.

Best,
SunJincheng


> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> -
>
> Key: FLINK-5654
> URL: https://issues.apache.org/jira/browse/FLINK-5654
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: radu
>
> The goal of this issue is to add support for OVER RANGE aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5657)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).
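
As a rough illustration of the semantics requested above (not the DataStream operator design itself), the following sketch maintains `SUM(b)` for a single partition over `RANGE BETWEEN <interval> PRECEDING AND CURRENT ROW`. The class name `RangeSumSketch` is hypothetical, and the sketch assumes processing-time timestamps never decrease.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper, not Flink's operator: keeps the rows of ONE partition
// that fall inside [now - rangeMillis, now] and maintains their running sum.
class RangeSumSketch {
    private final long rangeMillis;
    private final Deque<long[]> rows = new ArrayDeque<>(); // each entry: {timestamp, value}
    private long sum = 0;

    RangeSumSketch(long rangeMillis) { this.rangeMillis = rangeMillis; }

    // Adds a row that arrived at processing time `now` and returns SUM(b) for
    // the window RANGE BETWEEN rangeMillis PRECEDING AND CURRENT ROW.
    long add(long now, long value) {
        rows.addLast(new long[] {now, value});
        sum += value;
        // Retract rows that are older than the lower bound of the range.
        // The row just added is never older than the bound, so the deque is never empty here.
        while (rows.peekFirst()[0] < now - rangeMillis) {
            sum -= rows.removeFirst()[1];
        }
        return sum;
    }

    public static void main(String[] args) {
        RangeSumSketch hourly = new RangeSumSketch(3_600_000L);
        System.out.println(hourly.add(0L, 5));         // 5
        System.out.println(hourly.add(1_800_000L, 7)); // 12
        System.out.println(hourly.add(4_000_000L, 1)); // 8, the first row has expired
    }
}
```

A real operator would keep one such structure per `PARTITION BY` key in managed state; the sketch keeps it on the heap only to stay self-contained.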





[GitHub] flink issue #3607: [FLINK-5654] - Add processing time OVER RANGE BETWEEN x P...

2017-03-28 Thread sunjincheng121
Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/3607
  
Hi @rtudoran, when we open a PR we should keep our commits based on the
latest master branch, so that the PR contains only the changes for this issue
and is clear for others to review. I checked your `Commits` list and
changes; something went wrong when you rebased the code. When I rebase,
I usually use the following git commands:
```
  git checkout myBranch
  git remote add upstream https://github.com/apache/flink.git
  git fetch upstream
  git rebase upstream/master
```
Everyone's local environment is different, so the commands above are for
reference only.

Best,
SunJincheng




[jira] [Commented] (FLINK-6204) Improve Event-Time OVER ROWS BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-28 Thread shijinkui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15946477#comment-15946477
 ] 

shijinkui commented on FLINK-6204:
--

-1.
Hi, guys. I want to know the difference between this PR and
https://github.com/apache/flink/pull/3386
There are 138 comments there, yet this issue now rewrites 3386. Why not propose this
solution in 3386? Must we spend time on the same problem twice?

> Improve Event-Time OVER ROWS BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> ---
>
> Key: FLINK-6204
> URL: https://issues.apache.org/jira/browse/FLINK-6204
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Currently, the implementation class for `event time OVER ROWS BETWEEN UNBOUNDED
> PRECEDING aggregation to SQL`, `UnboundedEventTimeOverProcessFunction`, uses an
> in-memory data structure of uncontrollable size, `sortedTimestamps:
> util.LinkedList[Long]`, to cache and sort the data timestamps. IMO, this is
> not a good approach, because in our production scenario there are millions of
> window records per millisecond. So I want to remove
> `util.LinkedList[Long]`. Feedback from anyone is welcome.
> What do you think? [~fhueske] and [~Yuhong_kyo]





[GitHub] flink issue #3629: [FLINK-5655][table]Add event time OVER RANGE BETWEEN x PR...

2017-03-28 Thread sunjincheng121
Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/3629
  
Hi @fhueske, thanks a lot for your review. I have updated the PR according
to your comments.
There is one point I need you to explain, namely:
`Also, I realized that this implementation (other OVER windows are probably 
affected as well) will not discard state if the key space evolves. We should 
add a JIRA to add a configuration parameter to remove state if no row was 
received for a certain amount of time.`
IMO, for the event-time case we manage state in a data-driven way: each
incoming row triggers timely cleanup of expired data. If data arrives
continuously, the state is cleaned up promptly. If no data arrives for a
long time, the state does not grow. If we cannot tell when the next row
will arrive, I think we should not clear the data, because removing it
would make the next calculation wrong. If the state has a TTL setting,
users can configure the TTL to clear the state gracefully. If
I understand you correctly, the configuration parameter you mentioned is
the TTL config. Is that correct? If not, I would appreciate it if you could tell
me your thoughts in detail.

Thanks,
SunJincheng
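
For reference, the cleanup idea quoted in this comment (remove the state of a key once no row has arrived for a configurable time) could look roughly like the sketch below. It is a plain-Java illustration under assumptions: `IdleStateCleanupSketch` and `retentionMillis` are hypothetical names and do not correspond to an existing Flink class or configuration option.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical sketch of idle-state cleanup; names are illustrative only.
class IdleStateCleanupSketch {
    private final long retentionMillis;
    private final Map<String, Long> lastSeen = new HashMap<>(); // key -> time of last row

    IdleStateCleanupSketch(long retentionMillis) { this.retentionMillis = retentionMillis; }

    // Records that a row for `key` arrived at time `now`.
    void onRow(String key, long now) { lastSeen.put(key, now); }

    // Drops the state of every key that has been idle for longer than the
    // retention time and returns how many keys were removed.
    int cleanUp(long now) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastSeen.entrySet().iterator();
        while (it.hasNext()) {
            if (now - it.next().getValue() > retentionMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    boolean hasState(String key) { return lastSeen.containsKey(key); }

    public static void main(String[] args) {
        IdleStateCleanupSketch state = new IdleStateCleanupSketch(1_000L);
        state.onRow("a", 0L);
        state.onRow("b", 900L);
        System.out.println(state.cleanUp(1_500L)); // 1, only "a" has been idle too long
    }
}
```

As the comment points out, dropping state this way trades correctness of a late result for bounded state size, which is why it would need to be an explicit user setting.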




[jira] [Commented] (FLINK-5655) Add event time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15946474#comment-15946474
 ] 

ASF GitHub Bot commented on FLINK-5655:
---

Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/3629
  
Hi @fhueske, thanks a lot for your review. I have updated the PR according
to your comments.
There is one point I need you to explain, namely:
`Also, I realized that this implementation (other OVER windows are probably 
affected as well) will not discard state if the key space evolves. We should 
add a JIRA to add a configuration parameter to remove state if no row was 
received for a certain amount of time.`
IMO, for the event-time case we manage state in a data-driven way: each
incoming row triggers timely cleanup of expired data. If data arrives
continuously, the state is cleaned up promptly. If no data arrives for a
long time, the state does not grow. If we cannot tell when the next row
will arrive, I think we should not clear the data, because removing it
would make the next calculation wrong. If the state has a TTL setting,
users can configure the TTL to clear the state gracefully. If
I understand you correctly, the configuration parameter you mentioned is
the TTL config. Is that correct? If not, I would appreciate it if you could tell
me your thoughts in detail.

Thanks,
SunJincheng


> Add event time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5655
> URL: https://issues.apache.org/jira/browse/FLINK-5655
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a
> parameterless scalar function that just indicates event time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5658)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).





[jira] [Updated] (FLINK-6210) RocksDB instance should be closed in ListViaMergeSpeedMiniBenchmark

2017-03-28 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-6210:
--
Description: 
rocksDB instance should be closed upon returning from main().


ListViaRangeSpeedMiniBenchmark has similar issue.

  was:
rocksDB instance should be closed upon returning from main().

ListViaRangeSpeedMiniBenchmark has similar issue.


> RocksDB instance should be closed in ListViaMergeSpeedMiniBenchmark
> ---
>
> Key: FLINK-6210
> URL: https://issues.apache.org/jira/browse/FLINK-6210
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> rocksDB instance should be closed upon returning from main().
> ListViaRangeSpeedMiniBenchmark has similar issue.
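
One common way to guarantee the close on every exit path from `main()` is try-with-resources. The sketch below only illustrates the pattern: `NativeStoreSketch` is a hypothetical stand-in for a closeable native handle, not the RocksDB API, and whether the RocksDB version used by the benchmark implements `AutoCloseable` is an assumption that would need checking.

```java
// Hypothetical stand-in for a native resource such as a database handle.
class NativeStoreSketch implements AutoCloseable {
    private boolean open = true;

    boolean isOpen() { return open; }

    void put(String key, String value) {
        if (!open) {
            throw new IllegalStateException("store is closed");
        }
        // A real store would write to native memory or disk here.
    }

    @Override
    public void close() { open = false; }
}

class BenchmarkSketch {
    // Runs the "benchmark" and returns the store so callers can verify it was closed.
    static NativeStoreSketch run() {
        NativeStoreSketch store = new NativeStoreSketch();
        // try-with-resources closes the store even if the body throws.
        try (NativeStoreSketch s = store) {
            s.put("key", "value");
        }
        return store;
    }

    public static void main(String[] args) {
        System.out.println("open after run: " + run().isOpen()); // open after run: false
    }
}
```

If the handle type does not implement `AutoCloseable`, an explicit `try { ... } finally { store.close(); }` gives the same guarantee.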





[jira] [Comment Edited] (FLINK-6085) flink as micro service

2017-03-28 Thread Chen Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945991#comment-15945991
 ] 

Chen Qin edited comment on FLINK-6085 at 3/29/17 3:04 AM:
--

To unlock scenarios like blocking RPC calls or async callbacks, I am currently
thinking of a way to connect the web front directly to the pipeline in the normal case,
using some kind of durable buffer to store the requests received since the last successful
checkpoint to cover the failure scenario.

I think what we can do at this point is to assume that the client retries after a
connection failure and that Flink as a micro service maintains at-least-once
semantics. The problem is then simplified to implementing a web front source and a
feedback loop from sink to source, and locating the pending connection to respond on.

What do you think [~till.rohrmann] [~tudandan]

 


was (Author: foxss):
To unlock scenarios like blocking RPC calls or async callbacks, I am currently
thinking of a way to get rid of the distributed queue by connecting the web front
directly to the pipeline.

I put a bit more thought into this topic; exactly-once seems really hard to
achieve through an RPC source. As a matter of fact, the same issue arises when
using web front ingestion into a distributed queue. Clients can retry arbitrarily
within a long time span.

I think what we can do at this point is to assume that the client retries after a
connection failure and that Flink as a micro service maintains at-least-once
semantics. The problem is then simplified to implementing a web front source and a
feedback loop from sink to source, and locating the pending connection to respond on.

What do you think [~till.rohrmann] [~tudandan]

 

> flink as micro service
> --
>
> Key: FLINK-6085
> URL: https://issues.apache.org/jira/browse/FLINK-6085
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, JobManager
>Reporter: Chen Qin
>Priority: Minor
> Attachments: Untitled document.jpg
>
>
> Track the discussion around running Flink as a micro service, including but not
> limited to:
> - RPC (web service endpoint) source:
>   a web service endpoint accepts RPC calls and ingests them into the streaming job (only
> one)
> - callback mechanism
> - task assignment should honor the deployment group (web tier hosts should be
> isolated from the rest of task assignment)
> https://docs.google.com/document/d/1MSsTOz7xUu50dAf_8v3gsQFfJFFy9LKnULdIl26yj0o/edit?usp=sharing





[jira] [Commented] (FLINK-6082) Support window definition for SQL Queries based on WHERE clause with time condition

2017-03-28 Thread hongyuhong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15946417#comment-15946417
 ] 

hongyuhong commented on FLINK-6082:
---

Hi [~fhueske], I think what radu means by using a time condition to define
windows is mainly a situation like:
{code}
SELECT STREAM o.rowtime, o.productId, o.orderId, s.proctime AS shipTime 
FROM Orders AS o 
JOIN Shipments AS s 
ON o.orderId = s.orderId 
AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR; 
{code}
Since we cannot hold all records of Orders and Shipments, we may need a
window (just a logical concept) to hold the latest records.

> Support window definition for SQL Queries based on WHERE clause with time 
> condition
> ---
>
> Key: FLINK-6082
> URL: https://issues.apache.org/jira/browse/FLINK-6082
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: radu
>
> Time target: Proc Time
> Calcite documentation refers to query examples where the (time)
> boundaries are defined as condition within the WHERE clause. As Flink
> community targets compatibility with Calcite, it makes sense to support
> the definition of windows via this method as well as corresponding
> aggregation on top of them.
> SQL targeted query examples:
> 
> ```SELECT productId, count(*) FROM stream1 WHERE proctime BETWEEN current_timestamp - INTERVAL '1' HOUR AND current_timestamp```
> General comment:
> 1)  Window boundaries are defined as conditions in the WHERE clause.
> 2)  To indicate which stream time is used, rowtime and
> proctime can be used.
> 3)  The boundaries are defined based on special constructs provided by
> Calcite: current_timestamp and time operations.
> Description:
> 
> The logic of this operator is strictly related to supporting aggregates
> over sliding windows defined with OVER
> ([FLINK-5653](https://issues.apache.org/jira/browse/FLINK-5653),
> [FLINK-5654](https://issues.apache.org/jira/browse/FLINK-5654),
> [FLINK-5655](https://issues.apache.org/jira/browse/FLINK-5655),
> [FLINK-5658](https://issues.apache.org/jira/browse/FLINK-5658),
> [FLINK-5656](https://issues.apache.org/jira/browse/FLINK-5656)). In this
> issue the design considered queries where the window is defined with the
> syntax of OVER clause and aggregates are applied over this period. This
> is similar in behavior with the only exception that the window
> boundaries are defined with respect to the WHERE conditions. Besides
> this the logic and the types of aggregates to be supported should be the
> same (sum, count, avg, min, max). Supporting these types of query is
> related to the pie chart problem tackled by calcite.
> Similar as for the OVER windows, the construct should build rolling
> windows (i.e., windows that are triggered and move with every incoming
> event).
> Functionality example
> -
> We exemplify below the functionality of the IN/Exists when working with
> streams.
> `SELECT a, count(*) FROM stream1 WHERE proctime BETWEEN current_timestamp - INTERVAL '1' HOUR AND current_timestamp;`
> ||IngestionTime(Event)||  Stream1||   Output||
> |10:00:01 |Id1,10 |Id1,1|
> |10:02:00 |Id2,2  |Id2,2|
> |11:25:00 |Id3,2  |Id3,1|
> |12:03:00 |Id4,15 |Id4,2|
> |12:05:00 |Id5,11 |Id5,3|
> |12:56:00 |Id6,20 |Id6,3|
> |...|
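
The Output column of the table above can be reproduced with a small rolling-count sketch. This is only an illustration of the intended semantics (count of events in the hour ending at the current event), not the proposed operator; `RollingCountSketch` is a hypothetical name, and the sketch assumes the events arrive in time order within a single day.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: counts the events of the last hour each time an event arrives.
class RollingCountSketch {
    private static final int HOUR_SECONDS = 3600;
    private final Deque<Integer> arrivals = new ArrayDeque<>(); // seconds since midnight

    // Parses "HH:mm:ss" into seconds since midnight.
    static int toSeconds(String hhmmss) {
        String[] p = hhmmss.split(":");
        return Integer.parseInt(p[0]) * 3600 + Integer.parseInt(p[1]) * 60 + Integer.parseInt(p[2]);
    }

    // Registers an event and returns count(*) over [now - 1 hour, now].
    int onEvent(String time) {
        int now = toSeconds(time);
        arrivals.addLast(now);
        // Evict events that fell out of the one-hour window.
        while (arrivals.peekFirst() < now - HOUR_SECONDS) {
            arrivals.removeFirst();
        }
        return arrivals.size();
    }

    public static void main(String[] args) {
        RollingCountSketch counter = new RollingCountSketch();
        String[] times = {"10:00:01", "10:02:00", "11:25:00", "12:03:00", "12:05:00", "12:56:00"};
        for (String t : times) {
            System.out.println(t + " -> " + counter.onEvent(t)); // counts: 1, 2, 1, 2, 3, 3
        }
    }
}
```

The counts match the table because, for example, at 11:25:00 both earlier events are more than one hour old and are evicted.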
> Implementation option
> -
> Considering that the query follows the same functionality as for the
> aggregates over window, the implementation should follow the same
> implementation as for the OVER clause. Considering that the WHERE
> condition are typically related to timing, this means that in case of
> one unbound boundary the
> [FLINK-5658](https://issues.apache.org/jira/browse/FLINK-5658) should be
> used, while for bounded time windows the
> [FLINK-5654](https://issues.apache.org/jira/browse/FLINK-5654) design
> should be used.
> The window boundaries will be extracted from the WHERE condition.
> The rule will not be mapped anymore to a LogicalWindow, which means that
> the conversion to this would need to happen from the current
> DataStreamCalc rule. In this sense, a dedicated condition will be added
> such that in case the WHERE clause has time conditions, the operator
> implementation of the Over clause (used in the previous issues) should
> be used.
> ```
> class DataStreamCalcRule
>   ---
>   {
>   ---
>   def convert(rel: RelNode): RelNode = {
>     val calc: LogicalCalc = rel.asInstanceOf[LogicalCalc]
>     val traitSet: RelTraitSet = 
> ```

[jira] [Comment Edited] (FLINK-6204) Improve Event-Time OVER ROWS BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-28 Thread sunjincheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945183#comment-15945183
 ] 

sunjincheng edited comment on FLINK-6204 at 3/29/17 2:02 AM:
-

[~Yuhong_kyo] Actually, as I commented in FLINK-5658, at the Table API level
I do not recommend using an in-memory data structure that holds a non-constant amount of
data. By using the runtime timer mechanism, the Table API can benefit from the
framework's timer optimizations and take full advantage of the runtime
mechanism.


was (Author: sunjincheng121):
[~Yuhong_kyo] Actually, as I commented in FLINK-5658, at the Table API level
I do not recommend using an in-memory data structure that holds a non-constant amount of
data. By using the runtime timer mechanism, the Table API can benefit from the
framework's timer optimizations and take full advantage of the runtime
mechanism, or even timer merging.

> Improve Event-Time OVER ROWS BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> ---
>
> Key: FLINK-6204
> URL: https://issues.apache.org/jira/browse/FLINK-6204
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Currently, the implementation class for `event time OVER ROWS BETWEEN UNBOUNDED
> PRECEDING aggregation to SQL`, `UnboundedEventTimeOverProcessFunction`, uses an
> in-memory data structure of uncontrollable size, `sortedTimestamps:
> util.LinkedList[Long]`, to cache and sort the data timestamps. IMO, this is
> not a good approach, because in our production scenario there are millions of
> window records per millisecond. So I want to remove
> `util.LinkedList[Long]`. Feedback from anyone is welcome.
> What do you think? [~fhueske] and [~Yuhong_kyo]





[jira] [Commented] (FLINK-6117) 'zookeeper.sasl.disable' not takes effet when starting CuratorFramework

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15946400#comment-15946400
 ] 

ASF GitHub Bot commented on FLINK-6117:
---

Github user zhengcanbin commented on the issue:

https://github.com/apache/flink/pull/3600
  
@StephanEwen @vijikarthi @EronWright @Rucongzhang 
Thanks all! I have changed the default behaviour:
`zookeeper.sasl.disable` is now `false` by default.


> 'zookeeper.sasl.disable'  not takes effet when starting CuratorFramework
> 
>
> Key: FLINK-6117
> URL: https://issues.apache.org/jira/browse/FLINK-6117
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client, JobManager
>Affects Versions: 1.2.0
> Environment: Ubuntu, non-secured
>Reporter: CanBin Zheng
>Assignee: CanBin Zheng
>  Labels: security
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> The value of 'zookeeper.sasl.disable' is not applied correctly when starting
> CuratorFramework.
> Here are all the settings relevant to high-availability in my flink-conf.yaml:
>   high-availability: zookeeper
>   high-availability.zookeeper.quorum: localhost:2181
>   high-availability.zookeeper.storageDir: hdfs:///flink/ha/
> No explicit value is set for 'zookeeper.sasl.disable', so the default
> value of 'true' (ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE) should be
> applied. But when FlinkYarnSessionCli & FlinkApplicationMasterRunner start,
> both logs show that they attempt to connect to ZooKeeper in 'SASL' mode.
> The logs look like this:
> 2017-03-18 23:53:10,498 INFO  org.apache.zookeeper.ZooKeeper  
>   - Initiating client connection, connectString=localhost:2181 
> sessionTimeout=6 
> watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@5949eba8
> 2017-03-18 23:53:10,498 INFO  org.apache.zookeeper.ZooKeeper  
>   - Initiating client connection, connectString=localhost:2181 
> sessionTimeout=6 
> watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@5949eba8
> 2017-03-18 23:53:10,522 WARN  org.apache.zookeeper.ClientCnxn 
>   - SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/tmp/jaas-3047036396963510842.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it.
> 2017-03-18 23:53:10,522 WARN  org.apache.zookeeper.ClientCnxn 
>   - SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/tmp/jaas-3047036396963510842.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it.
> 2017-03-18 23:53:10,530 INFO  org.apache.zookeeper.ClientCnxn 
>   - Opening socket connection to server localhost/127.0.0.1:2181
> 2017-03-18 23:53:10,530 INFO  org.apache.zookeeper.ClientCnxn 
>   - Opening socket connection to server localhost/127.0.0.1:2181
> 2017-03-18 23:53:10,534 ERROR 
> org.apache.flink.shaded.org.apache.curator.ConnectionState- 
> Authentication failed
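
To make the reported mismatch concrete, here is a minimal sketch of how a Flink-side flag could be forwarded to the ZooKeeper client before Curator is started. It is an assumption-laden illustration, not the patch in the linked pull request: `ZkSaslFlagSketch` and `applySaslSetting` are hypothetical names, the configuration is modelled as a plain map, and the sketch assumes the ZooKeeper client reads the `zookeeper.sasl.client` system property to decide whether to attempt SASL.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; not the code from the pull request.
class ZkSaslFlagSketch {
    // Flink-side key taken from the issue text.
    static final String FLINK_KEY = "zookeeper.sasl.disable";
    // Assumption: system property consulted by the ZooKeeper client for SASL.
    static final String ZK_CLIENT_SASL_PROPERTY = "zookeeper.sasl.client";

    // Resolves the Flink flag (falling back to defaultDisable) and publishes the
    // inverse as the ZooKeeper client property. Returns the resolved flag.
    // This has to run before the ZooKeeper/Curator client is created.
    static boolean applySaslSetting(Map<String, String> flinkConf, boolean defaultDisable) {
        String raw = flinkConf.get(FLINK_KEY);
        boolean disable = (raw == null) ? defaultDisable : Boolean.parseBoolean(raw.trim());
        System.setProperty(ZK_CLIENT_SASL_PROPERTY, String.valueOf(!disable));
        return disable;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("high-availability", "zookeeper");
        // No explicit value for the flag, so the supplied default (true) applies.
        applySaslSetting(conf, true);
        System.out.println(System.getProperty(ZK_CLIENT_SASL_PROPERTY)); // false
    }
}
```

The point of the sketch is the ordering: if the property is only set after the client has been constructed, the client may already have attempted a SASL login, which would match the log excerpt above.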





[GitHub] flink issue #3600: [FLINK-6117]Make setting of 'zookeeper.sasl.disable' work...

2017-03-28 Thread zhengcanbin
Github user zhengcanbin commented on the issue:

https://github.com/apache/flink/pull/3600
  
@StephanEwen @vijikarthi @EronWright @Rucongzhang 
Thanks all! I have changed the default behaviour:
`zookeeper.sasl.disable` is now `false` by default.




[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15946381#comment-15946381
 ] 

ASF GitHub Bot commented on FLINK-6034:
---

Github user shixiaogang closed the pull request at:

https://github.com/apache/flink/pull/3531


> Add KeyedStateHandle for the snapshots in keyed streams
> ---
>
> Key: FLINK-6034
> URL: https://issues.apache.org/jira/browse/FLINK-6034
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
> Fix For: 1.3.0
>
>
> Currently, the only type of snapshot in keyed streams is
> {{KeyGroupsStateHandle}}, which is a full snapshot and stores the states one group after
> another. With the introduction of incremental checkpointing, we need a higher-level
> abstraction of keyed snapshots to allow flexible snapshot formats.
> The implementation of {{KeyedStateHandle}} s may vary a lot between
> backends. The only information needed in {{KeyedStateHandle}} s is their key
> group range. When recovering the job with a different degree of parallelism,
> {{KeyedStateHandle}} s will be assigned to those subtasks whose key group
> ranges overlap with their ranges.
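
The assignment rule in the last sentence above (a handle goes to every subtask whose key-group range overlaps the handle's range) can be sketched as follows. The classes are hypothetical stand-ins with inclusive range bounds, not Flink's `KeyGroupRange` or `KeyedStateHandle`, and the concrete ranges in `main` are example values only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a key-group range with inclusive bounds.
final class KeyGroupRangeSketch {
    final int start;
    final int end;

    KeyGroupRangeSketch(int start, int end) {
        this.start = start;
        this.end = end;
    }

    // Two inclusive ranges overlap if neither ends before the other starts.
    boolean overlaps(KeyGroupRangeSketch other) {
        return start <= other.end && other.start <= end;
    }
}

class HandleAssignmentSketch {
    // Returns the indices of all handles whose range overlaps the subtask's range.
    static List<Integer> handlesFor(KeyGroupRangeSketch subtask, List<KeyGroupRangeSketch> handles) {
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < handles.size(); i++) {
            if (handles.get(i).overlaps(subtask)) {
                result.add(i);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Two handles written with parallelism 2 over 128 key groups ...
        List<KeyGroupRangeSketch> handles = new ArrayList<>();
        handles.add(new KeyGroupRangeSketch(0, 63));
        handles.add(new KeyGroupRangeSketch(64, 127));
        // ... restored with a higher parallelism; a middle subtask needs both handles.
        System.out.println(handlesFor(new KeyGroupRangeSketch(43, 85), handles)); // [0, 1]
    }
}
```

Because only the range is needed for this decision, the handle's internal format can differ per backend, which is the flexibility the issue asks for.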





[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15946380#comment-15946380
 ] 

ASF GitHub Bot commented on FLINK-6034:
---

Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3531
  
@StefanRRichter Thanks for your work. I will close the PR.


> Add KeyedStateHandle for the snapshots in keyed streams
> ---
>
> Key: FLINK-6034
> URL: https://issues.apache.org/jira/browse/FLINK-6034
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
> Fix For: 1.3.0
>
>
> Currently, the only type of snapshot in keyed streams is
> {{KeyGroupsStateHandle}}, which is a full snapshot and stores the states one group after
> another. With the introduction of incremental checkpointing, we need a higher-level
> abstraction of keyed snapshots to allow flexible snapshot formats.
> The implementation of {{KeyedStateHandle}} s may vary a lot between
> backends. The only information needed in {{KeyedStateHandle}} s is their key
> group range. When recovering the job with a different degree of parallelism,
> {{KeyedStateHandle}} s will be assigned to those subtasks whose key group
> ranges overlap with their ranges.





[GitHub] flink pull request #3531: [FLINK-6034][checkpoint] Add KeyedStateHandle for ...

2017-03-28 Thread shixiaogang
Github user shixiaogang closed the pull request at:

https://github.com/apache/flink/pull/3531




[GitHub] flink issue #3531: [FLINK-6034][checkpoint] Add KeyedStateHandle for the sna...

2017-03-28 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3531
  
@StefanRRichter Thanks for your work. I will close the PR.




[jira] [Commented] (FLINK-6117) 'zookeeper.sasl.disable' not takes effet when starting CuratorFramework

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15946336#comment-15946336
 ] 

ASF GitHub Bot commented on FLINK-6117:
---

Github user Rucongzhang commented on the issue:

https://github.com/apache/flink/pull/3600
  
@StephanEwen, yes: currently, if you want to use Kerberos, you additionally need
to set zookeeper.sasl.disable to false. I agree with you that the default value
of zookeeper.sasl.disable can be set to false. Then, to use
Kerberos, you only need to set the ZooKeeper login context. Thanks!


> 'zookeeper.sasl.disable'  not takes effet when starting CuratorFramework
> 
>
> Key: FLINK-6117
> URL: https://issues.apache.org/jira/browse/FLINK-6117
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client, JobManager
>Affects Versions: 1.2.0
> Environment: Ubuntu, non-secured
>Reporter: CanBin Zheng
>Assignee: CanBin Zheng
>  Labels: security
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> The value of 'zookeeper.sasl.disable' is not used correctly when starting 
> CuratorFramework.
> Here are all the settings relevant to high-availability in my flink-conf.yaml:
>   high-availability: zookeeper
>   high-availability.zookeeper.quorum: localhost:2181
>   high-availability.zookeeper.storageDir: hdfs:///flink/ha/
> No explicit value is set for 'zookeeper.sasl.disable', so the default value 
> of 'true' (ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE) should be 
> applied. But when FlinkYarnSessionCli and FlinkApplicationMasterRunner start, 
> both logs show that they attempt to connect to ZooKeeper in 'SASL' mode.
> The logs look like this:
> 2017-03-18 23:53:10,498 INFO  org.apache.zookeeper.ZooKeeper  
>   - Initiating client connection, connectString=localhost:2181 
> sessionTimeout=6 
> watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@5949eba8
> 2017-03-18 23:53:10,498 INFO  org.apache.zookeeper.ZooKeeper  
>   - Initiating client connection, connectString=localhost:2181 
> sessionTimeout=6 
> watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@5949eba8
> 2017-03-18 23:53:10,522 WARN  org.apache.zookeeper.ClientCnxn 
>   - SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/tmp/jaas-3047036396963510842.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it.
> 2017-03-18 23:53:10,522 WARN  org.apache.zookeeper.ClientCnxn 
>   - SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/tmp/jaas-3047036396963510842.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it.
> 2017-03-18 23:53:10,530 INFO  org.apache.zookeeper.ClientCnxn 
>   - Opening socket connection to server localhost/127.0.0.1:2181
> 2017-03-18 23:53:10,530 INFO  org.apache.zookeeper.ClientCnxn 
>   - Opening socket connection to server localhost/127.0.0.1:2181
> 2017-03-18 23:53:10,534 ERROR 
> org.apache.flink.shaded.org.apache.curator.ConnectionState- 
> Authentication failed



[GitHub] flink issue #3600: [FLINK-6117]Make setting of 'zookeeper.sasl.disable' work...

2017-03-28 Thread Rucongzhang
Github user Rucongzhang commented on the issue:

https://github.com/apache/flink/pull/3600
  
@StephanEwen Yes, at the moment you additionally need to set 
zookeeper.sasl.disable to false if you want to use Kerberos. I agree with you 
that the default value of zookeeper.sasl.disable can be set to false. Then, to 
use Kerberos, you only need to set the ZooKeeper login context. Thanks!


[GitHub] flink pull request #3636: Change to validateOptionalDatePropery method

2017-03-28 Thread tsriharsha
GitHub user tsriharsha opened a pull request:

https://github.com/apache/flink/pull/3636

Change to validateOptionalDatePropery method

The current version of validateOptionalDateProperty will fail: it checks 
whether the value is a date first, and a value that passes that check then 
fails to parse as a double, so the next step fails. Please let me know if this 
is an accurate statement. I made a small change, though it may not follow the 
project's code format.
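
For readers following the description above, here is a minimal sketch of an "either a date or a number" check, in which a value is accepted as soon as one of the two parses succeeds. It is not the code in this pull request: the class name, the method name, and the date pattern are assumptions made for illustration.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

final class OptionalDateCheck {
    /** Assumed date pattern, chosen for illustration only. */
    static final String ASSUMED_DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";

    /** Returns true if the value is absent, a date in the assumed pattern, or a non-negative number. */
    static boolean isValidDateOrTimestamp(String value) {
        if (value == null) {
            return true; // the property is optional
        }
        try {
            SimpleDateFormat format = new SimpleDateFormat(ASSUMED_DATE_PATTERN);
            format.setLenient(false);
            format.parse(value);
            return true; // a valid date is enough, no numeric check afterwards
        } catch (ParseException notADate) {
            // fall through and try the numeric form
        }
        try {
            return Double.parseDouble(value) >= 0;
        } catch (NumberFormatException notANumber) {
            return false;
        }
    }
}
```

The point of the structure is that the numeric parse only runs when the date parse has failed, so a well-formed date can never be rejected for not being a double.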


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tsriharsha/flink patch-1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3636.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3636


commit e5ce905d140757e58b173dfc9b7f58724a042e6d
Author: tsriharsha 
Date:   2017-03-28T23:57:30Z

Change to validateOptionalDatePropery method

The current version of validateOptionalDateProperty will fail: it checks 
whether the value is a date first, and a value that passes that check then 
fails to parse as a double, so the next step fails. Please let me know if this 
is an accurate statement. I made a small change, though it may not follow the 
project's code format.




[jira] [Created] (FLINK-6210) RocksDB instance should be closed in ListViaMergeSpeedMiniBenchmark

2017-03-28 Thread Ted Yu (JIRA)
Ted Yu created FLINK-6210:
-

 Summary: RocksDB instance should be closed in 
ListViaMergeSpeedMiniBenchmark
 Key: FLINK-6210
 URL: https://issues.apache.org/jira/browse/FLINK-6210
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu
Priority: Minor


The RocksDB instance should be closed upon returning from main().

ListViaRangeSpeedMiniBenchmark has a similar issue.
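
A minimal sketch of the usual fix pattern, assuming the database handle can be treated as an `AutoCloseable`. Whether the RocksDB Java class in use exposes `close()` or an older disposal method depends on its version, so the sketch uses a hypothetical stand-in (`ToyDb`) rather than the real API.

```java
final class ToyDb implements AutoCloseable {
    /** Counts handles that were opened but not yet closed (illustration only). */
    static int openHandles = 0;

    ToyDb() {
        openHandles++;
    }

    void put(String key, String value) {
        // a real store would write to native memory or disk here
    }

    @Override
    public void close() {
        openHandles--;
    }
}

final class ToyBenchmark {
    /** Writes n entries and releases the handle on every exit path, including exceptions. */
    static int run(int n) {
        try (ToyDb db = new ToyDb()) {
            for (int i = 0; i < n; i++) {
                db.put("key-" + i, "value-" + i);
            }
            return n;
        }
    }
}
```

With try-with-resources the handle is released even if the benchmark loop throws, which a trailing close call at the end of main() would not guarantee.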



[jira] [Commented] (FLINK-6209) StreamPlanEnvironment always has a parallelism of 1

2017-03-28 Thread Haohui Mai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15946043#comment-15946043
 ] 

Haohui Mai commented on FLINK-6209:
---

We found that FLINK-5808 removed the following snippet, and that its removal 
causes the bug:

{noformat}
int parallelism = env.getParallelism();
if (parallelism > 0) {
    setParallelism(parallelism);
}
{noformat}

[~aljoscha] do you know why this snippet was removed? Thanks.
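
To make the effect of that guard concrete, here is a small stand-alone sketch. `ToyEnvironment` and `ToyPlanEnvironment` are hypothetical stand-ins, not Flink classes; the sketch assumes that a non-positive parallelism means "not set" and that the plan environment otherwise keeps a default of 1.

```java
final class ToyEnvironment {
    private final int parallelism;

    ToyEnvironment(int parallelism) {
        this.parallelism = parallelism;
    }

    int getParallelism() {
        return parallelism;
    }
}

final class ToyPlanEnvironment {
    // Assumed default when nothing is copied over from the outer environment.
    private int parallelism = 1;

    ToyPlanEnvironment(ToyEnvironment env, boolean copyParallelism) {
        if (copyParallelism) {
            int configured = env.getParallelism();
            if (configured > 0) {
                // Only an explicitly set, positive value overrides the default.
                this.parallelism = configured;
            }
        }
    }

    int getParallelism() {
        return parallelism;
    }
}
```

Constructing `ToyPlanEnvironment` with `copyParallelism` set to false corresponds to the situation after the snippet was removed: the value configured by the user never reaches the plan environment, so the default of 1 wins.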


> StreamPlanEnvironment always has a parallelism of 1
> ---
>
> Key: FLINK-6209
> URL: https://issues.apache.org/jira/browse/FLINK-6209
> Project: Flink
>  Issue Type: Bug
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> Thanks [~bill.liu8904] for triaging the issue.
> After FLINK-5808 we saw that Flink jobs uploaded through the UI always 
> have a parallelism of 1, even when the parallelism is explicitly set in the 
> UI.



[jira] [Created] (FLINK-6209) StreamPlanEnvironment always has a parallelism of 1

2017-03-28 Thread Haohui Mai (JIRA)
Haohui Mai created FLINK-6209:
-

 Summary: StreamPlanEnvironment always has a parallelism of 1
 Key: FLINK-6209
 URL: https://issues.apache.org/jira/browse/FLINK-6209
 Project: Flink
  Issue Type: Bug
Reporter: Haohui Mai
Assignee: Haohui Mai


Thanks [~bill.liu8904] for triaging the issue.

After FLINK-5808 we saw that Flink jobs uploaded through the UI always 
have a parallelism of 1, even when the parallelism is explicitly set in the 
UI.



[jira] [Commented] (FLINK-6085) flink as micro service

2017-03-28 Thread Chen Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945991#comment-15945991
 ] 

Chen Qin commented on FLINK-6085:
-

To unlock scenarios like blocking RPC calls or async callbacks, I am currently 
thinking of a way to get rid of the distributed queue by connecting the web 
front end directly to the pipeline.

I put a bit more thought into this topic. Exactly-once seems really hard to 
achieve through an RPC source. As a matter of fact, the same issue arises when 
a web front end ingests into a distributed queue: clients can retry 
arbitrarily over a long time span.

I think what we can do at this point is to assume the client will retry after 
a connection failure, and have Flink as a micro service maintain at-least-once 
semantics. The problem then simplifies to implementing a web front source and 
a feedback loop from sink to source that locates the pending connection to 
respond to.

What do you think, [~till.rohrmann] [~tudandan]?
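
One way to picture the "locate the pending connection" part is a registry of in-flight requests keyed by a request id. The sketch below is only an illustration under that assumption: `PendingRequests` and its methods are hypothetical, and it presumes that clients resend the same id when they retry.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class PendingRequests {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Called by the web front source; a retry with the same id reuses the pending future. */
    CompletableFuture<String> register(String requestId) {
        return pending.computeIfAbsent(requestId, id -> new CompletableFuture<>());
    }

    /** Called by the sink-to-source feedback path; returns false if no connection is waiting. */
    boolean complete(String requestId, String response) {
        CompletableFuture<String> future = pending.remove(requestId);
        return future != null && future.complete(response);
    }
}
```

This only deduplicates retries that arrive while a request is still pending. A retry that arrives after the response was delivered would be processed again, which matches at-least-once rather than exactly-once semantics.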

 

> flink as micro service
> --
>
> Key: FLINK-6085
> URL: https://issues.apache.org/jira/browse/FLINK-6085
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, JobManager
>Reporter: Chen Qin
>Priority: Minor
> Attachments: Untitled document.jpg
>
>
> Track the discussion around running Flink as a micro service, including but 
> not limited to: 
> - RPC (web service endpoint) source: 
>   a web service endpoint accepts RPC calls and ingests them into the 
>   streaming job (only one)
> - callback mechanism 
> - task assignment should honor deployment groups (web tier hosts should be 
>   isolated from the rest of task assignment)
> https://docs.google.com/document/d/1MSsTOz7xUu50dAf_8v3gsQFfJFFy9LKnULdIl26yj0o/edit?usp=sharing



[GitHub] flink issue #3600: [FLINK-6117]Make setting of 'zookeeper.sasl.disable' work...

2017-03-28 Thread EronWright
Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/3600
  
The thread is becoming confused as to what the default behavior should be.  
   In brief I think the code should say:
```
public static final ConfigOption<Boolean> ZOOKEEPER_SASL_DISABLE =
    key("zookeeper.sasl.disable")
        .defaultValue(false);
```

The above would leave SASL _enabled_ by default, as is consistent with 
Flink 1.2.0.
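
Because the option is phrased as a "disable" flag, its value has to be inverted before it reaches the ZooKeeper client, which decides whether to attempt SASL from the `zookeeper.sasl.client` system property and treats it as `true` when unset. The sketch below illustrates that mapping with the proposed default of `false`. It is not the code in this pull request, and the class and method names are hypothetical.

```java
import java.util.Map;

final class ZkSaslSetting {
    /** System property read by the ZooKeeper client to decide whether to attempt SASL. */
    static final String ZK_CLIENT_SASL_PROPERTY = "zookeeper.sasl.client";

    /** Proposed default for 'zookeeper.sasl.disable': false, so SASL stays enabled. */
    static final boolean DEFAULT_SASL_DISABLE = false;

    /** Resolves the configured flag, falling back to the default when the key is absent. */
    static boolean isSaslDisabled(Map<String, String> conf) {
        String raw = conf.get("zookeeper.sasl.disable");
        return raw == null ? DEFAULT_SASL_DISABLE : Boolean.parseBoolean(raw.trim());
    }

    /** Value for the client property: the inverse of the disable flag. */
    static String clientSaslPropertyValue(Map<String, String> conf) {
        return String.valueOf(!isSaslDisabled(conf));
    }
}
```

A caller would pass the result to `System.setProperty(ZkSaslSetting.ZK_CLIENT_SASL_PROPERTY, ...)` before the client is created. With an empty configuration the value is `"true"`, which matches the enabled-by-default behavior described above.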


[jira] [Commented] (FLINK-6117) 'zookeeper.sasl.disable' not takes effet when starting CuratorFramework

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945893#comment-15945893
 ] 

ASF GitHub Bot commented on FLINK-6117:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/3600
  
The thread is becoming confused as to what the default behavior should be.  
   In brief I think the code should say:
```
public static final ConfigOption<Boolean> ZOOKEEPER_SASL_DISABLE =
    key("zookeeper.sasl.disable")
        .defaultValue(false);
```

The above would leave SASL _enabled_ by default, as is consistent with 
Flink 1.2.0.


> 'zookeeper.sasl.disable'  not takes effet when starting CuratorFramework
> 
>
> Key: FLINK-6117
> URL: https://issues.apache.org/jira/browse/FLINK-6117
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client, JobManager
>Affects Versions: 1.2.0
> Environment: Ubuntu, non-secured
>Reporter: CanBin Zheng
>Assignee: CanBin Zheng
>  Labels: security
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> The value of 'zookeeper.sasl.disable' is not used correctly when starting 
> CuratorFramework.
> Here are all the settings relevant to high-availability in my flink-conf.yaml:
>   high-availability: zookeeper
>   high-availability.zookeeper.quorum: localhost:2181
>   high-availability.zookeeper.storageDir: hdfs:///flink/ha/
> No explicit value is set for 'zookeeper.sasl.disable', so the default value 
> of 'true' (ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE) should be 
> applied. But when FlinkYarnSessionCli and FlinkApplicationMasterRunner start, 
> both logs show that they attempt to connect to ZooKeeper in 'SASL' mode.
> The logs look like this:
> 2017-03-18 23:53:10,498 INFO  org.apache.zookeeper.ZooKeeper  
>   - Initiating client connection, connectString=localhost:2181 
> sessionTimeout=6 
> watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@5949eba8
> 2017-03-18 23:53:10,498 INFO  org.apache.zookeeper.ZooKeeper  
>   - Initiating client connection, connectString=localhost:2181 
> sessionTimeout=6 
> watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@5949eba8
> 2017-03-18 23:53:10,522 WARN  org.apache.zookeeper.ClientCnxn 
>   - SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/tmp/jaas-3047036396963510842.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it.
> 2017-03-18 23:53:10,522 WARN  org.apache.zookeeper.ClientCnxn 
>   - SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/tmp/jaas-3047036396963510842.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it.
> 2017-03-18 23:53:10,530 INFO  org.apache.zookeeper.ClientCnxn 
>   - Opening socket connection to server localhost/127.0.0.1:2181
> 2017-03-18 23:53:10,530 INFO  org.apache.zookeeper.ClientCnxn 
>   - Opening socket connection to server localhost/127.0.0.1:2181
> 2017-03-18 23:53:10,534 ERROR 
> org.apache.flink.shaded.org.apache.curator.ConnectionState- 
> Authentication failed



[jira] [Commented] (FLINK-6183) TaskMetricGroup may not be cleanup when Task.run() is never called or exits early

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945700#comment-15945700
 ] 

ASF GitHub Bot commented on FLINK-6183:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3610#discussion_r108503328
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
 ---
@@ -80,8 +80,17 @@ public TaskMetricGroup addTask(
taskName,
subtaskIndex,
attemptNumber);
-   tasks.put(executionAttemptID, task);
-   return task;
+   TaskMetricGroup prior = 
tasks.put(executionAttemptID, task);
+   if (prior == null) {
+   return task;
--- End diff --

yes that would work as well.


> TaskMetricGroup may not be cleanup when Task.run() is never called or exits 
> early
> -
>
> Key: FLINK-6183
> URL: https://issues.apache.org/jira/browse/FLINK-6183
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
>
> The TaskMetricGroup is created when a Task is created. It is cleaned up at 
> the end of Task.run() in the finally block. If, however, run() is never 
> called due to some failure between the creation and the call to run(), the 
> metric group is never closed. This also means that the JobMetricGroup is 
> never closed.
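
The leak described above comes from tying cleanup to a code path that may never run. As a rough sketch of the alternative, the registry below lets any failure path remove and close a group, and it detects a duplicate registration through the value returned by the map. All names are hypothetical, and the duplicate handling shown here (keep the prior group, close the new one) is an assumption, not necessarily what the pull request does.

```java
import java.util.HashMap;
import java.util.Map;

final class ToyMetricGroup implements AutoCloseable {
    private boolean closed;

    @Override
    public void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

final class ToyJobMetricGroup {
    private final Map<String, ToyMetricGroup> tasks = new HashMap<>();

    /** Registers a group for the attempt; if one already exists, keeps it and closes the new one. */
    ToyMetricGroup addTask(String attemptId) {
        ToyMetricGroup fresh = new ToyMetricGroup();
        ToyMetricGroup prior = tasks.putIfAbsent(attemptId, fresh);
        if (prior == null) {
            return fresh;
        }
        fresh.close();
        return prior;
    }

    /** Safe to call from any failure path, so cleanup does not depend on run() being reached. */
    void removeTask(String attemptId) {
        ToyMetricGroup group = tasks.remove(attemptId);
        if (group != null) {
            group.close();
        }
    }

    boolean isEmpty() {
        return tasks.isEmpty();
    }
}
```

Once the last task group is removed, `isEmpty()` gives the owner a signal that the job-level group can be closed as well.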



[GitHub] flink pull request #3610: [FLINK-6183]/[FLINK-6184] Prevent some NPE and unc...

2017-03-28 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3610#discussion_r108503328
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
 ---
@@ -80,8 +80,17 @@ public TaskMetricGroup addTask(
taskName,
subtaskIndex,
attemptNumber);
-   tasks.put(executionAttemptID, task);
-   return task;
+   TaskMetricGroup prior = 
tasks.put(executionAttemptID, task);
+   if (prior == null) {
+   return task;
--- End diff --

yes that would work as well.


[jira] [Closed] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams

2017-03-28 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-6034.
-
   Resolution: Fixed
Fix Version/s: 1.3.0

Implemented in cd5527417a1cae57073a8855c6c3b88c88c780aa

> Add KeyedStateHandle for the snapshots in keyed streams
> ---
>
> Key: FLINK-6034
> URL: https://issues.apache.org/jira/browse/FLINK-6034
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
> Fix For: 1.3.0
>
>
> Currently, the only type of snapshot in keyed streams is 
> {{KeyGroupsStateHandle}}, which is a full snapshot and stores the states one 
> group after another. With the introduction of incremental checkpointing, we 
> need a higher-level abstraction of keyed snapshots to allow flexible snapshot 
> formats. 
> The implementation of {{KeyedStateHandle}} s may vary a lot in different 
> backends. The only information needed in {{KeyedStateHandle}} s is their key 
> group range. When recovering the job with a different degree of parallelism, 
> {{KeyedStateHandle}} s will be assigned to those subtasks whose key group 
> ranges overlap with their ranges.
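
The overlap rule in the last sentence can be sketched as follows. `ToyKeyGroupRange` is a hypothetical stand-in for a handle's key group range, and the even split of key groups across subtasks is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;

final class ToyKeyGroupRange {
    final int start; // inclusive
    final int end;   // inclusive

    ToyKeyGroupRange(int start, int end) {
        this.start = start;
        this.end = end;
    }

    boolean overlaps(ToyKeyGroupRange other) {
        return start <= other.end && other.start <= end;
    }

    /** Assumed even split of key groups 0..maxParallelism-1 across the subtasks. */
    static ToyKeyGroupRange forSubtask(int maxParallelism, int parallelism, int index) {
        int s = (index * maxParallelism + parallelism - 1) / parallelism;
        int e = ((index + 1) * maxParallelism - 1) / parallelism;
        return new ToyKeyGroupRange(s, e);
    }

    /** For each new subtask, collects the handle ranges that overlap its own range. */
    static List<List<ToyKeyGroupRange>> assign(
            List<ToyKeyGroupRange> handles, int maxParallelism, int newParallelism) {
        List<List<ToyKeyGroupRange>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            ToyKeyGroupRange mine = forSubtask(maxParallelism, newParallelism, i);
            List<ToyKeyGroupRange> assigned = new ArrayList<>();
            for (ToyKeyGroupRange handle : handles) {
                if (handle.overlaps(mine)) {
                    assigned.add(handle);
                }
            }
            result.add(assigned);
        }
        return result;
    }
}
```

With 8 key groups, two old handles covering [0,3] and [4,7], and a new parallelism of 3, the middle subtask covers [3,5] and therefore receives both handles, while the outer subtasks receive one each.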



[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945649#comment-15945649
 ] 

ASF GitHub Bot commented on FLINK-6034:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3531
  
Merged in cd5527417a1cae57073a8855c6c3b88c88c780aa. @shixiaogang can you 
please close the PR?


> Add KeyedStateHandle for the snapshots in keyed streams
> ---
>
> Key: FLINK-6034
> URL: https://issues.apache.org/jira/browse/FLINK-6034
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Currently, the only type of snapshot in keyed streams is 
> {{KeyGroupsStateHandle}}, which is a full snapshot and stores the states one 
> group after another. With the introduction of incremental checkpointing, we 
> need a higher-level abstraction of keyed snapshots to allow flexible snapshot 
> formats. 
> The implementation of {{KeyedStateHandle}} s may vary a lot in different 
> backends. The only information needed in {{KeyedStateHandle}} s is their key 
> group range. When recovering the job with a different degree of parallelism, 
> {{KeyedStateHandle}} s will be assigned to those subtasks whose key group 
> ranges overlap with their ranges.



[GitHub] flink issue #3531: [FLINK-6034][checkpoint] Add KeyedStateHandle for the sna...

2017-03-28 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3531
  
Merged in cd5527417a1cae57073a8855c6c3b88c88c780aa. @shixiaogang can you 
please close the PR?


[jira] [Commented] (FLINK-6117) 'zookeeper.sasl.disable' not takes effet when starting CuratorFramework

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945618#comment-15945618
 ] 

ASF GitHub Bot commented on FLINK-6117:
---

Github user vijikarthi commented on the issue:

https://github.com/apache/flink/pull/3600
  
The default behavior of the ZK SASL client is to enable SASL. To be consistent 
with it, it makes sense for us to leave the option enabled by default.


> 'zookeeper.sasl.disable'  not takes effet when starting CuratorFramework
> 
>
> Key: FLINK-6117
> URL: https://issues.apache.org/jira/browse/FLINK-6117
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client, JobManager
>Affects Versions: 1.2.0
> Environment: Ubuntu, non-secured
>Reporter: CanBin Zheng
>Assignee: CanBin Zheng
>  Labels: security
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> The value of 'zookeeper.sasl.disable' is not used correctly when starting 
> CuratorFramework.
> Here are all the settings relevant to high-availability in my flink-conf.yaml:
>   high-availability: zookeeper
>   high-availability.zookeeper.quorum: localhost:2181
>   high-availability.zookeeper.storageDir: hdfs:///flink/ha/
> No explicit value is set for 'zookeeper.sasl.disable', so the default value 
> of 'true' (ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE) should be 
> applied. But when FlinkYarnSessionCli and FlinkApplicationMasterRunner start, 
> both logs show that they attempt to connect to ZooKeeper in 'SASL' mode.
> The logs look like this:
> 2017-03-18 23:53:10,498 INFO  org.apache.zookeeper.ZooKeeper  
>   - Initiating client connection, connectString=localhost:2181 
> sessionTimeout=6 
> watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@5949eba8
> 2017-03-18 23:53:10,498 INFO  org.apache.zookeeper.ZooKeeper  
>   - Initiating client connection, connectString=localhost:2181 
> sessionTimeout=6 
> watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@5949eba8
> 2017-03-18 23:53:10,522 WARN  org.apache.zookeeper.ClientCnxn 
>   - SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/tmp/jaas-3047036396963510842.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it.
> 2017-03-18 23:53:10,522 WARN  org.apache.zookeeper.ClientCnxn 
>   - SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/tmp/jaas-3047036396963510842.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it.
> 2017-03-18 23:53:10,530 INFO  org.apache.zookeeper.ClientCnxn 
>   - Opening socket connection to server localhost/127.0.0.1:2181
> 2017-03-18 23:53:10,530 INFO  org.apache.zookeeper.ClientCnxn 
>   - Opening socket connection to server localhost/127.0.0.1:2181
> 2017-03-18 23:53:10,534 ERROR 
> org.apache.flink.shaded.org.apache.curator.ConnectionState- 
> Authentication failed



[GitHub] flink issue #3600: [FLINK-6117]Make setting of 'zookeeper.sasl.disable' work...

2017-03-28 Thread vijikarthi
Github user vijikarthi commented on the issue:

https://github.com/apache/flink/pull/3600
  
The default behavior of the ZK SASL client is to enable SASL. To be consistent 
with it, it makes sense for us to leave the option enabled by default.


[jira] [Commented] (FLINK-6117) 'zookeeper.sasl.disable' not takes effet when starting CuratorFramework

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945613#comment-15945613
 ] 

ASF GitHub Bot commented on FLINK-6117:
---

Github user vijikarthi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3600#discussion_r108284815
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java ---
@@ -55,6 +55,10 @@
//  ZooKeeper Security Options
// 

 
+   public static final ConfigOption<Boolean> ZOOKEEPER_SASL_DISABLE =
+   key("zookeeper.sasl.disable")
+   .defaultValue(true);
--- End diff --

Can the default value be false (meaning the SASL client is always enabled), 
to be consistent with the ZK SASL client module?


> 'zookeeper.sasl.disable'  not takes effet when starting CuratorFramework
> 
>
> Key: FLINK-6117
> URL: https://issues.apache.org/jira/browse/FLINK-6117
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client, JobManager
>Affects Versions: 1.2.0
> Environment: Ubuntu, non-secured
>Reporter: CanBin Zheng
>Assignee: CanBin Zheng
>  Labels: security
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> The value of 'zookeeper.sasl.disable' is not used correctly when starting 
> CuratorFramework.
> Here are all the settings relevant to high-availability in my flink-conf.yaml:
>   high-availability: zookeeper
>   high-availability.zookeeper.quorum: localhost:2181
>   high-availability.zookeeper.storageDir: hdfs:///flink/ha/
> No explicit value is set for 'zookeeper.sasl.disable', so the default value 
> of 'true' (ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE) should be 
> applied. But when FlinkYarnSessionCli and FlinkApplicationMasterRunner start, 
> both logs show that they attempt to connect to ZooKeeper in 'SASL' mode.
> The logs look like this:
> 2017-03-18 23:53:10,498 INFO  org.apache.zookeeper.ZooKeeper  
>   - Initiating client connection, connectString=localhost:2181 
> sessionTimeout=6 
> watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@5949eba8
> 2017-03-18 23:53:10,498 INFO  org.apache.zookeeper.ZooKeeper  
>   - Initiating client connection, connectString=localhost:2181 
> sessionTimeout=6 
> watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@5949eba8
> 2017-03-18 23:53:10,522 WARN  org.apache.zookeeper.ClientCnxn 
>   - SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/tmp/jaas-3047036396963510842.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it.
> 2017-03-18 23:53:10,522 WARN  org.apache.zookeeper.ClientCnxn 
>   - SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/tmp/jaas-3047036396963510842.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it.
> 2017-03-18 23:53:10,530 INFO  org.apache.zookeeper.ClientCnxn 
>   - Opening socket connection to server localhost/127.0.0.1:2181
> 2017-03-18 23:53:10,530 INFO  org.apache.zookeeper.ClientCnxn 
>   - Opening socket connection to server localhost/127.0.0.1:2181
> 2017-03-18 23:53:10,534 ERROR 
> org.apache.flink.shaded.org.apache.curator.ConnectionState- 
> Authentication failed



[GitHub] flink pull request #3600: [FLINK-6117]Make setting of 'zookeeper.sasl.disabl...

2017-03-28 Thread vijikarthi
Github user vijikarthi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3600#discussion_r108284815
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java ---
@@ -55,6 +55,10 @@
//  ZooKeeper Security Options
// 

 
+   public static final ConfigOption<Boolean> ZOOKEEPER_SASL_DISABLE =
+   key("zookeeper.sasl.disable")
+   .defaultValue(true);
--- End diff --

Can the default value be false (meaning the SASL client is always enabled), 
to be consistent with the ZK SASL client module?


[jira] [Commented] (FLINK-6207) Duplicate type serializers for async snapshots of CopyOnWriteStateTable

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945612#comment-15945612
 ] 

ASF GitHub Bot commented on FLINK-6207:
---

Github user StefanRRichter closed the pull request at:

https://github.com/apache/flink/pull/3634


> Duplicate type serializers for async snapshots of CopyOnWriteStateTable
> ---
>
> Key: FLINK-6207
> URL: https://issues.apache.org/jira/browse/FLINK-6207
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.3.0, 1.2.1
>
>
> {{TypeSerializer}} instances are used both for copy-on-write and for the 
> parallel snapshots in the {{CopyOnWriteStateTable}}. For stateful 
> serializers, this can lead to race conditions. Snapshots need to duplicate 
> the serializers before using them.
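
The race arises when one serializer instance with internal mutable state is used by the processing thread and the snapshot thread at the same time. The sketch below shows the "duplicate before use" idea with hypothetical classes; the scratch buffer stands in for whatever internal state a real stateful serializer keeps.

```java
final class ToyStatefulSerializer {
    // Mutable scratch buffer: sharing it across threads is what makes this serializer unsafe.
    private final StringBuilder scratch = new StringBuilder();

    String serialize(int value) {
        scratch.setLength(0);
        scratch.append("v=").append(value);
        return scratch.toString();
    }

    /** Returns an independent copy with its own scratch buffer. */
    ToyStatefulSerializer duplicate() {
        return new ToyStatefulSerializer();
    }
}

final class ToySnapshot {
    private final ToyStatefulSerializer serializer;

    /** Takes its own copy up front, so the asynchronous part never touches the live instance. */
    ToySnapshot(ToyStatefulSerializer live) {
        this.serializer = live.duplicate();
    }

    boolean sharesSerializerWith(ToyStatefulSerializer other) {
        return serializer == other;
    }

    String write(int value) {
        return serializer.serialize(value);
    }
}
```

Duplicating in the constructor, which runs on the thread that triggers the snapshot, means the copy exists before the asynchronous write starts.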



[GitHub] flink pull request #3634: [FLINK-6207] Duplicate TypeSerializers for async s...

2017-03-28 Thread StefanRRichter
Github user StefanRRichter closed the pull request at:

https://github.com/apache/flink/pull/3634


[jira] [Closed] (FLINK-6207) Duplicate type serializers for async snapshots of CopyOnWriteStateTable

2017-03-28 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-6207.
-
   Resolution: Fixed
Fix Version/s: 1.2.1

fixed in 89866a5 
backported to 1.2 in bb3e26f

> Duplicate type serializers for async snapshots of CopyOnWriteStateTable
> ---
>
> Key: FLINK-6207
> URL: https://issues.apache.org/jira/browse/FLINK-6207
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Fix For: 1.3.0, 1.2.1
>
>
> {{TypeSerializer}} are used for copy-on-write and the parallel snapshots in 
> the {{CopyOnWriteStateTable}}. For stateful serializers, this can lead to 
> race conditions. Snapshots need to duplicate the serializers before using 
> them.





[GitHub] flink pull request #3635: [FLINK-5913] [gelly] Example drivers

2017-03-28 Thread greghogan
GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/3635

[FLINK-5913] [gelly] Example drivers

Replace existing and create new algorithm Driver implementations for each 
of the library methods.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 5913_example_drivers

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3635.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3635


commit d5bcb862b709c16cd34b9b037c86ac2584c808d2
Author: Greg Hogan 
Date:   2016-10-26T19:18:50Z

[FLINK-5913] [gelly] Example drivers

Replace existing and create new algorithm Driver implementations for
each of the library methods.






[jira] [Commented] (FLINK-5913) Example drivers

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945600#comment-15945600
 ] 

ASF GitHub Bot commented on FLINK-5913:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/3635

[FLINK-5913] [gelly] Example drivers

Replace existing and create new algorithm Driver implementations for each 
of the library methods.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 5913_example_drivers

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3635.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3635


commit d5bcb862b709c16cd34b9b037c86ac2584c808d2
Author: Greg Hogan 
Date:   2016-10-26T19:18:50Z

[FLINK-5913] [gelly] Example drivers

Replace existing and create new algorithm Driver implementations for
each of the library methods.




> Example drivers
> ---
>
> Key: FLINK-5913
> URL: https://issues.apache.org/jira/browse/FLINK-5913
> Project: Flink
>  Issue Type: Sub-task
>  Components: Gelly
> Affects Versions: 1.3.0
> Reporter: Greg Hogan
> Assignee: Greg Hogan
> Fix For: 1.3.0
>
>
> Replace existing and create new algorithm {{Driver}} implementations for each 
> of the library methods.





[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945578#comment-15945578
 ] 

ASF GitHub Bot commented on FLINK-5982:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3633
  
Thanks for this refactoring. I had only a quick look so far, but one 
thought was that this patch could probably be simplified by making 
`triggerCheckpoint()` (and restore / notifyComplete) not abstract in the 
`AbstractInvokable`. Instead let it throw the `UnsupportedOperationException`.

That way, we avoid refactoring all the test classes to implement those 
methods (which would be nice).
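
The suggestion above can be sketched as follows. This is an illustration under stated assumptions: `InvokableBase`, `NoOpInvokable`, `CheckpointingInvokable`, and the `triggerCheckpoint(long)` signature are simplified, hypothetical stand-ins, not the real `AbstractInvokable` API.

```java
public class DefaultCheckpointMethodsSketch {

    /** Hypothetical, simplified base class; the real AbstractInvokable differs. */
    abstract static class InvokableBase {
        abstract void invoke() throws Exception;

        // Not abstract: subclasses without checkpointing simply inherit this default.
        boolean triggerCheckpoint(long checkpointId) {
            throw new UnsupportedOperationException(
                "triggerCheckpoint is not supported by " + getClass().getSimpleName());
        }
    }

    /** A test-style invokable that only has to implement invoke(). */
    static final class NoOpInvokable extends InvokableBase {
        @Override
        void invoke() {
        }
    }

    /** A stateful invokable overrides the default. */
    static final class CheckpointingInvokable extends InvokableBase {
        long lastCheckpointId = -1L;

        @Override
        void invoke() {
        }

        @Override
        boolean triggerCheckpoint(long checkpointId) {
            lastCheckpointId = checkpointId;
            return true;
        }
    }

    /** Returns false if the invokable falls back to the throwing default. */
    static boolean supportsCheckpoint(InvokableBase invokable) {
        try {
            return invokable.triggerCheckpoint(1L);
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(supportsCheckpoint(new NoOpInvokable()));
        System.out.println(supportsCheckpoint(new CheckpointingInvokable()));
    }
}
```

With this shape, existing test classes such as the hypothetical `NoOpInvokable` compile unchanged, while misuse still fails loudly at runtime.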


> Refactor AbstractInvokable and StatefulTask
> ---
>
> Key: FLINK-5982
> URL: https://issues.apache.org/jira/browse/FLINK-5982
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, State Backends, Checkpointing
> Affects Versions: 1.2.0
> Reporter: Wei-Che Wei
> Assignee: Wei-Che Wei
>
> Currently, running an invokable in {{Task}} requires calling 
> {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a 
> {{StatefulTask}}, it also needs a call to {{setInitialState(state)}}. That 
> makes eager initialization of the invokable during the {{DEPLOYING}} state 
> difficult. One solution discussed in FLINK-4714 is to split 
> {{invoke()}} into {{open()}} and {{invoke()}}, but that complicates 
> running it in {{Task}}.
> This task refactors {{AbstractInvokable}} and {{StatefulTask}} to 
> make it easier to construct and run an invokable.
> # Refactor the abstract class to have one default constructor.
> #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} 
> are stateful.
> #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Give 
> {{AbstractInvokable}} a two-argument constructor taking {{Environment}} 
> and {{TaskStateHandles}}.
> # Update all subclasses.
> #* Give every subclass of {{AbstractInvokable}} a two-argument constructor 
> that calls the constructor in {{AbstractInvokable}}.
> #* Throw an error in {{BatchTask}} if the initial state is not null. (This 
> will be removed once {{BatchTask}} has become stateful.)
> # Change the creation of the invokable to call that constructor, and update 
> all the tests.
> Then, we can simplify the logic to run an invokable by using the constructor 
> and {{invoke()}}. The eager initialization can easily be placed in the 
> constructor to fulfill requirements such as FLINK-4714.





[GitHub] flink issue #3633: [FLINK-5982] [runtime] Refactor AbstractInvokable and Sta...

2017-03-28 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3633
  
Thanks for this refactoring. I had only a quick look so far, but one 
thought was that this patch could probably be simplified by making 
`triggerCheckpoint()` (and restore / notifyComplete) not abstract in the 
`AbstractInvokable`. Instead let it throw the `UnsupportedOperationException`.

That way, we avoid refactoring all the test classes to implement those 
methods (which would be nice).




[jira] [Commented] (FLINK-6183) TaskMetricGroup may not be cleanup when Task.run() is never called or exits early

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945559#comment-15945559
 ] 

ASF GitHub Bot commented on FLINK-6183:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3610#discussion_r108478998
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
 ---
@@ -80,8 +80,17 @@ public TaskMetricGroup addTask(
taskName,
subtaskIndex,
attemptNumber);
-   tasks.put(executionAttemptID, task);
-   return task;
+   TaskMetricGroup prior = tasks.put(executionAttemptID, task);
+   if (prior == null) {
+   return task;
--- End diff --

Can you avoid adding `closeLocally()` by simply doing a "contains()" check 
before creating the group?
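
A check-before-create variant could look roughly like the sketch below. It is an assumption-laden illustration: `Group`, `JobGroup`, the string attempt ID, and the choice to return the already registered group are all hypothetical, since the rest of the diff is not shown here.

```java
import java.util.HashMap;
import java.util.Map;

public class AddTaskContainsCheckSketch {

    /** Hypothetical stand-in for a task metric group. */
    static final class Group {
        final String taskName;

        Group(String taskName) {
            this.taskName = taskName;
        }
    }

    /** Hypothetical stand-in for the per-job group that registers task groups. */
    static final class JobGroup {
        private final Map<String, Group> tasks = new HashMap<>();
        int created = 0;

        synchronized Group addTask(String executionAttemptId, String taskName) {
            // Check first: if a group is already registered, nothing new is
            // created, so there is nothing that would need to be closed again.
            if (tasks.containsKey(executionAttemptId)) {
                return tasks.get(executionAttemptId); // assumption: reuse the prior group
            }
            Group task = new Group(taskName);
            created++;
            tasks.put(executionAttemptId, task);
            return task;
        }
    }

    public static void main(String[] args) {
        JobGroup job = new JobGroup();
        Group first = job.addTask("attempt-1", "map");
        Group second = job.addTask("attempt-1", "map");
        System.out.println(first == second);
        System.out.println(job.created);
    }
}
```

Because the lookup happens before construction, a duplicate registration never produces a second group object that has to be cleaned up.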


> TaskMetricGroup may not be cleanup when Task.run() is never called or exits 
> early
> -
>
> Key: FLINK-6183
> URL: https://issues.apache.org/jira/browse/FLINK-6183
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
> Affects Versions: 1.2.0, 1.3.0
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Blocker
>
> The TaskMetricGroup is created when a Task is created. It is cleaned up at 
> the end of Task.run() in the finally block. If, however, run() is never called 
> due to some failure between the creation and the call to run(), the metric 
> group is never closed. This also means that the JobMetricGroup is never closed.





[GitHub] flink pull request #3610: [FLINK-6183]/[FLINK-6184] Prevent some NPE and unc...

2017-03-28 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3610#discussion_r108478998
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
 ---
@@ -80,8 +80,17 @@ public TaskMetricGroup addTask(
taskName,
subtaskIndex,
attemptNumber);
-   tasks.put(executionAttemptID, task);
-   return task;
+   TaskMetricGroup prior = tasks.put(executionAttemptID, task);
+   if (prior == null) {
+   return task;
--- End diff --

Can you avoid adding `closeLocally()` by simply doing a "contains()" check 
before creating the group?




[GitHub] flink issue #3627: Release 0.4 alpha.0

2017-03-28 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3627
  
Can you close this PR?
Looks like this is some confusion...




[jira] [Created] (FLINK-6208) Implement skip till next match strategy

2017-03-28 Thread Dawid Wysakowicz (JIRA)
Dawid Wysakowicz created FLINK-6208:
---

 Summary: Implement skip till next match strategy
 Key: FLINK-6208
 URL: https://issues.apache.org/jira/browse/FLINK-6208
 Project: Flink
  Issue Type: New Feature
  Components: CEP
Reporter: Dawid Wysakowicz
 Fix For: 1.3.0


Right now, we support two strategies (except for looping states):
* skip till any match -> {{followedBy}}
* strict contiguity -> {{next}}

We should also support a strategy that will match only the first occurrence of a 
valid pattern.
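
To make the difference between the three strategies concrete, here is a small sketch over a list of strings. It is not CEP library code: the `Strategy` enum, the `match` helper, and the toy two-step pattern (an event starting with "a", then an event starting with "b") are assumptions made purely for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ContiguitySketch {

    enum Strategy { STRICT, SKIP_TILL_NEXT, SKIP_TILL_ANY }

    /** Matches the toy pattern "event starting with a, then event starting with b". */
    static List<String> match(List<String> events, Strategy strategy) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).startsWith("a")) {
                continue;
            }
            for (int j = i + 1; j < events.size(); j++) {
                boolean isB = events.get(j).startsWith("b");
                if (isB) {
                    matches.add(events.get(i) + " " + events.get(j));
                }
                if (strategy == Strategy.STRICT) {
                    break; // only the direct successor may complete the match
                }
                if (isB && strategy == Strategy.SKIP_TILL_NEXT) {
                    break; // stop at the first matching event
                }
                // SKIP_TILL_ANY keeps scanning and emits every later match.
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("a1", "c1", "b1", "b2");
        System.out.println(match(events, Strategy.STRICT));         // []
        System.out.println(match(events, Strategy.SKIP_TILL_NEXT)); // [a1 b1]
        System.out.println(match(events, Strategy.SKIP_TILL_ANY));  // [a1 b1, a1 b2]
    }
}
```

For the input above, strict contiguity finds nothing because `c1` intervenes, skip till any match emits both pairs, and the proposed skip till next match emits only the first one.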





[jira] [Commented] (FLINK-6207) Duplicate type serializers for async snapshots of CopyOnWriteStateTable

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945535#comment-15945535
 ] 

ASF GitHub Bot commented on FLINK-6207:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3634
  
Thanks for the fast review @tillrohrmann ! I will let the test extend the 
`TestLogger` as suggested and then merge this.


> Duplicate type serializers for async snapshots of CopyOnWriteStateTable
> ---
>
> Key: FLINK-6207
> URL: https://issues.apache.org/jira/browse/FLINK-6207
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Fix For: 1.3.0
>
>
> {{TypeSerializer}} are used for copy-on-write and the parallel snapshots in 
> the {{CopyOnWriteStateTable}}. For stateful serializers, this can lead to 
> race conditions. Snapshots need to duplicate the serializers before using 
> them.





[GitHub] flink issue #3634: [FLINK-6207] Duplicate TypeSerializers for async snapshot...

2017-03-28 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3634
  
Thanks for the fast review @tillrohrmann ! I will let the test extend the 
`TestLogger` as suggested and then merge this.




[jira] [Commented] (FLINK-6207) Duplicate type serializers for async snapshots of CopyOnWriteStateTable

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945525#comment-15945525
 ] 

ASF GitHub Bot commented on FLINK-6207:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/3634#discussion_r108473910
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
 ---
@@ -380,6 +385,77 @@ public void testCopyOnWriteContracts() {
Assert.assertTrue(originalState5 == stateTable.get(5, 1));
}
 
+   /**
--- End diff --

The `CopyOnWriteStateTableTest` could extend the `TestLogger`. That way we 
get better logging output on Travis.


> Duplicate type serializers for async snapshots of CopyOnWriteStateTable
> ---
>
> Key: FLINK-6207
> URL: https://issues.apache.org/jira/browse/FLINK-6207
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Fix For: 1.3.0
>
>
> {{TypeSerializer}} are used for copy-on-write and the parallel snapshots in 
> the {{CopyOnWriteStateTable}}. For stateful serializers, this can lead to 
> race conditions. Snapshots need to duplicate the serializers before using 
> them.





[GitHub] flink pull request #3634: [FLINK-6207] Duplicate TypeSerializers for async s...

2017-03-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/3634#discussion_r108473910
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
 ---
@@ -380,6 +385,77 @@ public void testCopyOnWriteContracts() {
Assert.assertTrue(originalState5 == stateTable.get(5, 1));
}
 
+   /**
--- End diff --

The `CopyOnWriteStateTableTest` could extend the `TestLogger`. That way we 
get better logging output on Travis.




[GitHub] flink pull request #3629: [FLINK-5655][table]Add event time OVER RANGE BETWE...

2017-03-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3629#discussion_r108463560
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala
 ---
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util.{List => JList, ArrayList => JArrayList}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+ * Process Function for RANGE clause event-time bounded OVER window
+ *
+ * @param aggregates           the list of all [[AggregateFunction]] used for this aggregation
+ * @param aggFields            the position (in the input Row) of the input value for each aggregate
+ * @param forwardedFieldCount  the count of forwarded fields.
+ * @param aggregationStateType the row type info of aggregation
+ * @param inputRowType         the row type info of input row
+ * @param precedingOffset      the preceding offset
+ */
+class RangeClauseBoundedOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val aggregationStateType: RowTypeInfo,
+private val inputRowType: RowTypeInfo,
+private val precedingOffset: Long)
+  extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkNotNull(forwardedFieldCount)
+  Preconditions.checkNotNull(aggregationStateType)
+  Preconditions.checkNotNull(precedingOffset)
+
+  private var output: Row = _
+
+  // the state which keeps the last triggering timestamp
+  private var lastTriggeringTsState: ValueState[Long] = _
+
+  // the state which used to materialize the accumulator for incremental calculation
+  private var accumulatorState: ValueState[Row] = _
+
+  // the state which keeps all the data that are not expired.
+  // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
+  // the second element of tuple is a list that contains the entire data of all the rows belonging
+  // to this time stamp.
+  private var dataState: MapState[Long, JList[Row]] = _
+
+  override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+
+val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
+  new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
+lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
+
+val accumulatorStateDescriptor =
+  new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
+accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
+
+val keyTypeInformation: TypeInformation[Long] =
+  BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType)
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]](
+"dataState",
+keyTypeInformation,
+valueTypeInformation)
+
+dataState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+  }
+
+  

[GitHub] flink pull request #3629: [FLINK-5655][table]Add event time OVER RANGE BETWE...

2017-03-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3629#discussion_r108467653
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala
 ---
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util.{List => JList, ArrayList => JArrayList}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+ * Process Function for RANGE clause event-time bounded OVER window
+ *
+ * @param aggregates           the list of all [[AggregateFunction]] used for this aggregation
+ * @param aggFields            the position (in the input Row) of the input value for each aggregate
+ * @param forwardedFieldCount  the count of forwarded fields.
+ * @param aggregationStateType the row type info of aggregation
+ * @param inputRowType         the row type info of input row
+ * @param precedingOffset      the preceding offset
+ */
+class RangeClauseBoundedOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val aggregationStateType: RowTypeInfo,
+private val inputRowType: RowTypeInfo,
+private val precedingOffset: Long)
+  extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkNotNull(forwardedFieldCount)
+  Preconditions.checkNotNull(aggregationStateType)
+  Preconditions.checkNotNull(precedingOffset)
+
+  private var output: Row = _
+
+  // the state which keeps the last triggering timestamp
+  private var lastTriggeringTsState: ValueState[Long] = _
+
+  // the state which used to materialize the accumulator for incremental calculation
+  private var accumulatorState: ValueState[Row] = _
+
+  // the state which keeps all the data that are not expired.
+  // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
+  // the second element of tuple is a list that contains the entire data of all the rows belonging
+  // to this time stamp.
+  private var dataState: MapState[Long, JList[Row]] = _
+
+  override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+
+val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
+  new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
+lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
+
+val accumulatorStateDescriptor =
+  new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
+accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
+
+val keyTypeInformation: TypeInformation[Long] =
+  BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType)
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]](
+"dataState",
+keyTypeInformation,
+valueTypeInformation)
+
+dataState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+  }
+
+  

[jira] [Commented] (FLINK-5655) Add event time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945509#comment-15945509
 ] 

ASF GitHub Bot commented on FLINK-5655:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3629#discussion_r108470110
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
 ---
@@ -411,6 +411,128 @@ class SqlITCase extends StreamingWithStateTestBase {
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testBoundPartitionedEventTimeWindowWithRange(): Unit = {
+val data = Seq(
+  Left((1000L, (1L, 1, "Hello"))),
+  Left((1000L, (1L, 1, "Hello"))),
--- End diff --

The problem of copying the forwarded fields is not captured by the tests, 
because all rows for the same timestamp are identical.


> Add event time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5655
> URL: https://issues.apache.org/jira/browse/FLINK-5655
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
> Reporter: Fabian Hueske
> Assignee: sunjincheng
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates event time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5658)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).
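
As a reference point for the semantics requested above, the following sketch computes `SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN <interval> PRECEDING AND CURRENT ROW)` over an in-memory list. It is a naive batch illustration, not the streaming operator from the pull request; the `Event` class and `sumOverRange` helper are hypothetical names. Note that with RANGE semantics all rows sharing the current row's timestamp are part of its window.

```java
import java.util.Arrays;
import java.util.List;

public class OverRangeSketch {

    /** Hypothetical input row: event timestamp, partition key c, and value b. */
    static final class Event {
        final long timestamp;
        final String c;
        final long b;

        Event(long timestamp, String c, long b) {
            this.timestamp = timestamp;
            this.c = c;
            this.b = b;
        }
    }

    /**
     * For each row, sums b over all rows of the same partition whose timestamp
     * lies in [row.timestamp - rangeMillis, row.timestamp].
     */
    static long[] sumOverRange(List<Event> rows, long rangeMillis) {
        long[] sums = new long[rows.size()];
        for (int i = 0; i < rows.size(); i++) {
            Event current = rows.get(i);
            long sum = 0L;
            for (Event other : rows) {
                boolean samePartition = other.c.equals(current.c);
                boolean inRange = other.timestamp <= current.timestamp
                    && other.timestamp >= current.timestamp - rangeMillis;
                if (samePartition && inRange) {
                    sum += other.b;
                }
            }
            sums[i] = sum;
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Event> rows = Arrays.asList(
            new Event(1000L, "k", 1L),
            new Event(1000L, "k", 2L),
            new Event(2000L, "k", 3L),
            new Event(5000L, "k", 4L));
        System.out.println(Arrays.toString(sumOverRange(rows, 1000L))); // [3, 3, 6, 4]
    }
}
```

Both rows at timestamp 1000 receive the same sum, which is the behavior that distinguishes a RANGE window from a ROWS window.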





[jira] [Commented] (FLINK-5655) Add event time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945515#comment-15945515
 ] 

ASF GitHub Bot commented on FLINK-5655:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3629#discussion_r108467310
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala
 ---
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util.{List => JList, ArrayList => JArrayList}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+ * Process Function for RANGE clause event-time bounded OVER window
+ *
+ * @param aggregates           the list of all [[AggregateFunction]] used for this aggregation
+ * @param aggFields            the position (in the input Row) of the input value for each aggregate
+ * @param forwardedFieldCount  the count of forwarded fields.
+ * @param aggregationStateType the row type info of aggregation
+ * @param inputRowType         the row type info of input row
+ * @param precedingOffset      the preceding offset
+ */
+class RangeClauseBoundedOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val aggregationStateType: RowTypeInfo,
+private val inputRowType: RowTypeInfo,
+private val precedingOffset: Long)
+  extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkNotNull(forwardedFieldCount)
+  Preconditions.checkNotNull(aggregationStateType)
+  Preconditions.checkNotNull(precedingOffset)
+
+  private var output: Row = _
+
+  // the state which keeps the last triggering timestamp
+  private var lastTriggeringTsState: ValueState[Long] = _
+
+  // the state which used to materialize the accumulator for incremental calculation
+  private var accumulatorState: ValueState[Row] = _
+
+  // the state which keeps all the data that are not expired.
+  // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
+  // the second element of tuple is a list that contains the entire data of all the rows belonging
+  // to this time stamp.
+  private var dataState: MapState[Long, JList[Row]] = _
+
+  override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+
+val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
+  new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
+lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
+
+val accumulatorStateDescriptor =
+  new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
+accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
+
+val keyTypeInformation: TypeInformation[Long] =
+  BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType)
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new 

[GitHub] flink pull request #3629: [FLINK-5655][table]Add event time OVER RANGE BETWE...

2017-03-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3629#discussion_r108470110
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
 ---
@@ -411,6 +411,128 @@ class SqlITCase extends StreamingWithStateTestBase {
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testBoundPartitionedEventTimeWindowWithRange(): Unit = {
+val data = Seq(
+  Left((1000L, (1L, 1, "Hello"))),
+  Left((1000L, (1L, 1, "Hello"))),
--- End diff --

The problem of copying the forwarded fields is not captured by the tests, 
because all rows for the same timestamp are identical.
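
To illustrate why identical rows hide this, here is a minimal Scala sketch. It assumes the problem is that the forwarded fields are copied once per timestamp instead of once per row; that reading is an assumption, and `In`, `emitBuggy`, and `emitCorrect` are hypothetical names that do not appear in the pull request. The third output field is just the number of rows sharing the timestamp, a stand-in for a real aggregate.

```scala
object ForwardedFieldsSketch {
  // Hypothetical stand-in for an input row: event timestamp plus forwarded fields a and c.
  final case class In(ts: Long, a: Long, c: String)

  // Assumed faulty behaviour: every row of a timestamp group reuses the
  // forwarded fields of the group's first row.
  def emitBuggy(rows: Seq[In]): Seq[(Long, String, Int)] =
    rows.groupBy(_.ts).toSeq.sortBy(_._1).flatMap { case (_, group) =>
      val first = group.head
      group.map(_ => (first.a, first.c, group.size))
    }

  // Intended behaviour: every row forwards its own fields.
  def emitCorrect(rows: Seq[In]): Seq[(Long, String, Int)] =
    rows.groupBy(_.ts).toSeq.sortBy(_._1).flatMap { case (_, group) =>
      group.map(r => (r.a, r.c, group.size))
    }

  def main(args: Array[String]): Unit = {
    // Same timestamp, identical rows: both variants agree, so a test cannot tell them apart.
    val identical = Seq(In(1000L, 1L, "Hello"), In(1000L, 1L, "Hello"))
    // Same timestamp, different forwarded field a: the variants diverge.
    val distinct = Seq(In(1000L, 1L, "Hello"), In(1000L, 2L, "Hello"))

    println(emitBuggy(identical) == emitCorrect(identical)) // true
    println(emitBuggy(distinct) == emitCorrect(distinct))   // false
  }
}
```

Test data in which rows with the same timestamp differ in at least one forwarded field would therefore be enough to make such a bug visible.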


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3629: [FLINK-5655][table]Add event time OVER RANGE BETWE...

2017-03-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3629#discussion_r108470934
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
 ---
@@ -411,6 +411,128 @@ class SqlITCase extends StreamingWithStateTestBase {
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testBoundPartitionedEventTimeWindowWithRange(): Unit = {
+val data = Seq(
+  Left((1000L, (1L, 1, "Hello"))),
+  Left((1000L, (1L, 1, "Hello"))),
+  Left((1000L, (1L, 1, "Hello"))),
+  Left((2000L, (2L, 2, "Hello"))),
+  Right(1000L),
+  Left((2000L, (2L, 2, "Hello"))),
+  Left((2000L, (2L, 2, "Hello"))),
+  Left((3000L, (3L, 3, "Hello"))),
+  Right(2000L),
+  Left((4000L, (4L, 4, "Hello"))),
+  Right(3000L),
+  Left((5000L, (5L, 5, "Hello"))),
+  Right(5000L),
+  Left((6000L, (6L, 6, "Hello"))),
+  Right(7000L),
+  Left((8000L, (7L, 7, "Hello World"))),
+  Left((8000L, (7L, 7, "Hello World"))),
+  Left((1L, (7L, 7, "Hello World"))),
+  Right(8000L),
+  Left((12000L, (7L, 7, "Hello World"))),
+  Right(9000L),
+  Left((13000L, (8L, 8, "Hello World"))),
+  Right(11000L),
+  Left((2L, (20L, 20, "Hello World"))),
+  Right(19000L))
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+env.setStateBackend(getStateBackend)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+
+val t1 = env
+  .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
+  .toTable(tEnv).as('a, 'b, 'c)
+
+tEnv.registerTable("T1", t1)
+
+val sqlQuery = "SELECT " +
+  "c, a, " +
+  "count(a) OVER (PARTITION BY c ORDER BY RowTime() RANGE BETWEEN INTERVAL '1' SECOND " +
+  "preceding AND CURRENT ROW)" +
+  ", sum(a) OVER (PARTITION BY c ORDER BY RowTime() RANGE BETWEEN INTERVAL '1' SECOND " +
+  " preceding AND CURRENT ROW)" +
+  " from T1"
+
+val result = tEnv.sql(sqlQuery).toDataStream[Row]
+result.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected = mutable.MutableList(
+  "Hello,1,3,3", "Hello,1,3,3", "Hello,1,3,3",
+  "Hello,2,6,9", "Hello,2,6,9","Hello,2,6,9",
+  "Hello,3,4,9",
+  "Hello,4,2,7",
+  "Hello,5,2,9",
+  "Hello,6,2,11",
+  "Hello World,7,2,14", "Hello World,7,2,14", "Hello World,7,1,7", "Hello World,7,1,7",
+  "Hello World,8,2,15",
+  "Hello World,20,1,20")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testBoundNonPartitionedEventTimeWindowWithRange(): Unit = {
+val data = Seq(
+  Left((1000L, (1L, 1, "Hello"))),
--- End diff --

Please cover more corner cases with the test data (see above).



[jira] [Commented] (FLINK-5655) Add event time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945517#comment-15945517
 ] 

ASF GitHub Bot commented on FLINK-5655:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3629#discussion_r108467898
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala
 ---
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util.{List => JList, ArrayList => JArrayList}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+ * Process Function for RANGE clause event-time bounded OVER window
+ *
+ * @param aggregates           the list of all [[AggregateFunction]] used for this aggregation
+ * @param aggFields            the position (in the input Row) of the input value for each aggregate
+ * @param forwardedFieldCount  the count of forwarded fields.
+ * @param aggregationStateType the row type info of aggregation
+ * @param inputRowType         the row type info of input row
+ * @param precedingOffset      the preceding offset
+ */
+class RangeClauseBoundedOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val aggregationStateType: RowTypeInfo,
+private val inputRowType: RowTypeInfo,
+private val precedingOffset: Long)
+  extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkNotNull(forwardedFieldCount)
+  Preconditions.checkNotNull(aggregationStateType)
+  Preconditions.checkNotNull(precedingOffset)
+
+  private var output: Row = _
+
+  // the state which keeps the last triggering timestamp
+  private var lastTriggeringTsState: ValueState[Long] = _
+
+  // the state which used to materialize the accumulator for incremental 
calculation
+  private var accumulatorState: ValueState[Row] = _
+
+  // the state which keeps all the data that are not expired.
+  // The first element (as the mapState key) of the tuple is the time 
stamp. Per each time stamp,
+  // the second element of tuple is a list that contains the entire data 
of all the rows belonging
+  // to this time stamp.
+  private var dataState: MapState[Long, JList[Row]] = _
+
+  override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+
+val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
+  new ValueStateDescriptor[Long]("lastTriggeringTsState", 
classOf[Long])
+lastTriggeringTsState = 
getRuntimeContext.getState(lastTriggeringTsDescriptor)
+
+val accumulatorStateDescriptor =
+  new ValueStateDescriptor[Row]("accumulatorState", 
aggregationStateType)
+accumulatorState = 
getRuntimeContext.getState(accumulatorStateDescriptor)
+
+val keyTypeInformation: TypeInformation[Long] =
+  BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+val valueTypeInformation: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](inputRowType)
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new 

[GitHub] flink pull request #3629: [FLINK-5655][table]Add event time OVER RANGE BETWE...

2017-03-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3629#discussion_r108453440
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala
 ---
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util.{List => JList, ArrayList => JArrayList}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+ * Process Function for RANGE clause event-time bounded OVER window
+ *
+ * @param aggregates           the list of all [[AggregateFunction]] used for this aggregation
+ * @param aggFields            the position (in the input Row) of the input value for each aggregate
+ * @param forwardedFieldCount  the count of forwarded fields.
+ * @param aggregationStateType the row type info of aggregation
+ * @param inputRowType         the row type info of input row
+ * @param precedingOffset      the preceding offset
+ */
+class RangeClauseBoundedOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val aggregationStateType: RowTypeInfo,
+private val inputRowType: RowTypeInfo,
+private val precedingOffset: Long)
+  extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkNotNull(forwardedFieldCount)
+  Preconditions.checkNotNull(aggregationStateType)
+  Preconditions.checkNotNull(precedingOffset)
+
+  private var output: Row = _
+
+  // the state which keeps the last triggering timestamp
+  private var lastTriggeringTsState: ValueState[Long] = _
+
+  // the state which used to materialize the accumulator for incremental 
calculation
+  private var accumulatorState: ValueState[Row] = _
+
+  // the state which keeps all the data that are not expired.
+  // The first element (as the mapState key) of the tuple is the time 
stamp. Per each time stamp,
+  // the second element of tuple is a list that contains the entire data 
of all the rows belonging
+  // to this time stamp.
+  private var dataState: MapState[Long, JList[Row]] = _
+
+  override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+
+val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
+  new ValueStateDescriptor[Long]("lastTriggeringTsState", 
classOf[Long])
+lastTriggeringTsState = 
getRuntimeContext.getState(lastTriggeringTsDescriptor)
+
+val accumulatorStateDescriptor =
+  new ValueStateDescriptor[Row]("accumulatorState", 
aggregationStateType)
+accumulatorState = 
getRuntimeContext.getState(accumulatorStateDescriptor)
+
+val keyTypeInformation: TypeInformation[Long] =
+  BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+val valueTypeInformation: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](inputRowType)
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]](
+"dataState",
+keyTypeInformation,
+valueTypeInformation)
+
+dataState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+  }
+
+  

[jira] [Commented] (FLINK-5655) Add event time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945516#comment-15945516
 ] 

ASF GitHub Bot commented on FLINK-5655:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3629#discussion_r108469954
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
 ---
@@ -411,6 +411,128 @@ class SqlITCase extends StreamingWithStateTestBase {
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testBoundPartitionedEventTimeWindowWithRange(): Unit = {
+val data = Seq(
+  Left((1000L, (1L, 1, "Hello"))),
+  Left((1000L, (1L, 1, "Hello"))),
+  Left((1000L, (1L, 1, "Hello"))),
--- End diff --

If the window size is 1 second, there should also be some data in between 
(like `1500L`).
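
A minimal Scala sketch of the window semantics shows what such in-between data would exercise. It is not the code under review: `rangeAgg` and the sample events are hypothetical, and the sketch assumes that `RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW` covers the closed interval `[ts - 1000, ts]` in milliseconds.

```scala
object RangeWindowSketch {
  // Hypothetical event: (event timestamp in ms, value of column a).
  type Event = (Long, Long)

  // For every event, compute count(a) and sum(a) over all events whose
  // timestamp lies in the closed interval [ts - precedingMs, ts].
  def rangeAgg(events: Seq[Event], precedingMs: Long): Seq[(Long, Long, Long)] =
    events.map { case (ts, _) =>
      val inWindow = events.filter { case (t, _) => t >= ts - precedingMs && t <= ts }
      (ts, inWindow.size.toLong, inWindow.map(_._2).sum)
    }

  def main(args: Array[String]): Unit = {
    // Timestamps that are not multiples of the window size.
    val events = Seq((1000L, 1L), (1500L, 2L), (2000L, 3L), (2500L, 4L))
    rangeAgg(events, 1000L).foreach(println)
    // (1000,1,1)
    // (1500,2,3)
    // (2000,3,6)
    // (2500,3,9)
  }
}
```

The row at `2500L` includes the row at `1500L` but no longer the one at `1000L`. When every timestamp is a multiple of the window size, rows only ever sit exactly on a window boundary, so a case like this is never checked.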


> Add event time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5655
> URL: https://issues.apache.org/jira/browse/FLINK-5655
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>  Reporter: Fabian Hueske
>  Assignee: sunjincheng
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates event time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5658)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5655) Add event time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945507#comment-15945507
 ] 

ASF GitHub Bot commented on FLINK-5655:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3629#discussion_r108463560
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala
 ---
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util.{List => JList, ArrayList => JArrayList}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+ * Process Function for RANGE clause event-time bounded OVER window
+ *
+ * @param aggregates           the list of all [[AggregateFunction]] used for this aggregation
+ * @param aggFields            the position (in the input Row) of the input value for each aggregate
+ * @param forwardedFieldCount  the count of forwarded fields.
+ * @param aggregationStateType the row type info of aggregation
+ * @param inputRowType         the row type info of input row
+ * @param precedingOffset      the preceding offset
+ */
+class RangeClauseBoundedOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val aggregationStateType: RowTypeInfo,
+private val inputRowType: RowTypeInfo,
+private val precedingOffset: Long)
+  extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkNotNull(forwardedFieldCount)
+  Preconditions.checkNotNull(aggregationStateType)
+  Preconditions.checkNotNull(precedingOffset)
+
+  private var output: Row = _
+
+  // the state which keeps the last triggering timestamp
+  private var lastTriggeringTsState: ValueState[Long] = _
+
+  // the state which used to materialize the accumulator for incremental 
calculation
+  private var accumulatorState: ValueState[Row] = _
+
+  // the state which keeps all the data that are not expired.
+  // The first element (as the mapState key) of the tuple is the time 
stamp. Per each time stamp,
+  // the second element of tuple is a list that contains the entire data 
of all the rows belonging
+  // to this time stamp.
+  private var dataState: MapState[Long, JList[Row]] = _
+
+  override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+
+val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
+  new ValueStateDescriptor[Long]("lastTriggeringTsState", 
classOf[Long])
+lastTriggeringTsState = 
getRuntimeContext.getState(lastTriggeringTsDescriptor)
+
+val accumulatorStateDescriptor =
+  new ValueStateDescriptor[Row]("accumulatorState", 
aggregationStateType)
+accumulatorState = 
getRuntimeContext.getState(accumulatorStateDescriptor)
+
+val keyTypeInformation: TypeInformation[Long] =
+  BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+val valueTypeInformation: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](inputRowType)
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new 

[jira] [Commented] (FLINK-5655) Add event time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945511#comment-15945511
 ] 

ASF GitHub Bot commented on FLINK-5655:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3629#discussion_r108453440
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala
 ---
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util.{List => JList, ArrayList => JArrayList}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+ * Process Function for RANGE clause event-time bounded OVER window
+ *
+ * @param aggregates           the list of all [[AggregateFunction]] used for this aggregation
+ * @param aggFields            the position (in the input Row) of the input value for each aggregate
+ * @param forwardedFieldCount  the count of forwarded fields.
+ * @param aggregationStateType the row type info of aggregation
+ * @param inputRowType         the row type info of input row
+ * @param precedingOffset      the preceding offset
+ */
+class RangeClauseBoundedOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val aggregationStateType: RowTypeInfo,
+private val inputRowType: RowTypeInfo,
+private val precedingOffset: Long)
+  extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkNotNull(forwardedFieldCount)
+  Preconditions.checkNotNull(aggregationStateType)
+  Preconditions.checkNotNull(precedingOffset)
+
+  private var output: Row = _
+
+  // the state which keeps the last triggering timestamp
+  private var lastTriggeringTsState: ValueState[Long] = _
+
+  // the state which used to materialize the accumulator for incremental 
calculation
+  private var accumulatorState: ValueState[Row] = _
+
+  // the state which keeps all the data that are not expired.
+  // The first element (as the mapState key) of the tuple is the time 
stamp. Per each time stamp,
+  // the second element of tuple is a list that contains the entire data 
of all the rows belonging
+  // to this time stamp.
+  private var dataState: MapState[Long, JList[Row]] = _
+
+  override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+
+val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
+  new ValueStateDescriptor[Long]("lastTriggeringTsState", 
classOf[Long])
+lastTriggeringTsState = 
getRuntimeContext.getState(lastTriggeringTsDescriptor)
+
+val accumulatorStateDescriptor =
+  new ValueStateDescriptor[Row]("accumulatorState", 
aggregationStateType)
+accumulatorState = 
getRuntimeContext.getState(accumulatorStateDescriptor)
+
+val keyTypeInformation: TypeInformation[Long] =
+  BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+val valueTypeInformation: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](inputRowType)
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new 

[jira] [Commented] (FLINK-5655) Add event time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945508#comment-15945508
 ] 

ASF GitHub Bot commented on FLINK-5655:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3629#discussion_r108455167
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala
 ---
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util.{List => JList, ArrayList => JArrayList}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+ * Process Function for RANGE clause event-time bounded OVER window
+ *
+ * @param aggregates           the list of all [[AggregateFunction]] used for this aggregation
+ * @param aggFields            the position (in the input Row) of the input value for each aggregate
+ * @param forwardedFieldCount  the count of forwarded fields.
+ * @param aggregationStateType the row type info of aggregation
+ * @param inputRowType         the row type info of input row
+ * @param precedingOffset      the preceding offset
+ */
+class RangeClauseBoundedOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val aggregationStateType: RowTypeInfo,
+private val inputRowType: RowTypeInfo,
+private val precedingOffset: Long)
+  extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkNotNull(forwardedFieldCount)
+  Preconditions.checkNotNull(aggregationStateType)
+  Preconditions.checkNotNull(precedingOffset)
+
+  private var output: Row = _
+
+  // the state which keeps the last triggering timestamp
+  private var lastTriggeringTsState: ValueState[Long] = _
+
+  // the state which used to materialize the accumulator for incremental 
calculation
+  private var accumulatorState: ValueState[Row] = _
+
+  // the state which keeps all the data that are not expired.
+  // The first element (as the mapState key) of the tuple is the time 
stamp. Per each time stamp,
+  // the second element of tuple is a list that contains the entire data 
of all the rows belonging
+  // to this time stamp.
+  private var dataState: MapState[Long, JList[Row]] = _
+
+  override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+
+val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
+  new ValueStateDescriptor[Long]("lastTriggeringTsState", 
classOf[Long])
+lastTriggeringTsState = 
getRuntimeContext.getState(lastTriggeringTsDescriptor)
+
+val accumulatorStateDescriptor =
+  new ValueStateDescriptor[Row]("accumulatorState", 
aggregationStateType)
+accumulatorState = 
getRuntimeContext.getState(accumulatorStateDescriptor)
+
+val keyTypeInformation: TypeInformation[Long] =
+  BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+val valueTypeInformation: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](inputRowType)
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new 

[jira] [Commented] (FLINK-5655) Add event time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945512#comment-15945512
 ] 

ASF GitHub Bot commented on FLINK-5655:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3629#discussion_r108467653
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala
 ---
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util.{List => JList, ArrayList => JArrayList}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+ * Process Function for RANGE clause event-time bounded OVER window
+ *
+ * @param aggregates           the list of all [[AggregateFunction]] used for this aggregation
+ * @param aggFields            the position (in the input Row) of the input value for each aggregate
+ * @param forwardedFieldCount  the count of forwarded fields.
+ * @param aggregationStateType the row type info of aggregation
+ * @param inputRowType         the row type info of input row
+ * @param precedingOffset      the preceding offset
+ */
+class RangeClauseBoundedOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val aggregationStateType: RowTypeInfo,
+private val inputRowType: RowTypeInfo,
+private val precedingOffset: Long)
+  extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkNotNull(forwardedFieldCount)
+  Preconditions.checkNotNull(aggregationStateType)
+  Preconditions.checkNotNull(precedingOffset)
+
+  private var output: Row = _
+
+  // the state which keeps the last triggering timestamp
+  private var lastTriggeringTsState: ValueState[Long] = _
+
+  // the state which used to materialize the accumulator for incremental calculation
+  private var accumulatorState: ValueState[Row] = _
+
+  // the state which keeps all the data that are not expired.
+  // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
+  // the second element of tuple is a list that contains the entire data of all the rows belonging
+  // to this time stamp.
+  private var dataState: MapState[Long, JList[Row]] = _
+
+  override def open(config: Configuration) {
+
+    output = new Row(forwardedFieldCount + aggregates.length)
+
+    val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
+    lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
+
+    val accumulatorStateDescriptor =
+      new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
+    accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
+
+    val keyTypeInformation: TypeInformation[Long] =
+      BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+    val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType)
+
+    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+      new 

[GitHub] flink pull request #3629: [FLINK-5655][table]Add event time OVER RANGE BETWE...

2017-03-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3629#discussion_r108467898
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala
 ---
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util.{List => JList, ArrayList => JArrayList}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+ * Process Function for RANGE clause event-time bounded OVER window
+ *
+ * @param aggregates           the list of all [[AggregateFunction]] used for this aggregation
+ * @param aggFields            the position (in the input Row) of the input value for each aggregate
+ * @param forwardedFieldCount  the count of forwarded fields.
+ * @param aggregationStateType the row type info of aggregation
+ * @param inputRowType         the row type info of input row
+ * @param precedingOffset      the preceding offset
+ */
+class RangeClauseBoundedOverProcessFunction(
+    private val aggregates: Array[AggregateFunction[_]],
+    private val aggFields: Array[Int],
+    private val forwardedFieldCount: Int,
+    private val aggregationStateType: RowTypeInfo,
+    private val inputRowType: RowTypeInfo,
+    private val precedingOffset: Long)
+  extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkNotNull(forwardedFieldCount)
+  Preconditions.checkNotNull(aggregationStateType)
+  Preconditions.checkNotNull(precedingOffset)
+
+  private var output: Row = _
+
+  // the state which keeps the last triggering timestamp
+  private var lastTriggeringTsState: ValueState[Long] = _
+
+  // the state which used to materialize the accumulator for incremental calculation
+  private var accumulatorState: ValueState[Row] = _
+
+  // the state which keeps all the data that are not expired.
+  // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
+  // the second element of tuple is a list that contains the entire data of all the rows belonging
+  // to this time stamp.
+  private var dataState: MapState[Long, JList[Row]] = _
+
+  override def open(config: Configuration) {
+
+    output = new Row(forwardedFieldCount + aggregates.length)
+
+    val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
+    lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
+
+    val accumulatorStateDescriptor =
+      new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
+    accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
+
+    val keyTypeInformation: TypeInformation[Long] =
+      BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+    val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType)
+
+    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]](
+        "dataState",
+        keyTypeInformation,
+        valueTypeInformation)
+
+    dataState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+  }
+
+  

[jira] [Commented] (FLINK-5655) Add event time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945513#comment-15945513
 ] 

ASF GitHub Bot commented on FLINK-5655:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3629#discussion_r108470691
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
 ---
@@ -411,6 +411,128 @@ class SqlITCase extends StreamingWithStateTestBase {
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testBoundPartitionedEventTimeWindowWithRange(): Unit = {
+val data = Seq(
--- End diff --

I think somewhat more diverse test data would be good to cover more corner 
cases. 
Basically, check that each loop is correctly triggered (retract data of two 
or more timestamps, emit different rows in one timestamp, etc.).
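
To make the retraction case concrete, here is a minimal sketch in plain Scala
(no Flink dependencies) of a running SUM over a RANGE window. The names
`RetractionSketch`, `RangeSum` and `advance` are made up for illustration and
are not part of the pull request; the sketch also assumes that timestamps
arrive in ascending order, with one call per timestamp. It only illustrates
why the test data should force the retraction loop to run over two or more
expired timestamps in a single step.

```scala
import scala.collection.immutable.TreeMap

object RetractionSketch {

  // Hypothetical helper: running SUM over [ts - precedingMillis, ts].
  final class RangeSum(precedingMillis: Long) {
    // Buffered values per timestamp, sorted by timestamp.
    private var buffered = TreeMap.empty[Long, List[Long]]
    private var sum = 0L

    // Retracts every expired timestamp, adds all values of `ts`, and returns
    // the new sum together with the number of retracted timestamps.
    def advance(ts: Long, values: List[Long]): (Long, Int) = {
      val expired = buffered.keysIterator.takeWhile(_ < ts - precedingMillis).toList
      expired.foreach { key =>
        sum -= buffered(key).sum
        buffered = buffered - key
      }
      buffered = buffered + (ts -> values)
      sum += values.sum
      (sum, expired.size)
    }
  }

  def main(args: Array[String]): Unit = {
    val rangeSum = new RangeSum(1000L)
    println(rangeSum.advance(1000L, List(1L, 1L, 1L))) // nothing to retract yet
    println(rangeSum.advance(2000L, List(2L, 2L, 2L))) // 1000 is still in range
    println(rangeSum.advance(3000L, List(3L)))         // retracts timestamp 1000
    println(rangeSum.advance(6000L, List(6L)))         // retracts 2000 and 3000 at once
  }
}
```

The last call is the interesting one: a gap in the timestamps makes two
buffered timestamps expire in the same step, which the data in the diff never
does.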


> Add event time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5655
> URL: https://issues.apache.org/jira/browse/FLINK-5655
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates event time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5658)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER RANGE aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5655) Add event time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945514#comment-15945514
 ] 

ASF GitHub Bot commented on FLINK-5655:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3629#discussion_r108450477
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -91,21 +91,22 @@ object AggregateUtil {
   }
 
   /**
-    * Create an [[org.apache.flink.streaming.api.functions.ProcessFunction]] for ROWS clause
+    * Create an [[org.apache.flink.streaming.api.functions.ProcessFunction]] for
     * bounded OVER window to evaluate final aggregate value.
     *
     * @param namedAggregates List of calls to aggregate functions and their output field names
     * @param inputType       Input row type
-    * @param inputFields     All input fields
     * @param precedingOffset the preceding offset
+    * @param isRangeClause   It is a tag that indicates whether the OVER clause is rangeClause
     * @param isRowTimeType   It is a tag that indicates whether the time type is rowTimeType
     * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]]
     */
-  private[flink] def createRowsClauseBoundedOverProcessFunction(
+  private[flink] def createBoundedOverProcessFunction(
     namedAggregates: Seq[CalcitePair[AggregateCall, String]],
     inputType: RelDataType,
     precedingOffset: Long,
-    isRowTimeType: Boolean): ProcessFunction[Row, Row] = {
+    isRangeClause: Boolean = false,
--- End diff --

I don't think we should use default values here.
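
As a rough illustration of the alternative, the sketch below uses a simplified
stand-in for the factory method (the object `NoDefaultArgsSketch` and the
method `describeBoundedOver` are hypothetical and only return a description
string). Both flags are required parameters, so every call site has to state
explicitly which kind of window it asks for.

```scala
object NoDefaultArgsSketch {

  // Hypothetical, simplified stand-in for the factory in the diff:
  // neither flag has a default value.
  def describeBoundedOver(
      precedingOffset: Long,
      isRangeClause: Boolean,
      isRowTimeType: Boolean): String = {
    val clause = if (isRangeClause) "RANGE" else "ROWS"
    val time = if (isRowTimeType) "event time" else "processing time"
    s"$clause clause, $time, preceding offset $precedingOffset"
  }

  def main(args: Array[String]): Unit = {
    // Named arguments keep the call readable without relying on defaults.
    println(
      describeBoundedOver(
        precedingOffset = 1000L,
        isRangeClause = true,
        isRowTimeType = true))
  }
}
```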




[jira] [Commented] (FLINK-5655) Add event time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945510#comment-15945510
 ] 

ASF GitHub Bot commented on FLINK-5655:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3629#discussion_r108470934
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
 ---
@@ -411,6 +411,128 @@ class SqlITCase extends StreamingWithStateTestBase {
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testBoundPartitionedEventTimeWindowWithRange(): Unit = {
+val data = Seq(
+  Left((1000L, (1L, 1, "Hello"))),
+  Left((1000L, (1L, 1, "Hello"))),
+  Left((1000L, (1L, 1, "Hello"))),
+  Left((2000L, (2L, 2, "Hello"))),
+  Right(1000L),
+  Left((2000L, (2L, 2, "Hello"))),
+  Left((2000L, (2L, 2, "Hello"))),
+  Left((3000L, (3L, 3, "Hello"))),
+  Right(2000L),
+  Left((4000L, (4L, 4, "Hello"))),
+  Right(3000L),
+  Left((5000L, (5L, 5, "Hello"))),
+  Right(5000L),
+  Left((6000L, (6L, 6, "Hello"))),
+  Right(7000L),
+  Left((8000L, (7L, 7, "Hello World"))),
+  Left((8000L, (7L, 7, "Hello World"))),
+  Left((10000L, (7L, 7, "Hello World"))),
+  Right(8000L),
+  Left((12000L, (7L, 7, "Hello World"))),
+  Right(9000L),
+  Left((13000L, (8L, 8, "Hello World"))),
+  Right(11000L),
+  Left((20000L, (20L, 20, "Hello World"))),
+  Right(19000L))
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+env.setStateBackend(getStateBackend)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+
+val t1 = env
+  .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
+  .toTable(tEnv).as('a, 'b, 'c)
+
+tEnv.registerTable("T1", t1)
+
+val sqlQuery = "SELECT " +
+  "c, a, " +
+  "count(a) OVER (PARTITION BY c ORDER BY RowTime() RANGE BETWEEN INTERVAL '1' SECOND " +
+  "preceding AND CURRENT ROW)" +
+  ", sum(a) OVER (PARTITION BY c ORDER BY RowTime() RANGE BETWEEN INTERVAL '1' SECOND " +
+  " preceding AND CURRENT ROW)" +
+  " from T1"
+
+val result = tEnv.sql(sqlQuery).toDataStream[Row]
+result.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected = mutable.MutableList(
+  "Hello,1,3,3", "Hello,1,3,3", "Hello,1,3,3",
+  "Hello,2,6,9", "Hello,2,6,9","Hello,2,6,9",
+  "Hello,3,4,9",
+  "Hello,4,2,7",
+  "Hello,5,2,9",
+  "Hello,6,2,11",
+  "Hello World,7,2,14", "Hello World,7,2,14", "Hello World,7,1,7", "Hello World,7,1,7",
+  "Hello World,8,2,15",
+  "Hello World,20,1,20")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testBoundNonPartitionedEventTimeWindowWithRange(): Unit = {
+val data = Seq(
+  Left((1000L, (1L, 1, "Hello"))),
--- End diff --

Please cover more corner cases with the test data (see above).
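
When extending the test data, a brute-force reference computation can help to
derive the expected rows by hand. The following is only a sketch in plain
Scala, not the operator from the pull request; `RangeOverReference`, `Event`
and `countAndSum` are hypothetical names. It assumes that no record is late
and that the window of a row covers all rows of the same partition with a
timestamp in [ts - preceding, ts], so rows with equal timestamps see each
other.

```scala
object RangeOverReference {

  // Hypothetical row model: event timestamp, aggregated value `a`, partition key `c`.
  final case class Event(ts: Long, a: Long, c: String)

  // For each event, count and sum `a` over all events of the same partition
  // whose timestamp lies in [ts - precedingMillis, ts].
  def countAndSum(events: Seq[Event], precedingMillis: Long): Seq[String] =
    events.map { e =>
      val window = events.filter { o =>
        o.c == e.c && o.ts >= e.ts - precedingMillis && o.ts <= e.ts
      }
      s"${e.c},${e.a},${window.size},${window.map(_.a).sum}"
    }

  def main(args: Array[String]): Unit = {
    // First records of the test in the diff (watermarks omitted).
    val events = Seq(
      Event(1000L, 1L, "Hello"), Event(1000L, 1L, "Hello"), Event(1000L, 1L, "Hello"),
      Event(2000L, 2L, "Hello"), Event(2000L, 2L, "Hello"), Event(2000L, 2L, "Hello"),
      Event(3000L, 3L, "Hello"), Event(4000L, 4L, "Hello"))
    countAndSum(events, 1000L).foreach(println)
  }
}
```

For these records the sketch yields the same rows as the first lines of the
`expected` list in the diff, for example "Hello,2,6,9" and "Hello,3,4,9".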



[GitHub] flink pull request #3629: [FLINK-5655][table]Add event time OVER RANGE BETWE...

2017-03-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3629#discussion_r108467310
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala
 ---
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util.{List => JList, ArrayList => JArrayList}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+ * Process Function for RANGE clause event-time bounded OVER window
+ *
+ * @param aggregates           the list of all [[AggregateFunction]] used for this aggregation
+ * @param aggFields            the position (in the input Row) of the input value for each aggregate
+ * @param forwardedFieldCount  the count of forwarded fields.
+ * @param aggregationStateType the row type info of aggregation
+ * @param inputRowType         the row type info of input row
+ * @param precedingOffset      the preceding offset
+ */
+class RangeClauseBoundedOverProcessFunction(
+    private val aggregates: Array[AggregateFunction[_]],
+    private val aggFields: Array[Int],
+    private val forwardedFieldCount: Int,
+    private val aggregationStateType: RowTypeInfo,
+    private val inputRowType: RowTypeInfo,
+    private val precedingOffset: Long)
+  extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkNotNull(forwardedFieldCount)
+  Preconditions.checkNotNull(aggregationStateType)
+  Preconditions.checkNotNull(precedingOffset)
+
+  private var output: Row = _
+
+  // the state which keeps the last triggering timestamp
+  private var lastTriggeringTsState: ValueState[Long] = _
+
+  // the state which used to materialize the accumulator for incremental calculation
+  private var accumulatorState: ValueState[Row] = _
+
+  // the state which keeps all the data that are not expired.
+  // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
+  // the second element of tuple is a list that contains the entire data of all the rows belonging
+  // to this time stamp.
+  private var dataState: MapState[Long, JList[Row]] = _
+
+  override def open(config: Configuration) {
+
+    output = new Row(forwardedFieldCount + aggregates.length)
+
+    val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
+    lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
+
+    val accumulatorStateDescriptor =
+      new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
+    accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
+
+    val keyTypeInformation: TypeInformation[Long] =
+      BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+    val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType)
+
+    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]](
+        "dataState",
+        keyTypeInformation,
+        valueTypeInformation)
+
+    dataState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+  }
+
+  

[GitHub] flink pull request #3629: [FLINK-5655][table]Add event time OVER RANGE BETWE...

2017-03-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3629#discussion_r108469954
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
 ---
@@ -411,6 +411,128 @@ class SqlITCase extends StreamingWithStateTestBase {
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testBoundPartitionedEventTimeWindowWithRange(): Unit = {
+val data = Seq(
+  Left((1000L, (1L, 1, "Hello"))),
+  Left((1000L, (1L, 1, "Hello"))),
+  Left((1000L, (1L, 1, "Hello"))),
--- End diff --

If the window size is 1 second, there should also be some data in between 
(like `1500L`)
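
A possible shape for such data, as a sketch only: the values below are made up
for illustration and `InBetweenTestData` and `expectedCounts` are hypothetical
names. Records use the same `Left((timestamp, row))` / `Right(watermark)`
encoding as the Seq in the diff, and the helper derives the expected COUNT per
record for a given preceding interval.

```scala
object InBetweenTestData {

  type Record = (Long, (Long, Int, String))

  // Left = record with its timestamp, Right = watermark.
  val data: Seq[Either[Record, Long]] = Seq(
    Left((1000L, (1L, 1, "Hello"))),
    Left((1500L, (2L, 2, "Hello"))),
    Left((2000L, (3L, 3, "Hello"))),
    Right(2000L),
    Left((2500L, (4L, 4, "Hello"))),
    Left((4000L, (5L, 5, "Hello"))),
    Right(4000L))

  // For every record, the number of records with a timestamp in
  // [ts - precedingMillis, ts].
  def expectedCounts(precedingMillis: Long): Seq[(Long, Int)] = {
    val timestamps = data.collect { case Left((ts, _)) => ts }
    timestamps.map { ts =>
      ts -> timestamps.count(o => o >= ts - precedingMillis && o <= ts)
    }
  }

  def main(args: Array[String]): Unit =
    expectedCounts(1000L).foreach(println)
}
```

With a 1 second window, the record at `2500L` sees `1500L` and `2000L` but no
longer `1000L`, which is exactly the boundary that full-second timestamps do
not exercise.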


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3629: [FLINK-5655][table]Add event time OVER RANGE BETWE...

2017-03-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3629#discussion_r108455167
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala
 ---
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util.{List => JList, ArrayList => JArrayList}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+ * Process Function for RANGE clause event-time bounded OVER window
+ *
+ * @param aggregates           the list of all [[AggregateFunction]] used for this aggregation
+ * @param aggFields            the position (in the input Row) of the input value for each aggregate
+ * @param forwardedFieldCount  the count of forwarded fields.
+ * @param aggregationStateType the row type info of aggregation
+ * @param inputRowType         the row type info of input row
+ * @param precedingOffset      the preceding offset
+ */
+class RangeClauseBoundedOverProcessFunction(
+    private val aggregates: Array[AggregateFunction[_]],
+    private val aggFields: Array[Int],
+    private val forwardedFieldCount: Int,
+    private val aggregationStateType: RowTypeInfo,
+    private val inputRowType: RowTypeInfo,
+    private val precedingOffset: Long)
+  extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkNotNull(forwardedFieldCount)
+  Preconditions.checkNotNull(aggregationStateType)
+  Preconditions.checkNotNull(precedingOffset)
+
+  private var output: Row = _
+
+  // the state which keeps the last triggering timestamp
+  private var lastTriggeringTsState: ValueState[Long] = _
+
+  // the state which used to materialize the accumulator for incremental calculation
+  private var accumulatorState: ValueState[Row] = _
+
+  // the state which keeps all the data that are not expired.
+  // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
+  // the second element of tuple is a list that contains the entire data of all the rows belonging
+  // to this time stamp.
+  private var dataState: MapState[Long, JList[Row]] = _
+
+  override def open(config: Configuration) {
+
+    output = new Row(forwardedFieldCount + aggregates.length)
+
+    val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
+    lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
+
+    val accumulatorStateDescriptor =
+      new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
+    accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
+
+    val keyTypeInformation: TypeInformation[Long] =
+      BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+    val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType)
+
+    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]](
+        "dataState",
+        keyTypeInformation,
+        valueTypeInformation)
+
+    dataState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+  }
+
+  

[jira] [Commented] (FLINK-6081) Offset/Fetch support for SQL Streaming

2017-03-28 Thread radu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945497#comment-15945497
 ] 

radu commented on FLINK-6081:
-

After checking the Calcite parser options, the query examples need to take 
the available grouping options into account. Queries that can be parsed will 
therefore look like:

SELECT A4
FROM T1
   GROUP BY A4,A2
   ORDER BY A2 OFFSET 2 ROWS FETCH NEXT 2 ROWS ONLY
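
To illustrate the intended semantics of such a query on the buffered rows of
one window, here is a minimal sketch in plain Scala. `OffsetFetchSketch` and
`offsetFetch` are hypothetical names and the rows are made-up (A2, A4) pairs;
this is not a proposal for the operator implementation, only a statement of
what ORDER BY followed by OFFSET and FETCH should return.

```scala
object OffsetFetchSketch {

  // Sorts the buffered rows of one window ascending by `key`, skips `offset`
  // rows and returns at most `fetch` of the remaining rows.
  def offsetFetch[A, K: Ordering](rows: Seq[A], offset: Int, fetch: Int)(key: A => K): Seq[A] =
    rows.sortBy(key).slice(offset, offset + fetch)

  def main(args: Array[String]): Unit = {
    // Made-up (A2, A4) pairs standing in for the grouped rows of one window.
    val rows = Seq((5, "e"), (1, "a"), (4, "d"), (2, "b"), (3, "c"))
    // ORDER BY A2 OFFSET 2 ROWS FETCH NEXT 2 ROWS ONLY, projecting A4.
    println(offsetFetch(rows, offset = 2, fetch = 2)(_._1).map(_._2))
  }
}
```

Each time the window is re-triggered, the same function would be applied to
the new buffer contents.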

> Offset/Fetch support for SQL Streaming
> --
>
> Key: FLINK-6081
> URL: https://issues.apache.org/jira/browse/FLINK-6081
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: radu
> Attachments: offset.png
>
>
> Time target: Proc Time
> The main scope of Offset/Fetch is for pagination support. In the context
> of streaming Offset and Fetch would make sense within the scope of
> certain window constructs as they refer to buffered data from the stream
> (with a main usage to restrict the output that is shown at a certain
> moment). Therefore they should be applied to the output of the types of
> windows supported by the ORDER BY clauses. Moreover, in accordance with
> SQL best practices, they can only be used with an ORDER BY clause.
> SQL targeted query examples:
> 
> Window defined based on group by clause
> ```Q1: SELECT a ORDER BY b OFFSET n ROWS FROM stream1 GROUP BY HOP(proctime, 
> INTERVAL '1' HOUR, INTERVAL '3' HOUR) ```
> Window defined based on where clause time boundaries
> ```Q2: SELECT a ORDER BY b OFFSET n WHERE procTime() BETWEEN 
> current\_timestamp - INTERVAL '1' HOUR AND current\_timestamp FROM stream1 ```
> ~~Window defined as sliding windows (aggregates) ~~
> ``` Q3: ~~SELECT SUM(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR 
> PRECEDING b OFFSET n ROWS) FROM stream1~~ ```
> Comment: Supporting offset over sliding windows (within the window) does
> not make sense because the main scope of OFFSET/FETCH is for pagination
> support. Therefore this functionality example should only be supported in 
> relation to the
> output of a query. Hence, Q3 will not be supported
> The general grammar (Calcite version) for OFFSET/FETCH with available
> parameters is shown below:
> ```
> Select […]
> [ ORDER BY orderItem [, orderItem ]* ]
> [ OFFSET start { ROW | ROWS } ]
> [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ]
> ```
> Description
> ---
> Offset and Fetch are primarily used for pagination support (i.e., restrict
> the output that is shown at some point). They were mainly designed to
> support web page display of the contents. Building on this scenario we
> can imagine a similar role for OFFSET and FETCH for streams that would
> display contents via a web page. In such a scenario the number of
> outputs to be displayed would be limited using such operators (probably
> for pagination and aesthetic reasons). However, as with any stream
> application there is a natural evolution in time, so the operators' output
> should evolve with the update rate of the application. The fact that
> there is an update rate and a collection of events related to a stream
> points to window constructs. Therefore the OFFSET/FETCH functionality
> would be related to the window mechanisms/boundaries defined by the
> query. Hence when the window construct would be re-triggered the output
> would be filtered again from the cardinality point of view based on the
> logic of the OFFSET/FETCH.
> Because of the primary reasons of supporting pagination (and controlling
> the number of outputs) we limit the usage of OFFSET/Fetch for window
> constructs that would be related to the output. Because of this
> supporting those on sliding window with query aggregates (e.g., Q3 query
> example) would not make sense. Additionally there is an implicit need
> for some ordering clause due to the fact that OFFSET and FETCH point to
> ordering positions. That is why these functions would be supported only
> if an ORDER BY clause is present.
> Functionality example
> -
> We exemplify the usage of OFFSET below using the following query. Event
> schema is in the form (a,b).
> ``` SELECT a ORDER BY b OFFSET 2 ROWS FROM stream1 GROUP BY 
> CEIL(proctime TO HOUR) ```
> ||Proctime||  IngestionTime(Event)||  Stream1||   Output||
> | |10:00:01|  (a1, 7)| |  
> | |10:05:00|  (c1, 2)| |  
> | |10:12:00|  (b1,5)| |   
> | |10:50:00|  (d1,2)| |   
> |10-11|   |   |b1,a1|
> | |11:03:00|  (a2,10)| |
> |11-12|   |   |nil|
> |...|
> Implementation option
> -
> There are 2 options to implement the logic of OFFSET/Fetch:
> 1)  Within the logic of the window (i.e. sorting window)
> Similar as for sorting support (ORDER BY clause), considering that the
> SQL operators will be associated with window 

[jira] [Commented] (FLINK-6034) Add KeyedStateHandle for the snapshots in keyed streams

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945473#comment-15945473
 ] 

ASF GitHub Bot commented on FLINK-6034:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3531
  
Thanks for this very nice contribution @shixiaogang! I will merge this now.


> Add KeyedStateHandle for the snapshots in keyed streams
> ---
>
> Key: FLINK-6034
> URL: https://issues.apache.org/jira/browse/FLINK-6034
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Currently, the only type of snapshot in keyed streams is 
> {{KeyGroupsStateHandle}}, which is a full snapshot and stores the states one 
> key group after another. With the introduction of incremental checkpointing, we need a higher 
> level abstraction of keyed snapshots to allow flexible snapshot formats. 
> The implementation of {{KeyedStateHandle}} s may vary a lot in different 
> backends. The only information needed in {{KeyedStateHandle}} s is their key 
> group range. When recovering the job with a different degree of parallelism, 
> {{KeyedStateHandle}} s will be assigned to those subtasks whose key group 
> ranges overlap with their ranges.
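
The assignment rule described above (a handle goes to every subtask whose key group range overlaps the handle's range) can be sketched as follows. This is a simplified illustration in Java; the `Range` type, the method `handlesFor`, and the concrete ranges used in `main` are all assumptions made for the example and are not Flink's actual classes or its key-group-to-subtask mapping.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for illustration; these are not Flink's classes.
final class KeyGroupAssignmentSketch {

    // Inclusive range of key groups [start, end].
    static final class Range {
        final int start;
        final int end;

        Range(int start, int end) {
            this.start = start;
            this.end = end;
        }

        // Two inclusive ranges overlap if each starts before the other ends.
        boolean overlaps(Range other) {
            return start <= other.end && other.start <= end;
        }
    }

    // Returns the indexes of all handles whose range overlaps the subtask's range.
    static List<Integer> handlesFor(Range subtaskRange, List<Range> handleRanges) {
        List<Integer> assigned = new ArrayList<>();
        for (int i = 0; i < handleRanges.size(); i++) {
            if (handleRanges.get(i).overlaps(subtaskRange)) {
                assigned.add(i);
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        // Handles written with parallelism 2 over 8 key groups (illustrative split).
        List<Range> handles = new ArrayList<>();
        handles.add(new Range(0, 3));
        handles.add(new Range(4, 7));

        // Recovery with parallelism 3 (illustrative split).
        System.out.println(handlesFor(new Range(0, 2), handles)); // prints [0]
        System.out.println(handlesFor(new Range(3, 5), handles)); // prints [0, 1]
        System.out.println(handlesFor(new Range(6, 7), handles)); // prints [1]
    }
}
```

Note that a subtask may receive more than one handle after rescaling, and then needs to read only the key groups from each handle that fall inside its own range.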



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3531: [FLINK-6034][checkpoint] Add KeyedStateHandle for the sna...

2017-03-28 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3531
  
Thanks for this very nice contribution @shixiaogang! I will merge this now.




[jira] [Commented] (FLINK-6207) Duplicate type serializers for async snapshots of CopyOnWriteStateTable

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945465#comment-15945465
 ] 

ASF GitHub Bot commented on FLINK-6207:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3634
  
CC @StephanEwen 


> Duplicate type serializers for async snapshots of CopyOnWriteStateTable
> ---
>
> Key: FLINK-6207
> URL: https://issues.apache.org/jira/browse/FLINK-6207
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.3.0
>
>
> {{TypeSerializer}} are used for copy-on-write and the parallel snapshots in 
> the {{CopyOnWriteStateTable}}. For stateful serializers, this can lead to 
> race conditions. Snapshots need to duplicate the serializers before using 
> them.





[jira] [Commented] (FLINK-6207) Duplicate type serializers for async snapshots of CopyOnWriteStateTable

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945463#comment-15945463
 ] 

ASF GitHub Bot commented on FLINK-6207:
---

GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/3634

[FLINK-6207] Duplicate TypeSerializers for async snapshots of CopyOnW…

This PR fixes a problem for stateful serializers for async snapshots in the 
`CopyOnWriteStateTable`. Serializers must be duplicated before they are used in 
the async snapshot thread.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink fix-serialier-duplicate

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3634.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3634


commit 63f661067aa90a4b53161fb1bf4e84fcf54b9c10
Author: Stefan Richter 
Date:   2017-03-28T14:56:27Z

[FLINK-6207] Duplicate TypeSerializers for async snapshots of 
CopyOnWriteStateTable




> Duplicate type serializers for async snapshots of CopyOnWriteStateTable
> ---
>
> Key: FLINK-6207
> URL: https://issues.apache.org/jira/browse/FLINK-6207
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.3.0
>
>
> {{TypeSerializer}} are used for copy-on-write and the parallel snapshots in 
> the {{CopyOnWriteStateTable}}. For stateful serializers, this can lead to 
> race conditions. Snapshots need to duplicate the serializers before using 
> them.





[GitHub] flink issue #3634: [FLINK-6207] Duplicate TypeSerializers for async snapshot...

2017-03-28 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3634
  
CC @StephanEwen 




[GitHub] flink pull request #3634: [FLINK-6207] Duplicate TypeSerializers for async s...

2017-03-28 Thread StefanRRichter
GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/3634

[FLINK-6207] Duplicate TypeSerializers for async snapshots of CopyOnW…

This PR fixes a problem for stateful serializers for async snapshots in the 
`CopyOnWriteStateTable`. Serializers must be duplicated before they are used in 
the async snapshot thread.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink fix-serialier-duplicate

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3634.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3634


commit 63f661067aa90a4b53161fb1bf4e84fcf54b9c10
Author: Stefan Richter 
Date:   2017-03-28T14:56:27Z

[FLINK-6207] Duplicate TypeSerializers for async snapshots of 
CopyOnWriteStateTable






[jira] [Created] (FLINK-6207) Duplicate type serializers for async snapshots of CopyOnWriteStateTable

2017-03-28 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-6207:
-

 Summary: Duplicate type serializers for async snapshots of 
CopyOnWriteStateTable
 Key: FLINK-6207
 URL: https://issues.apache.org/jira/browse/FLINK-6207
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Reporter: Stefan Richter
Assignee: Stefan Richter
 Fix For: 1.3.0


{{TypeSerializer}} are used for copy-on-write and the parallel snapshots in the 
{{CopyOnWriteStateTable}}. For stateful serializers, this can lead to race 
conditions. Snapshots need to duplicate the serializers before using them.
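
As an illustration of why duplication matters, here is a minimal sketch in Java of a stateful serializer that reuses an internal buffer and therefore must not be shared between the processing thread and the async snapshot thread. The `Serializer` interface and `BufferingStringSerializer` class are hypothetical stand-ins written for this example; only the idea of a `duplicate()` method mirrors Flink's `TypeSerializer`.

```java
import java.nio.charset.StandardCharsets;

final class SerializerDuplicationSketch {

    // Minimal stand-in for a serializer contract; not Flink's TypeSerializer.
    interface Serializer<T> {
        byte[] serialize(T value);

        // Returns an instance that is safe to use from another thread.
        Serializer<T> duplicate();
    }

    // Stateful: reuses a scratch buffer, so one instance must not be used
    // by two threads at the same time.
    static final class BufferingStringSerializer implements Serializer<String> {
        private final StringBuilder scratch = new StringBuilder();

        @Override
        public byte[] serialize(String value) {
            scratch.setLength(0);
            scratch.append(value.length()).append(':').append(value);
            return scratch.toString().getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public Serializer<String> duplicate() {
            // A fresh instance gets its own scratch buffer.
            return new BufferingStringSerializer();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Serializer<String> original = new BufferingStringSerializer();

        // Duplicate on the calling thread, before handing off to the snapshot thread.
        Serializer<String> forSnapshot = original.duplicate();
        Thread snapshot = new Thread(() -> forSnapshot.serialize("state"));
        snapshot.start();

        // The processing thread keeps using its own instance concurrently.
        original.serialize("update");
        snapshot.join();
    }
}
```

Without the `duplicate()` call, both threads would write into the same scratch buffer, which is the kind of race condition the issue describes.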





[jira] [Closed] (FLINK-6197) Add support for iterative conditions.

2017-03-28 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6197?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-6197.
-
Resolution: Fixed

> Add support for iterative conditions.
> -
>
> Key: FLINK-6197
> URL: https://issues.apache.org/jira/browse/FLINK-6197
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.3.0
>
>
> So far, the {{where}} clause only supported simple {{FilterFunction}} 
> conditions. 
> With this, we want to add support for conditions where an event is 
> accepted not only based on its own properties, e.g. name, as it was before, 
> but also based on some statistic computed over previously accepted events in 
> the pattern, e.g. if the price is higher than the average of the prices of 
> the previously accepted events. 
> This in combination with the recently added quantifiers will allow for a lot 
> more expressive patterns.
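
The price example from the description can be sketched as follows. This is a minimal illustration in Java of a condition that sees both the candidate event and the previously accepted events; the `Condition` interface and the `match` method are hypothetical and do not reflect the CEP library's actual API. Accepting the first event unconditionally is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;

final class IterativeConditionSketch {

    // Hypothetical interface: unlike a plain filter, the condition also
    // receives the events accepted so far.
    interface Condition<T> {
        boolean accept(T candidate, List<T> previouslyAccepted);
    }

    // Accepts a price if it is higher than the average of the accepted prices.
    // Assumption: the first price is accepted, since there is no average yet.
    static final Condition<Double> ABOVE_AVERAGE = (price, accepted) -> {
        if (accepted.isEmpty()) {
            return true;
        }
        double sum = 0;
        for (double p : accepted) {
            sum += p;
        }
        return price > sum / accepted.size();
    };

    // Runs the condition over the input in order and collects accepted events.
    static <T> List<T> match(List<T> events, Condition<T> condition) {
        List<T> accepted = new ArrayList<>();
        for (T event : events) {
            if (condition.accept(event, accepted)) {
                accepted.add(event);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        List<Double> prices = new ArrayList<>();
        prices.add(10.0);
        prices.add(8.0);
        prices.add(12.0);
        prices.add(11.0);
        prices.add(20.0);

        System.out.println(match(prices, ABOVE_AVERAGE)); // prints [10.0, 12.0, 20.0]
    }
}
```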





[jira] [Commented] (FLINK-6197) Add support for iterative conditions.

2017-03-28 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945452#comment-15945452
 ] 

Kostas Kloudas commented on FLINK-6197:
---

Merged with ad21a441434b9ac5886b664871553bf57885e984

> Add support for iterative conditions.
> -
>
> Key: FLINK-6197
> URL: https://issues.apache.org/jira/browse/FLINK-6197
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.3.0
>
>
> So far, the {{where}} clause only supported simple {{FilterFunction}} 
> conditions. 
> With this, we want to add support for conditions where an event is 
> accepted not only based on its own properties, e.g. name, as it was before, 
> but also based on some statistic computed over previously accepted events in 
> the pattern, e.g. if the price is higher than the average of the prices of 
> the previously accepted events. 
> This in combination with the recently added quantifiers will allow for a lot 
> more expressive patterns.





[GitHub] flink pull request #3624: [FLINK-6197] [cep] Add support for iterative condi...

2017-03-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3624




[jira] [Commented] (FLINK-6197) Add support for iterative conditions.

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945446#comment-15945446
 ] 

ASF GitHub Bot commented on FLINK-6197:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3624


> Add support for iterative conditions.
> -
>
> Key: FLINK-6197
> URL: https://issues.apache.org/jira/browse/FLINK-6197
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.3.0
>
>
> So far, the {{where}} clause only supported simple {{FilterFunction}} 
> conditions. 
> With this, we want to add support for conditions where an event is 
> accepted not only based on its own properties, e.g. name, as it was before, 
> but also based on some statistic computed over previously accepted events in 
> the pattern, e.g. if the price is higher than the average of the prices of 
> the previously accepted events. 
> This in combination with the recently added quantifiers will allow for a lot 
> more expressive patterns.





[jira] [Updated] (FLINK-6176) Add JARs to CLASSPATH deterministically

2017-03-28 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-6176:
--
Fix Version/s: 1.3.0

> Add JARs to CLASSPATH deterministically
> ---
>
> Key: FLINK-6176
> URL: https://issues.apache.org/jira/browse/FLINK-6176
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Scott Kidder
>Assignee: Greg Hogan
> Fix For: 1.3.0
>
>
> The {{config.sh}} script uses the following shell-script function to build 
> the {{FLINK_CLASSPATH}} variable from a listing of JAR files in the 
> {{$FLINK_LIB_DIR}} directory:
> {code}
> constructFlinkClassPath() {
> while read -d '' -r jarfile ; do
> if [[ $FLINK_CLASSPATH = "" ]]; then
> FLINK_CLASSPATH="$jarfile";
> else
> FLINK_CLASSPATH="$FLINK_CLASSPATH":"$jarfile"
> fi
> done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0)
> echo $FLINK_CLASSPATH
> }
> {code}
> The {{find}} command as specified will return files in directory-order, which 
> varies by OS and filesystem.
> The inconsistent ordering of directory contents caused problems for me when 
> installing a Flink Docker image onto a new machine with a newer version of 
> Docker and a different filesystem (UFS). The differences in the Docker 
> filesystem implementation led to different ordering of the directory 
> contents; this affected the {{FLINK_CLASSPATH}} ordering and generated very 
> puzzling {{NoClassNotFoundException}} errors when running my Flink 
> application.
> This should be addressed by deterministically ordering JAR files added to the 
> {{FLINK_CLASSPATH}}.
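
The idea of a deterministic ordering can be sketched independently of the shell script. The following Java snippet is only an illustration of the principle (filter the JAR files, sort them, then join them); the class `ClasspathSketch` and the sample file names are hypothetical, and the actual fix would have to be made in `config.sh` itself.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class ClasspathSketch {

    // Builds a classpath from the given paths: keeps only *.jar entries,
    // sorts them lexicographically, and joins them with the separator.
    static String build(List<String> paths, String separator) {
        List<String> jars = new ArrayList<>();
        for (String path : paths) {
            if (path.endsWith(".jar")) {
                jars.add(path);
            }
        }
        // Sorting removes the dependency on directory order, which varies
        // by OS and filesystem.
        Collections.sort(jars);
        return String.join(separator, jars);
    }

    public static void main(String[] args) {
        // Sample listing in an arbitrary "directory order" (hypothetical names).
        List<String> listing = new ArrayList<>();
        listing.add("lib/b.jar");
        listing.add("lib/readme.txt");
        listing.add("lib/a.jar");

        System.out.println(build(listing, File.pathSeparator));
    }
}
```

Whatever order the directory listing arrives in, the resulting classpath is the same, so class resolution no longer depends on the host filesystem.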





[jira] [Created] (FLINK-6206) As an Engineer, I want task state transition log to be warn/error for FAILURE scenarios

2017-03-28 Thread Dan Bress (JIRA)
Dan Bress created FLINK-6206:


 Summary: As an Engineer, I want task state transition log to be 
warn/error for FAILURE scenarios
 Key: FLINK-6206
 URL: https://issues.apache.org/jira/browse/FLINK-6206
 Project: Flink
  Issue Type: Improvement
  Components: Core
Affects Versions: 1.2.0
Reporter: Dan Bress
Priority: Critical


If a task fails due to an exception, I would like that to be logged at warn 
or error level. Currently it is logged at info level.

{code}
private boolean transitionState(ExecutionState currentState, ExecutionState newState, Throwable cause) {
    if (STATE_UPDATER.compareAndSet(this, currentState, newState)) {
        if (cause == null) {
            LOG.info("{} ({}) switched from {} to {}.",
                taskNameWithSubtask, executionId, currentState, newState);
        } else {
            LOG.info("{} ({}) switched from {} to {}.",
                taskNameWithSubtask, executionId, currentState, newState, cause);
        }

        return true;
    } else {
        return false;
    }
}
{code}
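
One possible shape of the requested change is to pick the log level from the presence of a cause. The sketch below is an assumption about how that could look, not a proposed patch: it uses `java.util.logging` only to stay self-contained (the Flink code above logs through a different logging API), and the class `TransitionLogSketch` and its methods are hypothetical.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

final class TransitionLogSketch {

    private static final Logger LOG =
        Logger.getLogger(TransitionLogSketch.class.getName());

    // Transitions caused by an exception are logged at WARNING,
    // regular transitions stay at INFO.
    static Level levelFor(Throwable cause) {
        return cause == null ? Level.INFO : Level.WARNING;
    }

    static void logTransition(String task, String from, String to, Throwable cause) {
        String message = task + " switched from " + from + " to " + to + ".";
        // The Throwable may be null; in that case only the message is logged.
        LOG.log(levelFor(cause), message, cause);
    }

    public static void main(String[] args) {
        logTransition("Map (1/4)", "DEPLOYING", "RUNNING", null);
        logTransition("Map (1/4)", "RUNNING", "FAILED",
            new RuntimeException("simulated failure"));
    }
}
```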





[jira] [Commented] (FLINK-5903) taskmanager.numberOfTaskSlots and yarn.containers.vcores did not work well in YARN mode

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945233#comment-15945233
 ] 

ASF GitHub Bot commented on FLINK-5903:
---

Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3408
  
@tillrohrmann I still don't entirely get your point. Don't these three 
configs (`-s/-ys`, `yarn.containers.vcores` and `taskmanager.numberOfTaskSlots`) 
mean the same thing? Do they differ in usage apart from priority?


> taskmanager.numberOfTaskSlots and yarn.containers.vcores did not work well in 
> YARN mode
> ---
>
> Key: FLINK-5903
> URL: https://issues.apache.org/jira/browse/FLINK-5903
> Project: Flink
>  Issue Type: Sub-task
>  Components: YARN
>Reporter: Tao Wang
>Assignee: Tao Wang
> Attachments: set taskmanager.numberOfTaskSlots to 6.JPG, set 
> yarn.container.vcores to 5_JM.JPG, set yarn.container.vcores to 5_RM.JPG
>
>
> Currently Flink does not respect taskmanager.numberOfTaskSlots and 
> yarn.containers.vcores in flink-conf.yaml, but only the -s parameter in the CLI.
> In detail, taskmanager.numberOfTaskSlots has no effect at all, and 
> yarn.containers.vcores is only used when requesting container (TM) resources 
> but is not made known to the TM, which means the TM will always assume it has 
> the default (1) slot if -s is not configured.





[GitHub] flink issue #3408: [FLINK-5903][YARN]respect taskmanager.numberOfTaskSlots a...

2017-03-28 Thread WangTaoTheTonic
Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3408
  
@tillrohrmann I still don't entirely get your point. Don't these three 
configs (`-s/-ys`, `yarn.containers.vcores` and `taskmanager.numberOfTaskSlots`) 
mean the same thing? Do they differ in usage apart from priority?




[jira] [Commented] (FLINK-6055) Supported setting timers on a Non-Keyed Stream

2017-03-28 Thread sunjincheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945191#comment-15945191
 ] 

sunjincheng commented on FLINK-6055:


For the OVER case, we use NullByteKeySelector to convert a non-keyed stream 
into a keyed stream, so I will close this issue.

> Supported setting timers on a Non-Keyed Stream
> --
>
> Key: FLINK-6055
> URL: https://issues.apache.org/jira/browse/FLINK-6055
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> After [FLINK-4460] Allow ProcessFunction on non-keyed streams, I would like 
> to support setting timers on a non-keyed stream. What do you think? 
> [~aljoscha] 





[jira] [Commented] (FLINK-6204) Improve Event-Time OVER ROWS BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-28 Thread sunjincheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945183#comment-15945183
 ] 

sunjincheng commented on FLINK-6204:


[~Yuhong_kyo] As I commented in FLINK-5658, at the Table API level I do not 
recommend using an in-memory data structure that holds a non-constant amount of 
data. By using the runtime timer mechanism, the Table API can take full 
advantage of the runtime and of any framework-level timer optimizations, 
possibly even timer merging. 

> Improve Event-Time OVER ROWS BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> ---
>
> Key: FLINK-6204
> URL: https://issues.apache.org/jira/browse/FLINK-6204
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Currently the implementation class for `event time OVER ROWS BETWEEN 
> UNBOUNDED PRECEDING aggregation to SQL`, `UnboundedEventTimeOverProcessFunction`, 
> uses an in-memory data structure of uncontrolled size, `sortedTimestamps: 
> util.LinkedList[Long]`, to cache and sort the data timestamps. IMO this is 
> not a good approach, because in our production scenario there are millions of 
> window records per millisecond. So I want to remove 
> `util.LinkedList[Long]`. Feedback from anyone is welcome.
> What do you think? [~fhueske] and [~Yuhong_kyo]





[GitHub] flink pull request #3633: [FLINK-5982] [runtime] Refactor AbstractInvokable ...

2017-03-28 Thread tony810430
GitHub user tony810430 opened a pull request:

https://github.com/apache/flink/pull/3633

[FLINK-5982] [runtime] Refactor AbstractInvokable and StatefulTask

This PR wants to merge `AbstractInvokable` and `StatefulTask` to eliminate 
"lazy initialization" in `Task`.

The following are what I did in this PR:

1. Merge `AbstractInvokable` and `StatefulTask` and use constructor to 
initialize `Environment` and `TaskStateHandles` in `AbstractInvokable`.
2. Update all subclasses and corresponding tests. For now, batch tasks are 
still not stateful, so I added `IllegalStateException()` in constructor and 
`UnsupportedOperationException()` in the methods for supporting stateful task.
3. Update the behavior of `AbstractInvokable` in `Task`, including 
`loadAndInstantiateInvokable()`, `triggerCheckpointBarrier()` and 
`notifyCheckpointComplete()`.
4. Add tests for modification in `Task` and tests the constructors' 
behavior for all concrete  subclasses which inherit `AbstractInvokable`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tony810430/flink FLINK-5982

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3633.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3633


commit 31748905eaa3b4897f0b02473526b7fa05c2304e
Author: Tony Wei 
Date:   2017-03-15T03:13:41Z

[FLINK-5982] Refactor AbstractInvokable and StatefulTask






[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945161#comment-15945161
 ] 

ASF GitHub Bot commented on FLINK-5982:
---

GitHub user tony810430 opened a pull request:

https://github.com/apache/flink/pull/3633

[FLINK-5982] [runtime] Refactor AbstractInvokable and StatefulTask

This PR wants to merge `AbstractInvokable` and `StatefulTask` to eliminate 
"lazy initialization" in `Task`.

The following are what I did in this PR:

1. Merge `AbstractInvokable` and `StatefulTask` and use constructor to 
initialize `Environment` and `TaskStateHandles` in `AbstractInvokable`.
2. Update all subclasses and corresponding tests. For now, batch tasks are 
still not stateful, so I added `IllegalStateException()` in constructor and 
`UnsupportedOperationException()` in the methods for supporting stateful task.
3. Update the behavior of `AbstractInvokable` in `Task`, including 
`loadAndInstantiateInvokable()`, `triggerCheckpointBarrier()` and 
`notifyCheckpointComplete()`.
4. Add tests for modification in `Task` and tests the constructors' 
behavior for all concrete  subclasses which inherit `AbstractInvokable`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tony810430/flink FLINK-5982

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3633.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3633


commit 31748905eaa3b4897f0b02473526b7fa05c2304e
Author: Tony Wei 
Date:   2017-03-15T03:13:41Z

[FLINK-5982] Refactor AbstractInvokable and StatefulTask




> Refactor AbstractInvokable and StatefulTask
> ---
>
> Key: FLINK-5982
> URL: https://issues.apache.org/jira/browse/FLINK-5982
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>
> Currently, running an invokable in {{Task}} needs to call 
> {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a 
> {{StatefulTask}}, it also needs to call {{setInitialState(state)}}. That makes 
> it difficult to do eager initialization of the invokable during the 
> {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate 
> {{invoke()}} into {{open()}} and {{invoke()}}, but that makes running it in 
> {{Task}} more complex.
> This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to 
> make it easier to construct and run an invokable.
> # Refactor abstract class to have one default constructor.
> #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} 
> are stateful.
> #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make 
> {{AbstractInvokable}} have a two argument constructor with {{Environment}} 
> and {{TaskStateHandles}}.
> # Update all subclasses
> #* Make all subclasses of {{AbstractInvokable}} have a two argument constructor 
> and call the constructor in {{AbstractInvokable}}.
> #* Throw an error in {{BatchTask}} if the initial state is not null. (This 
> will be removed once {{BatchTask}} has become stateful.)
> # Change the creation of the invokable to call that constructor, update all 
> the tests.
> Then, we can simplify the logic to run an invokable by using constructor and 
> {{invoke()}}. The eager initialization can easily be placed in the 
> constructor to fulfill the requirement such as FLINK-4714.
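
The constructor-based design described above can be sketched in Java as follows. All types here are deliberately simplified stand-ins that reuse the names from the issue for readability; they are not the real Flink classes, and their fields and methods are assumptions made for the example.

```java
import java.util.Objects;

final class InvokableSketch {

    // Simplified stand-ins for the real Environment and TaskStateHandles.
    static final class Environment {
        final String taskName;

        Environment(String taskName) {
            this.taskName = taskName;
        }
    }

    static final class TaskStateHandles { }

    // Environment and initial state are fixed at construction time, so no
    // setEnvironment(env) / setInitialState(state) calls are needed later.
    abstract static class AbstractInvokable {
        private final Environment environment;
        private final TaskStateHandles initialState; // null if there is no state

        protected AbstractInvokable(Environment environment, TaskStateHandles initialState) {
            this.environment = Objects.requireNonNull(environment, "environment");
            this.initialState = initialState;
        }

        Environment getEnvironment() {
            return environment;
        }

        TaskStateHandles getInitialState() {
            return initialState;
        }

        abstract void invoke() throws Exception;
    }

    // Batch tasks are not stateful yet, so a non-null initial state is rejected.
    static final class BatchTask extends AbstractInvokable {
        BatchTask(Environment environment, TaskStateHandles initialState) {
            super(environment, initialState);
            if (initialState != null) {
                throw new IllegalStateException("BatchTask is not stateful yet");
            }
        }

        @Override
        void invoke() {
            System.out.println("running " + getEnvironment().taskName);
        }
    }

    public static void main(String[] args) {
        // Running an invokable is now just: construct, then invoke().
        BatchTask task = new BatchTask(new Environment("map-1"), null);
        task.invoke();
    }
}
```

Because everything the invokable needs is available in the constructor, any eager initialization can run there, before `invoke()` is called.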





[jira] [Commented] (FLINK-6082) Support window definition for SQL Queries based on WHERE clause with time condition

2017-03-28 Thread radu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945151#comment-15945151
 ] 

radu commented on FLINK-6082:
-

[~fhueske] - I am fine with using the time condition to select a subset of the 
stream. For me this still maps to a window :) since a condition between the 
current time and 1 hour before is a window. Nevertheless, coming back to 
finding the right query semantics, what do you think about supporting the one 
you proposed:

SELECT data FROM stream1 WHERE proctime BETWEEN current_timestamp - INTERVAL 
'1' MINUTE AND current_timestamp

with the behavior you showed before? I am fine with that.
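
To illustrate the rolling behavior, here is a minimal sketch in Java that reproduces the Output column of the functionality example in the issue description: for every incoming event it counts the events whose time lies within the last hour. The class `TimeBoundedCountSketch` and its methods are hypothetical, and representing times as seconds of the day is a simplification made for the example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class TimeBoundedCountSketch {

    // Converts a time of day to seconds (simplification for the example).
    static int hms(int hours, int minutes, int seconds) {
        return hours * 3600 + minutes * 60 + seconds;
    }

    // For each event time t (ascending), counts the events with a time in
    // [t - windowSeconds, t], i.e. count(*) WHERE time BETWEEN t - window AND t.
    static List<Integer> rollingCounts(List<Integer> eventTimes, int windowSeconds) {
        Deque<Integer> buffered = new ArrayDeque<>();
        List<Integer> counts = new ArrayList<>();
        for (int t : eventTimes) {
            buffered.addLast(t);
            // Evict events that fell out of the time bound; the deque is never
            // empty here because t itself was just added.
            while (buffered.peekFirst() < t - windowSeconds) {
                buffered.removeFirst();
            }
            counts.add(buffered.size());
        }
        return counts;
    }

    public static void main(String[] args) {
        // Event times from the example table.
        List<Integer> times = new ArrayList<>();
        times.add(hms(10, 0, 1));
        times.add(hms(10, 2, 0));
        times.add(hms(11, 25, 0));
        times.add(hms(12, 3, 0));
        times.add(hms(12, 5, 0));
        times.add(hms(12, 56, 0));

        System.out.println(rollingCounts(times, 3600)); // prints [1, 2, 1, 2, 3, 3]
    }
}
```

The result is re-evaluated with every incoming event, which is the same triggering behavior as for the OVER windows.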

> Support window definition for SQL Queries based on WHERE clause with time 
> condition
> ---
>
> Key: FLINK-6082
> URL: https://issues.apache.org/jira/browse/FLINK-6082
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: radu
>
> Time target: Proc Time
> Calcite documentation refers to query examples where the (time)
> boundaries are defined as condition within the WHERE clause. As Flink
> community targets compatibility with Calcite, it makes sense to support
> the definition of windows via this method as well as corresponding
> aggregation on top of them.
> SQL targeted query examples:
> 
> ```SELECT productId, count(\*) FROM stream1 WHERE proctime BETWEEN 
> current\_timestamp - INTERVAL '1' HOUR AND current\_timestamp```
> General comment:
> 1)  window boundaries are defined as conditions in WHERE clause.
> 2)  For indicating the usage of different stream times, rowtime and
> proctime can be used
> 3)  The boundaries are defined based on special construct provided by
> calcite: current\_timestamp and time operations
> Description:
> 
> The logic of this operator is closely related to supporting aggregates
> over sliding windows defined with OVER
> ([FLINK-5653](https://issues.apache.org/jira/browse/FLINK-5653),
> [FLINK-5654](https://issues.apache.org/jira/browse/FLINK-5654),
> [FLINK-5655](https://issues.apache.org/jira/browse/FLINK-5655),
> [FLINK-5658](https://issues.apache.org/jira/browse/FLINK-5658),
> [FLINK-5656](https://issues.apache.org/jira/browse/FLINK-5656)). In those
> issues the design considered queries where the window is defined with the
> syntax of the OVER clause and aggregates are applied over this period. The
> behavior here is similar, with the only exception that the window
> boundaries are defined by the WHERE conditions. Besides
> this, the logic and the types of aggregates to be supported should be the
> same (sum, count, avg, min, max). Supporting these types of queries is
> related to the pie chart problem tackled by Calcite.
> As for the OVER windows, the construct should build rolling
> windows (i.e., windows that are triggered and move with every incoming
> event).
> Functionality example
> -
> We exemplify below the functionality of the time-bounded WHERE condition
> when working with streams.
> `SELECT a, count(*) FROM stream1 WHERE proctime BETWEEN current_timestamp
> - INTERVAL '1' HOUR AND current_timestamp;`
> ||IngestionTime(Event)||  Stream1||   Output||
> |10:00:01 |Id1,10 |Id1,1|
> |10:02:00 |Id2,2  |Id2,2|
> |11:25:00 |Id3,2  |Id3,1|
> |12:03:00 |Id4,15 |Id4,2|
> |12:05:00 |Id5,11 |Id5,3|
> |12:56:00 |Id6,20 |Id6,3|
> |...|
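
The Output column above can be reproduced with a small stand-alone sketch. This is not Flink code: the class and method names are made up for illustration, and it assumes that events arrive in ascending time order within a single day and that BETWEEN is inclusive on both ends.

```java
import java.time.LocalTime;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical helper, not part of Flink: counts events in a trailing time window.
final class RollingHourCount {

    /** For each arrival time t, counts the events whose time lies in [t - windowSeconds, t]. */
    static List<Integer> counts(List<String> arrivalTimes, long windowSeconds) {
        Deque<Long> inWindow = new ArrayDeque<>();
        List<Integer> out = new ArrayList<>();
        for (String time : arrivalTimes) {
            long t = LocalTime.parse(time).toSecondOfDay();
            inWindow.addLast(t);
            long lowerBound = t - windowSeconds;
            // Evict events that fell out of the window; t itself always stays.
            while (inWindow.peekFirst() < lowerBound) {
                inWindow.removeFirst();
            }
            out.add(inWindow.size());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> arrivals = Arrays.asList(
            "10:00:01", "10:02:00", "11:25:00", "12:03:00", "12:05:00", "12:56:00");
        // Prints [1, 2, 1, 2, 3, 3], matching the Output column of the table.
        System.out.println(counts(arrivals, 3600));
    }
}
```

The window is re-evaluated on every incoming event, which is the "rolling window" behavior described for the OVER case.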
> Implementation option
> -
> Considering that the query follows the same functionality as the
> aggregates over windows, the implementation should follow that of
> the OVER clause. Considering that the WHERE
> conditions are typically related to timing, this means that in case of
> one unbounded boundary the
> [FLINK-5658](https://issues.apache.org/jira/browse/FLINK-5658) design should be
> used, while for bounded time windows the
> [FLINK-5654](https://issues.apache.org/jira/browse/FLINK-5654) design
> should be used.
> The window boundaries will be extracted from the WHERE condition.
> The rule will no longer be mapped to a LogicalWindow, which means that
> the conversion would need to happen in the current
> DataStreamCalc rule. To this end, a dedicated condition will be added
> such that, when the WHERE clause has time conditions, the operator
> implementation of the OVER clause (used in the previous issues) is
> used.
> ```
> class DataStreamCalcRule
>   
> ---
>   {
>   --- 
> ---
>   
>   def convert(rel: RelNode): RelNode = {
>   val calc: LogicalCalc = rel.asInstanceOf[LogicalCalc]
>   val 

[jira] [Commented] (FLINK-6117) 'zookeeper.sasl.disable' not takes effet when starting CuratorFramework

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945144#comment-15945144
 ] 

ASF GitHub Bot commented on FLINK-6117:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3600
  
I am a bit confused here, this seems to make things more complicated.

Before, if you wanted to use ZK and Kerberos, you only added `zookeeper` to 
`security.kerberos.login.contexts`. Now you need additionally to set 
`zookeeper.sasl.disable` to `false`?

Why don't we keep `zookeeper.sasl.disable` as `false` by default? If I 
understand it correctly, it is anyways only ever relevant when the ZooKeeper 
login context has been enabled...
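
To make the suggestion concrete, here is a minimal sketch of the decision it implies: SASL is attempted only when the ZooKeeper login context is enabled, and `zookeeper.sasl.disable` defaults to `false`. The class, the helper methods, and the plain `Map` standing in for the Flink configuration are assumptions for illustration only; the context name `zookeeper` is taken from the comment above and is not checked against the actual code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Flink code.
final class ZkSaslDecision {
    static final String SASL_DISABLE_KEY = "zookeeper.sasl.disable";
    static final String LOGIN_CONTEXTS_KEY = "security.kerberos.login.contexts";

    /** Builds a config map from alternating key/value arguments. */
    static Map<String, String> conf(String... keyValues) {
        Map<String, String> m = new HashMap<>();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            m.put(keyValues[i], keyValues[i + 1]);
        }
        return m;
    }

    /** SASL is used only if the ZooKeeper login context is listed and SASL is not disabled. */
    static boolean useSasl(Map<String, String> conf) {
        // Assumed default: "false", as proposed in the comment.
        boolean disabled = Boolean.parseBoolean(conf.getOrDefault(SASL_DISABLE_KEY, "false"));
        boolean zkContextEnabled =
            Arrays.stream(conf.getOrDefault(LOGIN_CONTEXTS_KEY, "").split(","))
                .map(String::trim)
                .anyMatch("zookeeper"::equalsIgnoreCase);
        return zkContextEnabled && !disabled;
    }

    public static void main(String[] args) {
        System.out.println(useSasl(conf()));                                 // no context configured
        System.out.println(useSasl(conf(LOGIN_CONTEXTS_KEY, "zookeeper")));  // context only
    }
}
```

With this shape, a non-secured setup that sets neither key never attempts SASL, and a Kerberos setup needs only the login context entry.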


> 'zookeeper.sasl.disable'  not takes effet when starting CuratorFramework
> 
>
> Key: FLINK-6117
> URL: https://issues.apache.org/jira/browse/FLINK-6117
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client, JobManager
>Affects Versions: 1.2.0
> Environment: Ubuntu, non-secured
>Reporter: CanBin Zheng
>Assignee: CanBin Zheng
>  Labels: security
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> The value of 'zookeeper.sasl.disable' is not used correctly when starting
> CuratorFramework.
> Here are all the settings relevant to high-availability in my flink-conf.yaml:
>   high-availability: zookeeper
>   high-availability.zookeeper.quorum: localhost:2181
>   high-availability.zookeeper.storageDir: hdfs:///flink/ha/
> Obviously, no explicit value is set for 'zookeeper.sasl.disable', so the default
> value of 'true' (ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE) would be
> applied. But when FlinkYarnSessionCli & FlinkApplicationMasterRunner start,
> both logs show that they attempt to connect to ZooKeeper in 'SASL' mode.
> The logs look like this:
> 2017-03-18 23:53:10,498 INFO  org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2181 sessionTimeout=6 watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@5949eba8
> 2017-03-18 23:53:10,498 INFO  org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2181 sessionTimeout=6 watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@5949eba8
> 2017-03-18 23:53:10,522 WARN  org.apache.zookeeper.ClientCnxn - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/jaas-3047036396963510842.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.
> 2017-03-18 23:53:10,522 WARN  org.apache.zookeeper.ClientCnxn - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/jaas-3047036396963510842.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.
> 2017-03-18 23:53:10,530 INFO  org.apache.zookeeper.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2181
> 2017-03-18 23:53:10,530 INFO  org.apache.zookeeper.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2181
> 2017-03-18 23:53:10,534 ERROR org.apache.flink.shaded.org.apache.curator.ConnectionState - Authentication failed



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3600: [FLINK-6117]Make setting of 'zookeeper.sasl.disable' work...

2017-03-28 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3600
  
I am a bit confused here, this seems to make things more complicated.

Before, if you wanted to use ZK and Kerberos, you only added `zookeeper` to 
`security.kerberos.login.contexts`. Now you need additionally to set 
`zookeeper.sasl.disable` to `false`?

Why don't we keep `zookeeper.sasl.disable` as `false` by default? If I 
understand it correctly, it is anyways only ever relevant when the ZooKeeper 
login context has been enabled...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6082) Support window definition for SQL Queries based on WHERE clause with time condition

2017-03-28 Thread radu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945136#comment-15945136
 ] 

radu commented on FLINK-6082:
-

I am also adding [~Yuhong_kyo], as she works on the join design.

> Support window definition for SQL Queries based on WHERE clause with time 
> condition
> ---
>
> Key: FLINK-6082
> URL: https://issues.apache.org/jira/browse/FLINK-6082
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: radu
>
> Time target: Proc Time
> The Calcite documentation gives query examples in which the (time)
> boundaries are defined as conditions within the WHERE clause. As the Flink
> community targets compatibility with Calcite, it makes sense to support
> the definition of windows via this method, as well as the corresponding
> aggregations on top of them.
> Targeted SQL query example:
> 
> ```SELECT productId, count(*) FROM stream1 WHERE proctime BETWEEN
> current_timestamp - INTERVAL '1' HOUR AND current_timestamp```
> General comments:
> 1)  Window boundaries are defined as conditions in the WHERE clause.
> 2)  To indicate which stream time is used, rowtime and
> proctime can be used.
> 3)  The boundaries are defined based on special constructs provided by
> Calcite: current_timestamp and time operations.
> Description:
> 
> The logic of this operator is closely related to supporting aggregates
> over sliding windows defined with OVER
> ([FLINK-5653](https://issues.apache.org/jira/browse/FLINK-5653),
> [FLINK-5654](https://issues.apache.org/jira/browse/FLINK-5654),
> [FLINK-5655](https://issues.apache.org/jira/browse/FLINK-5655),
> [FLINK-5658](https://issues.apache.org/jira/browse/FLINK-5658),
> [FLINK-5656](https://issues.apache.org/jira/browse/FLINK-5656)). In those
> issues the design considered queries where the window is defined with the
> syntax of the OVER clause and aggregates are applied over this period. The
> behavior here is similar, with the only exception that the window
> boundaries are defined by the WHERE conditions. Besides
> this, the logic and the types of aggregates to be supported should be the
> same (sum, count, avg, min, max). Supporting these types of queries is
> related to the pie chart problem tackled by Calcite.
> As for the OVER windows, the construct should build rolling
> windows (i.e., windows that are triggered and move with every incoming
> event).
> Functionality example
> -
> We exemplify below the functionality of the time-bounded WHERE condition
> when working with streams.
> `SELECT a, count(*) FROM stream1 WHERE proctime BETWEEN current_timestamp
> - INTERVAL '1' HOUR AND current_timestamp;`
> ||IngestionTime(Event)||  Stream1||   Output||
> |10:00:01 |Id1,10 |Id1,1|
> |10:02:00 |Id2,2  |Id2,2|
> |11:25:00 |Id3,2  |Id3,1|
> |12:03:00 |Id4,15 |Id4,2|
> |12:05:00 |Id5,11 |Id5,3|
> |12:56:00 |Id6,20 |Id6,3|
> |...|
> Implementation option
> -
> Considering that the query follows the same functionality as the
> aggregates over windows, the implementation should follow that of
> the OVER clause. Considering that the WHERE
> conditions are typically related to timing, this means that in case of
> one unbounded boundary the
> [FLINK-5658](https://issues.apache.org/jira/browse/FLINK-5658) design should be
> used, while for bounded time windows the
> [FLINK-5654](https://issues.apache.org/jira/browse/FLINK-5654) design
> should be used.
> The window boundaries will be extracted from the WHERE condition.
> The rule will no longer be mapped to a LogicalWindow, which means that
> the conversion would need to happen in the current
> DataStreamCalc rule. To this end, a dedicated condition will be added
> such that, when the WHERE clause has time conditions, the operator
> implementation of the OVER clause (used in the previous issues) is
> used.
> ```
> class DataStreamCalcRule
>   
> ---
>   {
>   --- 
> ---
>   
>   def convert(rel: RelNode): RelNode = {
>     val calc: LogicalCalc = rel.asInstanceOf[LogicalCalc]
>     val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
>     val convInput: RelNode = RelOptRule.convert(calc.getInput, DataStreamConvention.INSTANCE)
>
>     IF (WHERE contains TIME limits) {
>       IF (bounded)
>         new DataStreamProcTimeTimeAggregate
>       ELSE
>         new DataStreamSlideEventTimeRowAgg
>     }
>   

[jira] [Commented] (FLINK-5903) taskmanager.numberOfTaskSlots and yarn.containers.vcores did not work well in YARN mode

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945134#comment-15945134
 ] 

ASF GitHub Bot commented on FLINK-5903:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/3408#discussion_r108417249
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---
@@ -164,6 +164,12 @@ public AbstractYarnClusterDescriptor() {
                throw new RuntimeException("Unable to locate configuration file in " + confFile);
            }
            flinkConfigurationPath = new Path(confFile.getAbsolutePath());
+
+           if (flinkConfiguration.containsKey(ConfigConstants.YARN_VCORES)) {
+               slots = flinkConfiguration.getInteger(ConfigConstants.YARN_VCORES, -1);
+           } else if (flinkConfiguration.containsKey(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS)) {
+               slots = flinkConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, -1);
+           }
--- End diff --

I think we should make the behaviour consistent with Mesos and standalone, where we set
`slots = flinkConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)`.
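
For comparison, the two ways of resolving the slot count can be sketched side by side. This is only an illustration: a plain `Map` stands in for the Flink configuration, the class and method names are hypothetical, and the `current` parameter models whatever value `slots` held before the diff's `if` block, which the excerpt does not show.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Flink code.
final class SlotResolution {
    static final String NUM_TASK_SLOTS_KEY = "taskmanager.numberOfTaskSlots";
    static final String YARN_VCORES_KEY = "yarn.containers.vcores";

    /** Builds a config map from alternating key/value arguments. */
    static Map<String, String> conf(String... keyValues) {
        Map<String, String> m = new HashMap<>();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            m.put(keyValues[i], keyValues[i + 1]);
        }
        return m;
    }

    static int getInteger(Map<String, String> conf, String key, int defaultValue) {
        String value = conf.get(key);
        return value == null ? defaultValue : Integer.parseInt(value.trim());
    }

    /** Logic of the diff under review: vcores take precedence over the slot setting. */
    static int slotsAsInDiff(Map<String, String> conf, int current) {
        if (conf.containsKey(YARN_VCORES_KEY)) {
            return getInteger(conf, YARN_VCORES_KEY, -1);
        } else if (conf.containsKey(NUM_TASK_SLOTS_KEY)) {
            return getInteger(conf, NUM_TASK_SLOTS_KEY, -1);
        }
        return current;
    }

    /** Suggested logic: read only the slot setting, defaulting to 1. */
    static int slotsAsSuggested(Map<String, String> conf) {
        return getInteger(conf, NUM_TASK_SLOTS_KEY, 1);
    }

    public static void main(String[] args) {
        Map<String, String> both = conf(YARN_VCORES_KEY, "5", NUM_TASK_SLOTS_KEY, "6");
        System.out.println(slotsAsInDiff(both, -1));   // the vcores value wins
        System.out.println(slotsAsSuggested(both));    // the slot setting wins
    }
}
```

The difference shows up when both keys are set: the diff would derive the slot count from the vcores value, while the suggested form keeps the two settings independent.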


> taskmanager.numberOfTaskSlots and yarn.containers.vcores did not work well in 
> YARN mode
> ---
>
> Key: FLINK-5903
> URL: https://issues.apache.org/jira/browse/FLINK-5903
> Project: Flink
>  Issue Type: Sub-task
>  Components: YARN
>Reporter: Tao Wang
>Assignee: Tao Wang
> Attachments: set taskmanager.numberOfTaskSlots to 6.JPG, set 
> yarn.container.vcores to 5_JM.JPG, set yarn.container.vcores to 5_RM.JPG
>
>
> Currently, Flink does not respect taskmanager.numberOfTaskSlots and
> yarn.containers.vcores in flink-conf.yaml, but only the -s parameter in the CLI.
> In detail, taskmanager.numberOfTaskSlots does not take effect at all, and
> yarn.containers.vcores is only used when requesting container (TM) resources
> but is not passed on to the TM, which means the TM will always think it has the
> default (1) slot if -s is not configured.





[jira] [Commented] (FLINK-5903) taskmanager.numberOfTaskSlots and yarn.containers.vcores did not work well in YARN mode

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945133#comment-15945133
 ] 

ASF GitHub Bot commented on FLINK-5903:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/3408#discussion_r108417410
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java ---
@@ -335,8 +335,7 @@ protected void requestNewWorkers(int numWorkers) {
        Priority priority = Priority.newInstance(0);

        // Resource requirements for worker containers
-       int taskManagerSlots = taskManagerParameters.numSlots();
-       int vcores = config.getInteger(ConfigConstants.YARN_VCORES, Math.max(taskManagerSlots, 1));
+       int vcores = Math.max(taskManagerParameters.numSlots(), 1);
--- End diff --

I think we should still keep the possibility open to configure the `vcores` 
explicitly.
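
A minimal sketch of what keeping the explicit setting means, modelled on the removed lines of the diff: an explicit `yarn.containers.vcores` value wins, and otherwise the number of slots (at least 1) is used. The class name and the `Map`-based configuration are assumptions for illustration, not the actual Flink API.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical sketch, not Flink code.
final class VcoresResolution {
    static final String YARN_VCORES_KEY = "yarn.containers.vcores";

    /** Explicit vcores setting if present, else max(taskManagerSlots, 1). */
    static int vcores(Map<String, String> conf, int taskManagerSlots) {
        int fallback = Math.max(taskManagerSlots, 1);
        String value = conf.get(YARN_VCORES_KEY);
        return value == null ? fallback : Integer.parseInt(value.trim());
    }

    public static void main(String[] args) {
        Map<String, String> explicit = Collections.singletonMap(YARN_VCORES_KEY, "5");
        System.out.println(vcores(explicit, 2));                                 // explicit value is used
        System.out.println(vcores(Collections.<String, String>emptyMap(), 4));   // falls back to the slot count
    }
}
```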


> taskmanager.numberOfTaskSlots and yarn.containers.vcores did not work well in 
> YARN mode
> ---
>
> Key: FLINK-5903
> URL: https://issues.apache.org/jira/browse/FLINK-5903
> Project: Flink
>  Issue Type: Sub-task
>  Components: YARN
>Reporter: Tao Wang
>Assignee: Tao Wang
> Attachments: set taskmanager.numberOfTaskSlots to 6.JPG, set 
> yarn.container.vcores to 5_JM.JPG, set yarn.container.vcores to 5_RM.JPG
>
>
> Currently, Flink does not respect taskmanager.numberOfTaskSlots and
> yarn.containers.vcores in flink-conf.yaml, but only the -s parameter in the CLI.
> In detail, taskmanager.numberOfTaskSlots does not take effect at all, and
> yarn.containers.vcores is only used when requesting container (TM) resources
> but is not passed on to the TM, which means the TM will always think it has the
> default (1) slot if -s is not configured.





[GitHub] flink pull request #3408: [FLINK-5903][YARN]respect taskmanager.numberOfTask...

2017-03-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/3408#discussion_r108417410
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java ---
@@ -335,8 +335,7 @@ protected void requestNewWorkers(int numWorkers) {
        Priority priority = Priority.newInstance(0);

        // Resource requirements for worker containers
-       int taskManagerSlots = taskManagerParameters.numSlots();
-       int vcores = config.getInteger(ConfigConstants.YARN_VCORES, Math.max(taskManagerSlots, 1));
+       int vcores = Math.max(taskManagerParameters.numSlots(), 1);
--- End diff --

I think we should still keep the possibility open to configure the `vcores` 
explicitly.




[GitHub] flink pull request #3408: [FLINK-5903][YARN]respect taskmanager.numberOfTask...

2017-03-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/3408#discussion_r108417249
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---
@@ -164,6 +164,12 @@ public AbstractYarnClusterDescriptor() {
                throw new RuntimeException("Unable to locate configuration file in " + confFile);
            }
            flinkConfigurationPath = new Path(confFile.getAbsolutePath());
+
+           if (flinkConfiguration.containsKey(ConfigConstants.YARN_VCORES)) {
+               slots = flinkConfiguration.getInteger(ConfigConstants.YARN_VCORES, -1);
+           } else if (flinkConfiguration.containsKey(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS)) {
+               slots = flinkConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, -1);
+           }
--- End diff --

I think we should make the behaviour consistent with Mesos and standalone, where we set
`slots = flinkConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)`.



