[jira] [Updated] (FLINK-5861) TaskManager's components support updating JobManagerConnection

2017-02-20 Thread Biao Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-5861:

Description: Some components in the TaskManager, such as TaskManagerActions, 
CheckpointResponder, ResultPartitionConsumableNotifier, and 
PartitionProducerStateChecker, need to support updating their JobManagerConnection. 
When the JobManager fails and recovers, tasks that still hold the old 
JobManagerConnection can then be notified to switch to the new one, so they 
can continue running without failing.
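
The idea can be sketched in isolation as follows. This is a minimal illustration and not the Flink implementation: `Connection` and `UpdatableResponder` are hypothetical stand-ins, and the only point is that the component looks up its connection on every call instead of capturing it once at construction.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a connection to the current JobManager leader.
final class Connection {
    final String leaderAddress;

    Connection(String leaderAddress) {
        this.leaderAddress = leaderAddress;
    }
}

// Hypothetical component (in the spirit of CheckpointResponder) whose
// connection can be swapped while tasks keep using the same instance.
final class UpdatableResponder {
    private final AtomicReference<Connection> connection;

    UpdatableResponder(Connection initial) {
        this.connection = new AtomicReference<>(initial);
    }

    // Called after a recovered JobManager has been reconnected.
    void updateConnection(Connection newConnection) {
        connection.set(newConnection);
    }

    // Returns a string describing where the acknowledgement would be sent.
    String acknowledgeCheckpoint(long checkpointId) {
        return connection.get().leaderAddress + "#ack-" + checkpointId;
    }
}
```

A task that holds an `UpdatableResponder` does not need to be restarted after a failover; whoever manages the connection calls `updateConnection` once.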

> TaskManager's components support updating JobManagerConnection
> --
>
> Key: FLINK-5861
> URL: https://issues.apache.org/jira/browse/FLINK-5861
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, JobManager
>Reporter: Biao Liu
>Assignee: Biao Liu
>
> Some components in the TaskManager, such as TaskManagerActions, 
> CheckpointResponder, ResultPartitionConsumableNotifier, and 
> PartitionProducerStateChecker, need to support updating their JobManagerConnection. 
> When the JobManager fails and recovers, tasks that still hold the old 
> JobManagerConnection can then be notified to switch to the new one, so they 
> can continue running without failing.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5861) TaskManager's components support updating JobManagerConnection

2017-02-20 Thread Biao Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-5861:

Summary: TaskManager's components support updating JobManagerConnection  
(was: TaskManager's components support updating JobManagerGateway)

> TaskManager's components support updating JobManagerConnection
> --
>
> Key: FLINK-5861
> URL: https://issues.apache.org/jira/browse/FLINK-5861
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, JobManager
>Reporter: Biao Liu
>Assignee: Biao Liu
>






[jira] [Created] (FLINK-5861) TaskManager's components support updating JobManagerGateway

2017-02-20 Thread Biao Liu (JIRA)
Biao Liu created FLINK-5861:
---

 Summary: TaskManager's components support updating 
JobManagerGateway
 Key: FLINK-5861
 URL: https://issues.apache.org/jira/browse/FLINK-5861
 Project: Flink
  Issue Type: Sub-task
Reporter: Biao Liu
Assignee: Biao Liu








[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-02-20 Thread Yuhong Hong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875533#comment-15875533
 ] 

Yuhong Hong commented on FLINK-5658:


Hi [~fhueske] & [~wheat9], I saw that Haohui has already committed the rowtime() 
feature, but it does not yet cover the LogicalWindowRelNode case. To 
distinguish rowtime() from proctime(), I would like to do the following:
LogicalProject(with RexOver expr) -> [normalize rule(ProjectToWindow)] 
-> CalciteLogicalWindow(input = LogicalProject(with rowtime() func)) -> 
[normalize rule(FlinkLogicalWindowRule)] -> FlinkLogicalWindow(input = 
LogicalProject, isEventtime=true/false, window=CalciteLogicalWindow) 

After the normalize phase, the rowtime() function will have been replaced by 
generated code according to ReduceExpressionsRule, so we can only check whether 
the query uses event time or processing time before ReduceExpressionsRule is 
applied in the normalize phase. 
So I want to add a FlinkLogicalWindow that wraps the CalciteLogicalWindow 
and carries an isEventtime attribute, plus an additional rule (FlinkLogicalWindowRule) 
to do the transformation.

In FlinkLogicalWindowRule, I will check whether the function operator 
is EventTimeExtractor or ProcTimeExtractor.

What do you think? If there is already a solution, please let me know.

> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates event time mode.
> - bounded PRECEDING is not supported (see FLINK-5655)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).
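
To make the intended result of the example query concrete, here is a small batch-style model of `RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`. It is only a sketch of the semantics, not a design for the DataStream operator: the `Row` and `Result` classes are hypothetical, the input is assumed to be already sorted by time, and watermarks and late data are ignored. Note that with a RANGE frame, rows in the same partition that share a timestamp are peers and see the same aggregate values.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: models the query result, not a streaming operator.
final class OverRangeSketch {

    // Hypothetical input row with the columns used in the example query.
    static final class Row {
        final String a;
        final long b;
        final String c;
        final long rowTime;

        Row(String a, long b, String c, long rowTime) {
            this.a = a;
            this.b = b;
            this.c = c;
            this.rowTime = rowTime;
        }
    }

    // One output row: a, sumB, minB.
    static final class Result {
        final String a;
        final long sumB;
        final long minB;

        Result(String a, long sumB, long minB) {
            this.a = a;
            this.sumB = sumB;
            this.minB = minB;
        }
    }

    // Input must be sorted by rowTime in ascending order.
    static List<Result> aggregate(List<Row> rowsSortedByTime) {
        // Per partition key c: {running sum of b, running min of b}.
        Map<String, long[]> accumulators = new HashMap<>();
        List<Result> out = new ArrayList<>();

        int start = 0;
        while (start < rowsSortedByTime.size()) {
            // Find the end of the group of rows sharing this timestamp.
            long time = rowsSortedByTime.get(start).rowTime;
            int end = start;
            while (end < rowsSortedByTime.size() && rowsSortedByTime.get(end).rowTime == time) {
                end++;
            }
            // Fold all rows of this timestamp into the accumulators first...
            for (int i = start; i < end; i++) {
                Row row = rowsSortedByTime.get(i);
                long[] acc = accumulators.computeIfAbsent(row.c, k -> new long[] {0L, Long.MAX_VALUE});
                acc[0] += row.b;
                acc[1] = Math.min(acc[1], row.b);
            }
            // ...then emit, so that peers see identical aggregates.
            for (int i = start; i < end; i++) {
                Row row = rowsSortedByTime.get(i);
                long[] acc = accumulators.get(row.c);
                out.add(new Result(row.a, acc[0], acc[1]));
            }
            start = end;
        }
        return out;
    }
}
```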





[jira] [Commented] (FLINK-5692) Add an Option to Deactivate Kryo Fallback for Serializers

2017-02-20 Thread Jin Mingjian (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875518#comment-15875518
 ] 

Jin Mingjian commented on FLINK-5692:
-

[~StephanEwen], I used "forceCustomSerializerCheck" to follow the existing option 
naming style. Let me know if you think another name would be better.

> Add an Option to Deactivate Kryo Fallback for Serializers
> -
>
> Key: FLINK-5692
> URL: https://issues.apache.org/jira/browse/FLINK-5692
> Project: Flink
>  Issue Type: New Feature
>  Components: Type Serialization System
>Affects Versions: 1.2.0
>Reporter: Stephan Ewen
>Assignee: Jin Mingjian
>  Labels: easyfix, starter
>
> Some users want to prevent Flink's serializers from using Kryo, as it can easily 
> become a hotspot in serialization.
> For those users, it would help if there were a flag to "deactivate generic 
> types". Those users could then see where types are used that default to Kryo 
> and change these types (make them POJOs, Value types, or write custom 
> serializers).
> There are two ways to approach that:
>   1. (Simple) Make {{GenericTypeInfo}} throw an exception whenever it would 
> create a Kryo Serializer (when the respective flag is set in the 
> {{ExecutionConfig}})
>   2. Have a static flag on the {{TypeExtractor}} to throw an exception 
> whenever it would create a {{GenericTypeInfo}}. This approach has the 
> downside of introducing some static configuration to the TypeExtractor, but 
> may be more helpful because it throws exceptions in the programs at points 
> where the types are used (not where the serializers are created, which may be 
> much later).
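
The behaviour described above can be sketched without any Flink classes. Everything in this example is hypothetical: `GenericTypeGuard` and its flag only model the idea of failing fast instead of silently falling back to a generic serializer, and the returned strings stand in for real serializer instances.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of "deactivate generic types"; not a Flink API.
final class GenericTypeGuard {
    // Types for which a dedicated (non-generic) serializer is assumed to exist.
    private final Set<Class<?>> knownTypes;
    private final boolean genericTypesDisabled;

    GenericTypeGuard(boolean genericTypesDisabled, Class<?>... knownTypes) {
        this.genericTypesDisabled = genericTypesDisabled;
        this.knownTypes = new HashSet<>(Arrays.asList(knownTypes));
    }

    // Returns the kind of serializer that would be chosen for the type.
    String serializerFor(Class<?> type) {
        if (knownTypes.contains(type)) {
            return "specialized";
        }
        if (genericTypesDisabled) {
            // Fail at the point where the fallback would otherwise happen.
            throw new UnsupportedOperationException(
                "Generic types are disabled, but " + type.getName()
                    + " would fall back to the generic serializer.");
        }
        return "generic-fallback";
    }
}
```

Which of the two approaches from the issue applies depends only on where such a check is called from: at serializer creation (option 1) or at type extraction (option 2).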





[GitHub] flink pull request #3373: [FLINK-5692] [config] Add an Option to Deactivate ...

2017-02-20 Thread jinmingjian
GitHub user jinmingjian opened a pull request:

https://github.com/apache/flink/pull/3373

[FLINK-5692] [config] Add an Option to Deactivate Kryo Fallback for 
Serializers

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jinmingjian/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3373.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3373


commit 1ff46e53efa2094ac6881b1ca014bf7752277ff2
Author: Jin Mingjian 
Date:   2017-02-21T03:57:21Z

[FLINK-5692] [config] Add an Option to Deactivate Kryo Fallback for 
Serializers




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5692) Add an Option to Deactivate Kryo Fallback for Serializers

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875513#comment-15875513
 ] 

ASF GitHub Bot commented on FLINK-5692:
---

GitHub user jinmingjian opened a pull request:

https://github.com/apache/flink/pull/3373

[FLINK-5692] [config] Add an Option to Deactivate Kryo Fallback for 
Serializers

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jinmingjian/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3373.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3373


commit 1ff46e53efa2094ac6881b1ca014bf7752277ff2
Author: Jin Mingjian 
Date:   2017-02-21T03:57:21Z

[FLINK-5692] [config] Add an Option to Deactivate Kryo Fallback for 
Serializers




> Add an Option to Deactivate Kryo Fallback for Serializers
> -
>
> Key: FLINK-5692
> URL: https://issues.apache.org/jira/browse/FLINK-5692
> Project: Flink
>  Issue Type: New Feature
>  Components: Type Serialization System
>Affects Versions: 1.2.0
>Reporter: Stephan Ewen
>Assignee: Jin Mingjian
>  Labels: easyfix, starter
>
> Some users want to prevent Flink's serializers from using Kryo, as it can easily 
> become a hotspot in serialization.
> For those users, it would help if there were a flag to "deactivate generic 
> types". Those users could then see where types are used that default to Kryo 
> and change these types (make them POJOs, Value types, or write custom 
> serializers).
> There are two ways to approach that:
>   1. (Simple) Make {{GenericTypeInfo}} throw an exception whenever it would 
> create a Kryo Serializer (when the respective flag is set in the 
> {{ExecutionConfig}})
>   2. Have a static flag on the {{TypeExtractor}} to throw an exception 
> whenever it would create a {{GenericTypeInfo}}. This approach has the 
> downside of introducing some static configuration to the TypeExtractor, but 
> may be more helpful because it throws exceptions in the programs at points 
> where the types are used (not where the serializers are created, which may be 
> much later).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5524) Support early out for code generated conjunctive conditions

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875459#comment-15875459
 ] 

ASF GitHub Bot commented on FLINK-5524:
---

GitHub user KurtYoung opened a pull request:

https://github.com/apache/flink/pull/3372

[FLINK-5524] [table] Support early out for code generated AND/OR condition

For a condition like a AND b, if a evaluates to false, b does not need to be 
evaluated.

For a condition like a OR b, if a evaluates to true, b does not need to be 
evaluated either.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/KurtYoung/flink flink-5524

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3372.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3372


commit 4f8c07ffc6ac99da67803114edb732e20df793e6
Author: Kurt Young 
Date:   2017-02-21T06:35:17Z

[FLINK-5524] [table] Support early out for code generated AND/OR conditions




> Support early out for code generated conjunctive conditions
> ---
>
> Key: FLINK-5524
> URL: https://issues.apache.org/jira/browse/FLINK-5524
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0, 1.1.4, 1.3.0
>Reporter: Fabian Hueske
>Assignee: Kurt Young
>
> Currently, all nested conditions for a conjunctive predicate are evaluated 
> before the conjunction is checked.
> A condition like {{(v1 == v2) && (v3 < 5)}} would be compiled into
> {code}
> boolean res1;
> if (v1 == v2) {
>   res1 = true;
> } else {
>   res1 = false;
> }
> boolean res2;
> if (v3 < 5) {
>   res2 = true;
> } else {
>   res2 = false;
> }
> boolean res3;
> if (res1 && res2) {
>   res3 = true;
> } else {
>   res3 = false;
> }
> if (res3) {
>   // emit something
> }
> {code}
> It would be better to leave the generated code as early as possible, e.g., 
> with a {{return}} instead of {{res1 = false}}. The code generator needs a bit 
> of context information for that.
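
For comparison, the difference between the two styles can be shown with hand-written Java. This is only a sketch of the idea and not the output of Flink's code generator; the `cond` helper and its counter are hypothetical and exist only to make the number of evaluated sub-conditions observable.

```java
// Hand-written illustration; not generated code.
final class EarlyOutSketch {
    // Counts how many sub-conditions were actually evaluated.
    static int evaluations = 0;

    static boolean cond(boolean value) {
        evaluations++;
        return value;
    }

    // Mirrors the current style: every sub-condition is evaluated first.
    static boolean eager(int v1, int v2, int v3) {
        boolean res1 = cond(v1 == v2);
        boolean res2 = cond(v3 < 5);
        return res1 && res2;
    }

    // Early-out style: leave as soon as the conjunction is known to be false.
    static boolean earlyOut(int v1, int v2, int v3) {
        if (!cond(v1 == v2)) {
            return false;
        }
        return cond(v3 < 5);
    }
}
```

When the first condition is false, `eager` still evaluates both sub-conditions while `earlyOut` evaluates only one; the saving grows with the cost of the skipped expression.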



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3372: [FLINK-5524] [table] Support early out for code ge...

2017-02-20 Thread KurtYoung
GitHub user KurtYoung opened a pull request:

https://github.com/apache/flink/pull/3372

[FLINK-5524] [table] Support early out for code generated AND/OR condition

For a condition like a AND b, if a evaluates to false, b does not need to be 
evaluated.

For a condition like a OR b, if a evaluates to true, b does not need to be 
evaluated either.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/KurtYoung/flink flink-5524

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3372.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3372


commit 4f8c07ffc6ac99da67803114edb732e20df793e6
Author: Kurt Young 
Date:   2017-02-21T06:35:17Z

[FLINK-5524] [table] Support early out for code generated AND/OR conditions






[jira] [Commented] (FLINK-5830) OutOfMemoryError during notify final state in TaskExecutor may cause job stuck

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875444#comment-15875444
 ] 

ASF GitHub Bot commented on FLINK-5830:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/3360
  
@StephanEwen, I have already submitted the modifications.


> OutOfMemoryError during notify final state in TaskExecutor may cause job stuck
> --
>
> Key: FLINK-5830
> URL: https://issues.apache.org/jira/browse/FLINK-5830
> Project: Flink
>  Issue Type: Bug
>Reporter: zhijiang
>Assignee: zhijiang
>
> The scenario is like this:
> {{JobMaster}} tries to cancel all the executions when processing a failed 
> execution, and the task executor has already acknowledged the cancel RPC message.
> When the final state is notified in {{TaskExecutor}}, an OOM occurs in 
> {{AkkaRpcActor}}; the error is caught and only logged, so the final state 
> is never sent.
> The {{JobMaster}} therefore never receives the final state and cannot trigger 
> the restart strategy.
> One solution is to catch the {{OutOfMemoryError}} and rethrow it, which 
> shuts down the {{ActorSystem}} and causes the {{TaskExecutor}} to exit. 
> The {{JobMaster}} is then notified of the {{TaskExecutor}} 
> failure and fails all the tasks, so that the restart is triggered successfully.
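
The difference between swallowing and rethrowing the error can be sketched in plain Java. This is a simplified model under stated assumptions: `FatalErrorSketch` and its methods are hypothetical, the returned strings stand in for log output, and no Akka or Flink classes are involved.

```java
// Simplified model of the two handling strategies; not the AkkaRpcActor code.
final class FatalErrorSketch {

    // Problematic pattern: every Throwable, including OutOfMemoryError,
    // is caught and only logged, so the caller never learns about it.
    static String handleSwallowing(Runnable action) {
        try {
            action.run();
            return "ok";
        } catch (Throwable t) {
            return "logged: " + t.getClass().getSimpleName();
        }
    }

    // Proposed pattern: OutOfMemoryError is rethrown so that it can
    // terminate the process; other failures are still only logged.
    static String handleRethrowing(Runnable action) {
        try {
            action.run();
            return "ok";
        } catch (OutOfMemoryError oom) {
            throw oom;
        } catch (Throwable t) {
            return "logged: " + t.getClass().getSimpleName();
        }
    }
}
```

With the second variant the error propagates to whatever supervises the handler, which is the precondition for shutting the process down and letting the JobMaster observe the failure.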



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3360: [FLINK-5830][Distributed Coordination] Handle OutOfMemory...

2017-02-20 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/3360
  
@StephanEwen, I have already submitted the modifications.




[jira] [Assigned] (FLINK-5524) Support early out for code generated conjunctive conditions

2017-02-20 Thread Kurt Young (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young reassigned FLINK-5524:
-

Assignee: Kurt Young

> Support early out for code generated conjunctive conditions
> ---
>
> Key: FLINK-5524
> URL: https://issues.apache.org/jira/browse/FLINK-5524
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0, 1.1.4, 1.3.0
>Reporter: Fabian Hueske
>Assignee: Kurt Young
>
> Currently, all nested conditions for a conjunctive predicate are evaluated 
> before the conjunction is checked.
> A condition like {{(v1 == v2) && (v3 < 5)}} would be compiled into
> {code}
> boolean res1;
> if (v1 == v2) {
>   res1 = true;
> } else {
>   res1 = false;
> }
> boolean res2;
> if (v3 < 5) {
>   res2 = true;
> } else {
>   res2 = false;
> }
> boolean res3;
> if (res1 && res2) {
>   res3 = true;
> } else {
>   res3 = false;
> }
> if (res3) {
>   // emit something
> }
> {code}
> It would be better to leave the generated code as early as possible, e.g., 
> with a {{return}} instead of {{res1 = false}}. The code generator needs a bit 
> of context information for that.





[jira] [Commented] (FLINK-4856) Add MapState for keyed streams

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875426#comment-15875426
 ] 

ASF GitHub Bot commented on FLINK-4856:
---

Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3336#discussion_r102138099
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
 ---
@@ -484,6 +487,71 @@ public static Throwable deserializeServerFailure(ByteBuf buf) throws IOException
             return null;
         }
     }
+
+    /**
+     * Serializes all values of the Iterable with the given serializer.
+     *
+     * @param entries         Key-value pairs to serialize
+     * @param keySerializer   Serializer for UK
+     * @param valueSerializer Serializer for UV
+     * @param <UK>            Type of the keys
+     * @param <UV>            Type of the values
+     * @return Serialized values or null if values null or empty
+     * @throws IOException On failure during serialization
+     */
+    public static <UK, UV> byte[] serializeMap(Iterable<Map.Entry<UK, UV>> entries, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) throws IOException {
+        if (entries != null) {
+            Iterator<Map.Entry<UK, UV>> it = entries.iterator();
+
+            if (it.hasNext()) {
+                // Serialize
+                DataOutputSerializer dos = new DataOutputSerializer(32);
+
+                while (it.hasNext()) {
+                    Map.Entry<UK, UV> entry = it.next();
+
+                    keySerializer.serialize(entry.getKey(), dos);
+                    valueSerializer.serialize(entry.getValue(), dos);
+                }
+
+                return dos.getCopyOfBuffer();
+            } else {
+                return null;
--- End diff --

The function is unused now. I will delete it in the update.


> Add MapState for keyed streams
> --
>
> Key: FLINK-4856
> URL: https://issues.apache.org/jira/browse/FLINK-4856
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Many states in keyed streams are organized as key-value pairs. Currently, 
> these states are implemented by storing the entire map into a ValueState or a 
> ListState. The implementation however is very costly because all entries have 
> to be serialized/deserialized when updating a single entry. To improve the 
> efficiency of these states, MapStates are urgently needed. 
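
The cost argument can be illustrated with a rough byte count. This sketch uses plain `java.io` streams rather than Flink's serializers, and `MapStateCostSketch` is a hypothetical helper: it only compares how many bytes are written when the whole map is re-serialized for an update versus when a single entry is written.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;

// Rough cost model only; real state backends add their own overhead.
final class MapStateCostSketch {

    // Map stored inside a single value: every update rewrites all entries.
    static int bytesForWholeMap(Map<Long, Long> map) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (Map.Entry<Long, Long> entry : map.entrySet()) {
                out.writeLong(entry.getKey());
                out.writeLong(entry.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Map-aware state: an update only needs to write the affected entry.
    static int bytesForSingleEntry(long key, long value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(key);
            out.writeLong(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }
}
```

For a map with n entries of two longs each, the first method writes 16 * n bytes per update while the second always writes 16, which is the gap a dedicated MapState is meant to close.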



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState

2017-02-20 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3336#discussion_r102138099
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
 ---
@@ -484,6 +487,71 @@ public static Throwable deserializeServerFailure(ByteBuf buf) throws IOException
             return null;
         }
     }
+
+    /**
+     * Serializes all values of the Iterable with the given serializer.
+     *
+     * @param entries         Key-value pairs to serialize
+     * @param keySerializer   Serializer for UK
+     * @param valueSerializer Serializer for UV
+     * @param <UK>            Type of the keys
+     * @param <UV>            Type of the values
+     * @return Serialized values or null if values null or empty
+     * @throws IOException On failure during serialization
+     */
+    public static <UK, UV> byte[] serializeMap(Iterable<Map.Entry<UK, UV>> entries, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) throws IOException {
+        if (entries != null) {
+            Iterator<Map.Entry<UK, UV>> it = entries.iterator();
+
+            if (it.hasNext()) {
+                // Serialize
+                DataOutputSerializer dos = new DataOutputSerializer(32);
+
+                while (it.hasNext()) {
+                    Map.Entry<UK, UV> entry = it.next();
+
+                    keySerializer.serialize(entry.getKey(), dos);
+                    valueSerializer.serialize(entry.getValue(), dos);
+                }
+
+                return dos.getCopyOfBuffer();
+            } else {
+                return null;
--- End diff --

The function is unused now. I will delete it in the update.




[jira] [Commented] (FLINK-5780) Extend ConfigOption with descriptions

2017-02-20 Thread shijinkui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875418#comment-15875418
 ] 

shijinkui commented on FLINK-5780:
--

This sounds like an extension of Apache Commons CLI: 
https://commons.apache.org/proper/commons-cli/
IMO, the commons-cli style is the standard, and I like it.
Is that right?

> Extend ConfigOption with descriptions
> -
>
> Key: FLINK-5780
> URL: https://issues.apache.org/jira/browse/FLINK-5780
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core, Documentation
>Reporter: Ufuk Celebi
>
> The {{ConfigOption}} type is meant to replace the flat {{ConfigConstants}}. 
> As part of automating the generation of a docs config page we need to extend  
> {{ConfigOption}} with description fields.
> From the ML discussion, these could be:
> {code}
> void shortDescription(String);
> void longDescription(String);
> {code}
> In practice, the description string should contain HTML/Markdown.
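
One possible shape for such an option is sketched below. It is purely illustrative and deliberately differs from the `void` setters quoted above by returning new immutable instances; `DescribedOption`, its methods, and the table row format are hypothetical and not part of Flink's `ConfigOption`.

```java
// Hypothetical option type carrying its own documentation strings.
final class DescribedOption<T> {
    private final String key;
    private final T defaultValue;
    private final String shortDescription;
    private final String longDescription;

    private DescribedOption(String key, T defaultValue, String shortDescription, String longDescription) {
        this.key = key;
        this.defaultValue = defaultValue;
        this.shortDescription = shortDescription;
        this.longDescription = longDescription;
    }

    static <T> DescribedOption<T> of(String key, T defaultValue) {
        return new DescribedOption<>(key, defaultValue, "", "");
    }

    // Each "with" method returns a copy, leaving the original untouched.
    DescribedOption<T> withShortDescription(String text) {
        return new DescribedOption<>(key, defaultValue, text, longDescription);
    }

    DescribedOption<T> withLongDescription(String text) {
        return new DescribedOption<>(key, defaultValue, shortDescription, text);
    }

    // Renders one Markdown table row for a generated configuration page.
    String toDocsRow() {
        return "| `" + key + "` | " + defaultValue + " | " + shortDescription + " |";
    }
}
```

A docs generator could then iterate over all declared options and emit one row per option, for example `DescribedOption.of("example.buffer.size", 32).withShortDescription("Example size.").toDocsRow()`, where the key is made up for this example.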



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4856) Add MapState for keyed streams

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875396#comment-15875396
 ] 

ASF GitHub Bot commented on FLINK-4856:
---

Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3336#discussion_r102135289
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
 ---
@@ -410,6 +415,124 @@ public void testDeserializeListTooShort2() throws Exception {
         KvStateRequestSerializer.deserializeList(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2, 3},
             LongSerializer.INSTANCE);
     }
+
+    /**
+     * Tests map serialization utils.
+     */
+    @Test
+    public void testMapSerialization() throws Exception {
+        final long key = 0L;
+
+        // objects for heap state list serialisation
+        final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend =
+            new HeapKeyedStateBackend<>(
+                mock(TaskKvStateRegistry.class),
+                LongSerializer.INSTANCE,
+                ClassLoader.getSystemClassLoader(),
+                1, new KeyGroupRange(0, 0)
+            );
+        longHeapKeyedStateBackend.setCurrentKey(key);
+
+        final InternalMapState<VoidNamespace, Long, String> mapState = longHeapKeyedStateBackend.createMapState(
+            VoidNamespaceSerializer.INSTANCE,
+            new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE));
+
+        testMapSerialization(key, mapState);
+    }
+
+    /**
+     * Verifies that the serialization of a map using the given map state
+     * matches the deserialization with {@link KvStateRequestSerializer#deserializeList}.
+     *
+     * @param key
+     *         key of the map state
+     * @param mapState
+     *         map state using the {@link VoidNamespace}, must also be a {@link InternalKvState} instance
+     *
+     * @throws Exception
+     */
+    public static void testMapSerialization(
+            final long key,
+            final InternalMapState<VoidNamespace, Long, String> mapState) throws Exception {
+
+        TypeSerializer<Long> userKeySerializer = LongSerializer.INSTANCE;
+        TypeSerializer<String> userValueSerializer = StringSerializer.INSTANCE;
+        mapState.setCurrentNamespace(VoidNamespace.INSTANCE);
+
+        // List
+        final int numElements = 10;
+
+        final Map<Long, String> expectedValues = new HashMap<>();
+        for (int i = 0; i < numElements; i++) {
+            final long value = ThreadLocalRandom.current().nextLong();
--- End diff --

I prefer to use `ThreadLocalRandom.current()`, which is also used in other 
tests in this file. Although it makes a failing case harder to reproduce, it may 
help to find corner cases.
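
As a side note on the reproducibility trade-off: `ThreadLocalRandom` cannot be seeded, but a test can draw a fresh seed from it and feed that seed to a `java.util.Random`. The sketch below assumes a hypothetical helper class `SeededRandomSketch`; it is one possible compromise, not what the pull request does.

```java
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical test helper: random per run, replayable from the seed.
final class SeededRandomSketch {

    // Draws a new seed for each run.
    static long newSeed() {
        return ThreadLocalRandom.current().nextLong();
    }

    // Generates n values that are fully determined by the seed.
    static long[] values(long seed, int n) {
        Random random = new Random(seed);
        long[] result = new long[n];
        for (int i = 0; i < n; i++) {
            result[i] = random.nextLong();
        }
        return result;
    }
}
```

If the test prints the seed it used, a failing run can be replayed by passing the same seed to `values`, while normal runs still cover different inputs each time.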


> Add MapState for keyed streams
> --
>
> Key: FLINK-4856
> URL: https://issues.apache.org/jira/browse/FLINK-4856
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Many states in keyed streams are organized as key-value pairs. Currently, 
> these states are implemented by storing the entire map into a ValueState or a 
> ListState. The implementation however is very costly because all entries have 
> to be serialized/deserialized when updating a single entry. To improve the 
> efficiency of these states, MapStates are urgently needed. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState

2017-02-20 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3336#discussion_r102135289
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
 ---
@@ -410,6 +415,124 @@ public void testDeserializeListTooShort2() throws Exception {
         KvStateRequestSerializer.deserializeList(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2, 3},
             LongSerializer.INSTANCE);
     }
+
+    /**
+     * Tests map serialization utils.
+     */
+    @Test
+    public void testMapSerialization() throws Exception {
+        final long key = 0L;
+
+        // objects for heap state list serialisation
+        final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend =
+            new HeapKeyedStateBackend<>(
+                mock(TaskKvStateRegistry.class),
+                LongSerializer.INSTANCE,
+                ClassLoader.getSystemClassLoader(),
+                1, new KeyGroupRange(0, 0)
+            );
+        longHeapKeyedStateBackend.setCurrentKey(key);
+
+        final InternalMapState<VoidNamespace, Long, String> mapState = longHeapKeyedStateBackend.createMapState(
+            VoidNamespaceSerializer.INSTANCE,
+            new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE));
+
+        testMapSerialization(key, mapState);
+    }
+
+    /**
+     * Verifies that the serialization of a map using the given map state
+     * matches the deserialization with {@link KvStateRequestSerializer#deserializeList}.
+     *
+     * @param key
+     *         key of the map state
+     * @param mapState
+     *         map state using the {@link VoidNamespace}, must also be a {@link InternalKvState} instance
+     *
+     * @throws Exception
+     */
+    public static void testMapSerialization(
+            final long key,
+            final InternalMapState<VoidNamespace, Long, String> mapState) throws Exception {
+
+        TypeSerializer<Long> userKeySerializer = LongSerializer.INSTANCE;
+        TypeSerializer<String> userValueSerializer = StringSerializer.INSTANCE;
+        mapState.setCurrentNamespace(VoidNamespace.INSTANCE);
+
+        // List
+        final int numElements = 10;
+
+        final Map<Long, String> expectedValues = new HashMap<>();
+        for (int i = 0; i < numElements; i++) {
+            final long value = ThreadLocalRandom.current().nextLong();
--- End diff --

I prefer to use `ThreadLocalRandom.current()`, which is also used in other 
tests in this file. Although it makes a failing case harder to reproduce, it may 
help to find corner cases.




[jira] [Commented] (FLINK-5803) Add [partitioned] processing time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-02-20 Thread sunjincheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875351#comment-15875351
 ] 

sunjincheng commented on FLINK-5803:


Hi [~fhueske], in accordance with the design I described above, I have 
completed the implementation that does not carry procTime(). Can I open a PR 
now, or should I wait for FLINK-5710?

> Add [partitioned] processing time OVER RANGE BETWEEN UNBOUNDED PRECEDING 
> aggregation to SQL
> ---
>
> Key: FLINK-5803
> URL: https://issues.apache.org/jira/browse/FLINK-5803
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> The goal of this issue is to add support for OVER RANGE aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - bounded PRECEDING is not supported (see FLINK-5654)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).
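
To illustrate the semantics of the query quoted above: with UNBOUNDED PRECEDING and CURRENT ROW, each arriving row is emitted together with the aggregate over all rows seen so far in its partition. The following is a plain-Java sketch of that behaviour, not the Flink operator; the class and method names are hypothetical, and it assumes every row has a distinct processing time, so it ignores RANGE peers.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of SUM(b) and MIN(b) OVER (PARTITION BY c ... UNBOUNDED PRECEDING). */
final class UnboundedOverSketch {

    /** Running accumulators per partition key c: index 0 holds the sum, index 1 the minimum. */
    private final Map<String, long[]> accumulators = new HashMap<>();

    /** Processes one row in arrival order and returns {sumB, minB} for its partition. */
    long[] process(String c, long b) {
        long[] acc = accumulators.get(c);
        if (acc == null) {
            acc = new long[] {0L, Long.MAX_VALUE};
            accumulators.put(c, acc);
        }
        acc[0] += b;
        acc[1] = Math.min(acc[1], b);
        // Return a copy so callers cannot modify the stored accumulator.
        return new long[] {acc[0], acc[1]};
    }
}
```

Because the window never shrinks, one accumulator per partition is enough and no rows need to be buffered; bounded PRECEDING would require retracting old rows, which this sketch does not attempt.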





[jira] [Commented] (FLINK-4856) Add MapState for keyed streams

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875342#comment-15875342
 ] 

ASF GitHub Bot commented on FLINK-4856:
---

Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3336#discussion_r102129362
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -834,7 +836,7 @@ private void restoreKVStateData() throws IOException, 
RocksDBException {
}
 
@Override
-   protected  InternalValueState createValueState(
+   public  InternalValueState createValueState(
--- End diff --

It is mainly due to the unit tests in `KvStateRequestSerializerTest`, which 
need access to `InternalKvState`. A better choice may be to use 
`getPartitionedState()` to obtain a user-facing state and convert it to an 
internal state. What do you think?
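
The alternative proposed here can be sketched in isolation. All types below are hypothetical stand-ins, not Flink's classes: the factory method stays protected, and a test obtains the user-facing state through a public accessor and narrows it to the internal interface.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical illustration: reach an internal state view without widening a factory method. */
final class InternalStateAccessSketch {

    /** User-facing view: what application code sees. */
    interface ValueView {
        String value();
    }

    /** Internal view: adds the hook a serializer test needs. */
    interface InternalValueView extends ValueView {
        byte[] serializedValue();
    }

    /** Stand-in backend whose factory method remains protected. */
    static class Backend {
        protected InternalValueView createValueState(final String initial) {
            return new InternalValueView() {
                @Override
                public String value() {
                    return initial;
                }

                @Override
                public byte[] serializedValue() {
                    return initial.getBytes(StandardCharsets.UTF_8);
                }
            };
        }

        /** Public accessor that exposes only the user-facing type. */
        public ValueView getPartitionedState(String initial) {
            return createValueState(initial);
        }
    }

    /** What a test could do instead of making createValueState public. */
    static InternalValueView internalViewOf(Backend backend, String initial) {
        ValueView view = backend.getPartitionedState(initial);
        if (!(view instanceof InternalValueView)) {
            throw new IllegalStateException("state does not expose the internal view");
        }
        return (InternalValueView) view;
    }
}
```

The cost of this design is a runtime type check in the test; the benefit is that the backend's public surface does not grow only for testing purposes.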


> Add MapState for keyed streams
> --
>
> Key: FLINK-4856
> URL: https://issues.apache.org/jira/browse/FLINK-4856
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Many states in keyed streams are organized as key-value pairs. Currently, 
> these states are implemented by storing the entire map into a ValueState or a 
> ListState. The implementation however is very costly because all entries have 
> to be serialized/deserialized when updating a single entry. To improve the 
> efficiency of these states, MapStates are urgently needed. 
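
The cost argument in the description can be made concrete with a small, self-contained sketch. It is not Flink code: the class name and the serialization format (`DataOutputStream` with `writeLong`/`writeUTF`) are assumptions chosen only to compare how many bytes are rewritten per update.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;

/** Hypothetical comparison of bytes serialized per update: whole map versus a single entry. */
final class MapUpdateCostSketch {

    /** Map stored as one value: every entry is re-serialized on each update. */
    static int bytesForWholeMapUpdate(Map<Long, String> map) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(map.size());
            for (Map.Entry<Long, String> entry : map.entrySet()) {
                out.writeLong(entry.getKey());
                out.writeUTF(entry.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    /** Entries stored separately: only the touched entry is serialized. */
    static int bytesForSingleEntryUpdate(long key, String value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(key);
            out.writeUTF(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }
}
```

The first method grows linearly with the number of entries, while the second depends only on the size of the entry being updated, which is the efficiency gain a dedicated MapState aims for.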





[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState

2017-02-20 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3336#discussion_r102129362
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -834,7 +836,7 @@ private void restoreKVStateData() throws IOException, 
RocksDBException {
}
 
@Override
-   protected  InternalValueState createValueState(
+   public  InternalValueState createValueState(
--- End diff --

It is mainly due to the unit tests in `KvStateRequestSerializerTest`, which 
need access to `InternalKvState`. A better choice may be to use 
`getPartitionedState()` to obtain a user-facing state and convert it to an 
internal state. What do you think?




[jira] [Commented] (FLINK-4856) Add MapState for keyed streams

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875328#comment-15875328
 ] 

ASF GitHub Bot commented on FLINK-4856:
---

Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3336#discussion_r102128355
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
 ---
@@ -93,6 +95,18 @@ public DefaultKeyedStateStore(KeyedStateBackend 
keyedStateBackend, ExecutionC
}
}
 
+   @Override
+   public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> 
stateProperties) {
+   requireNonNull(stateProperties, "The state properties must not 
be null");
+   try {
+   
stateProperties.initializeSerializerUnlessSet(executionConfig);
+   MapState<UK, UV> originalState = 
getPartitionedState(stateProperties);
+   return new UserFacingMapState<>(originalState);
+   } catch (Exception e) {
+   throw new RuntimeException("Error while getting state", 
e);
--- End diff --

Currently, `KeyedStateStore#getState()` does not declare any exceptions, so 
`RuntimeException` is the only kind of exception that can be thrown. Since 
changing the interface would affect user code (users would have to handle the 
declared exceptions), I am not sure it is okay to modify the method 
declaration in `KeyedStateStore`.
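
The constraint described here can be shown in isolation: a method whose interface declares no checked exceptions can only surface a checked failure by wrapping it. The names below (`StateStore`, `lookup`, `WrappingStore`) are hypothetical and are not Flink's API.

```java
import java.util.Objects;

/** Hypothetical illustration of wrapping a checked failure behind an interface without a throws clause. */
final class UncheckedWrappingSketch {

    /** Stand-in interface whose method declares no checked exceptions. */
    interface StateStore {
        String getState(String name);
    }

    /** Stand-in backend call that may fail with a checked exception. */
    static String lookup(String name) throws Exception {
        if (name.isEmpty()) {
            throw new Exception("no state registered under an empty name");
        }
        return "state:" + name;
    }

    static final class WrappingStore implements StateStore {
        @Override
        public String getState(String name) {
            Objects.requireNonNull(name, "The state name must not be null");
            try {
                return lookup(name);
            } catch (Exception e) {
                // The original failure stays reachable through getCause().
                throw new RuntimeException("Error while getting state", e);
            }
        }
    }
}
```

Callers keep compiling unchanged, at the price of having to inspect `getCause()` if they want to react to a specific failure.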


> Add MapState for keyed streams
> --
>
> Key: FLINK-4856
> URL: https://issues.apache.org/jira/browse/FLINK-4856
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Many states in keyed streams are organized as key-value pairs. Currently, 
> these states are implemented by storing the entire map into a ValueState or a 
> ListState. The implementation however is very costly because all entries have 
> to be serialized/deserialized when updating a single entry. To improve the 
> efficiency of these states, MapStates are urgently needed. 





[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState

2017-02-20 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3336#discussion_r102128355
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
 ---
@@ -93,6 +95,18 @@ public DefaultKeyedStateStore(KeyedStateBackend 
keyedStateBackend, ExecutionC
}
}
 
+   @Override
+   public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> 
stateProperties) {
+   requireNonNull(stateProperties, "The state properties must not 
be null");
+   try {
+   
stateProperties.initializeSerializerUnlessSet(executionConfig);
+   MapState<UK, UV> originalState = 
getPartitionedState(stateProperties);
+   return new UserFacingMapState<>(originalState);
+   } catch (Exception e) {
+   throw new RuntimeException("Error while getting state", 
e);
--- End diff --

Currently, `KeyedStateStore#getState()` does not declare any exceptions, so 
`RuntimeException` is the only kind of exception that can be thrown. Since 
changing the interface would affect user code (users would have to handle the 
declared exceptions), I am not sure it is okay to modify the method 
declaration in `KeyedStateStore`.




[jira] [Commented] (FLINK-4856) Add MapState for keyed streams

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875321#comment-15875321
 ] 

ASF GitHub Bot commented on FLINK-4856:
---

Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3336#discussion_r102127867
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
 ---
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import 
org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * {@link MapState} implementation that stores state in RocksDB.
+ * 
+ * {@link RocksDBStateBackend} must ensure that we set the
+ * {@link org.rocksdb.StringAppendOperator} on the column family that we 
use for our state since
+ * we use the {@code merge()} call.
+ *
+ * @param <K>  The type of the key.
+ * @param <N>  The type of the namespace.
+ * @param <UK> The type of the keys in the map state.
+ * @param <UV> The type of the values in the map state.
+ */
+public class RocksDBMapState<K, N, UK, UV>
+   extends AbstractRocksDBState<K, N, MapState<UK, UV>, 
MapStateDescriptor<UK, UV>, Map<UK, UV>>
+   implements InternalMapState<N, UK, UV> {
+
+   /** Serializer for the keys and values */
+   private final TypeSerializer userKeySerializer;
+   private final TypeSerializer userValueSerializer;
+
+   /**
+* We disable writes to the write-ahead-log here. We can't have these 
in the base class
+* because JNI segfaults for some reason if they are.
+*/
+   private final WriteOptions writeOptions;
+
+   /**
+* Creates a new {@code RocksDBMapState}.
+*
+* @param namespaceSerializer The serializer for the namespace.
+* @param stateDesc The state identifier for the state.
+*/
+   public RocksDBMapState(ColumnFamilyHandle columnFamily,
+   TypeSerializer namespaceSerializer,
+   MapStateDescriptor stateDesc,
+   RocksDBKeyedStateBackend backend) {
+
+   super(columnFamily, namespaceSerializer, stateDesc, backend);
+
+   this.userKeySerializer = stateDesc.getKeySerializer();
+   this.userValueSerializer = stateDesc.getValueSerializer();
+
+   writeOptions = new WriteOptions();
+   writeOptions.setDisableWAL(true);
+   }
+
+   // 

+   //  MapState Implementation
+   // 

+
+   @Override
+   public UV get(UK userKey) throws IOException {
+   try {
+   

[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState

2017-02-20 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3336#discussion_r102127867
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
 ---
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import 
org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * {@link MapState} implementation that stores state in RocksDB.
+ * 
+ * {@link RocksDBStateBackend} must ensure that we set the
+ * {@link org.rocksdb.StringAppendOperator} on the column family that we 
use for our state since
+ * we use the {@code merge()} call.
+ *
+ * @param <K>  The type of the key.
+ * @param <N>  The type of the namespace.
+ * @param <UK> The type of the keys in the map state.
+ * @param <UV> The type of the values in the map state.
+ */
+public class RocksDBMapState<K, N, UK, UV>
+   extends AbstractRocksDBState<K, N, MapState<UK, UV>, 
MapStateDescriptor<UK, UV>, Map<UK, UV>>
+   implements InternalMapState<N, UK, UV> {
+
+   /** Serializer for the keys and values */
+   private final TypeSerializer userKeySerializer;
+   private final TypeSerializer userValueSerializer;
+
+   /**
+* We disable writes to the write-ahead-log here. We can't have these 
in the base class
+* because JNI segfaults for some reason if they are.
+*/
+   private final WriteOptions writeOptions;
+
+   /**
+* Creates a new {@code RocksDBMapState}.
+*
+* @param namespaceSerializer The serializer for the namespace.
+* @param stateDesc The state identifier for the state.
+*/
+   public RocksDBMapState(ColumnFamilyHandle columnFamily,
+   TypeSerializer namespaceSerializer,
+   MapStateDescriptor stateDesc,
+   RocksDBKeyedStateBackend backend) {
+
+   super(columnFamily, namespaceSerializer, stateDesc, backend);
+
+   this.userKeySerializer = stateDesc.getKeySerializer();
+   this.userValueSerializer = stateDesc.getValueSerializer();
+
+   writeOptions = new WriteOptions();
+   writeOptions.setDisableWAL(true);
+   }
+
+   // 

+   //  MapState Implementation
+   // 

+
+   @Override
+   public UV get(UK userKey) throws IOException {
+   try {
+   byte[] rawKeyBytes = 
serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+   byte[] rawValueBytes = backend.db.get(columnFamily, 
rawKeyBytes);
+
+   return (rawValueBytes == null ? null : 

[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState

2017-02-20 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3336#discussion_r102127767
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
 ---
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import 
org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * {@link MapState} implementation that stores state in RocksDB.
+ * 
+ * {@link RocksDBStateBackend} must ensure that we set the
+ * {@link org.rocksdb.StringAppendOperator} on the column family that we 
use for our state since
+ * we use the {@code merge()} call.
+ *
+ * @param <K>  The type of the key.
+ * @param <N>  The type of the namespace.
+ * @param <UK> The type of the keys in the map state.
+ * @param <UV> The type of the values in the map state.
+ */
+public class RocksDBMapState<K, N, UK, UV>
+   extends AbstractRocksDBState<K, N, MapState<UK, UV>, 
MapStateDescriptor<UK, UV>, Map<UK, UV>>
+   implements InternalMapState<N, UK, UV> {
+
+   /** Serializer for the keys and values */
+   private final TypeSerializer userKeySerializer;
+   private final TypeSerializer userValueSerializer;
+
+   /**
+* We disable writes to the write-ahead-log here. We can't have these 
in the base class
+* because JNI segfaults for some reason if they are.
+*/
+   private final WriteOptions writeOptions;
+
+   /**
+* Creates a new {@code RocksDBMapState}.
+*
+* @param namespaceSerializer The serializer for the namespace.
+* @param stateDesc The state identifier for the state.
+*/
+   public RocksDBMapState(ColumnFamilyHandle columnFamily,
+   TypeSerializer namespaceSerializer,
+   MapStateDescriptor stateDesc,
+   RocksDBKeyedStateBackend backend) {
+
+   super(columnFamily, namespaceSerializer, stateDesc, backend);
+
+   this.userKeySerializer = stateDesc.getKeySerializer();
+   this.userValueSerializer = stateDesc.getValueSerializer();
+
+   writeOptions = new WriteOptions();
+   writeOptions.setDisableWAL(true);
+   }
+
+   // 

+   //  MapState Implementation
+   // 

+
+   @Override
+   public UV get(UK userKey) throws IOException {
+   try {
+   byte[] rawKeyBytes = 
serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+   byte[] rawValueBytes = backend.db.get(columnFamily, 
rawKeyBytes);
+
+   return (rawValueBytes == null ? null : 

[jira] [Commented] (FLINK-4856) Add MapState for keyed streams

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875320#comment-15875320
 ] 

ASF GitHub Bot commented on FLINK-4856:
---

Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3336#discussion_r102127767
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
 ---
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import 
org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * {@link MapState} implementation that stores state in RocksDB.
+ * 
+ * {@link RocksDBStateBackend} must ensure that we set the
+ * {@link org.rocksdb.StringAppendOperator} on the column family that we 
use for our state since
+ * we use the {@code merge()} call.
+ *
+ * @param <K>  The type of the key.
+ * @param <N>  The type of the namespace.
+ * @param <UK> The type of the keys in the map state.
+ * @param <UV> The type of the values in the map state.
+ */
+public class RocksDBMapState<K, N, UK, UV>
+   extends AbstractRocksDBState<K, N, MapState<UK, UV>, 
MapStateDescriptor<UK, UV>, Map<UK, UV>>
+   implements InternalMapState<N, UK, UV> {
+
+   /** Serializer for the keys and values */
+   private final TypeSerializer userKeySerializer;
+   private final TypeSerializer userValueSerializer;
+
+   /**
+* We disable writes to the write-ahead-log here. We can't have these 
in the base class
+* because JNI segfaults for some reason if they are.
+*/
+   private final WriteOptions writeOptions;
+
+   /**
+* Creates a new {@code RocksDBMapState}.
+*
+* @param namespaceSerializer The serializer for the namespace.
+* @param stateDesc The state identifier for the state.
+*/
+   public RocksDBMapState(ColumnFamilyHandle columnFamily,
+   TypeSerializer namespaceSerializer,
+   MapStateDescriptor stateDesc,
+   RocksDBKeyedStateBackend backend) {
+
+   super(columnFamily, namespaceSerializer, stateDesc, backend);
+
+   this.userKeySerializer = stateDesc.getKeySerializer();
+   this.userValueSerializer = stateDesc.getValueSerializer();
+
+   writeOptions = new WriteOptions();
+   writeOptions.setDisableWAL(true);
+   }
+
+   // 

+   //  MapState Implementation
+   // 

+
+   @Override
+   public UV get(UK userKey) throws IOException {
+   try {
+   

[jira] [Commented] (FLINK-4856) Add MapState for keyed streams

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875311#comment-15875311
 ] 

ASF GitHub Bot commented on FLINK-4856:
---

Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3336#discussion_r102126863
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
 ---
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import 
org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * {@link MapState} implementation that stores state in RocksDB.
+ * 
+ * {@link RocksDBStateBackend} must ensure that we set the
+ * {@link org.rocksdb.StringAppendOperator} on the column family that we 
use for our state since
+ * we use the {@code merge()} call.
+ *
+ * @param <K>  The type of the key.
+ * @param <N>  The type of the namespace.
+ * @param <UK> The type of the keys in the map state.
+ * @param <UV> The type of the values in the map state.
+ */
+public class RocksDBMapState<K, N, UK, UV>
+   extends AbstractRocksDBState<K, N, MapState<UK, UV>, 
MapStateDescriptor<UK, UV>, Map<UK, UV>>
+   implements InternalMapState<N, UK, UV> {
+
+   /** Serializer for the keys and values */
+   private final TypeSerializer userKeySerializer;
+   private final TypeSerializer userValueSerializer;
+
+   /**
+* We disable writes to the write-ahead-log here. We can't have these 
in the base class
+* because JNI segfaults for some reason if they are.
+*/
+   private final WriteOptions writeOptions;
+
+   /**
+* Creates a new {@code RocksDBMapState}.
+*
+* @param namespaceSerializer The serializer for the namespace.
+* @param stateDesc The state identifier for the state.
+*/
+   public RocksDBMapState(ColumnFamilyHandle columnFamily,
+   TypeSerializer namespaceSerializer,
+   MapStateDescriptor stateDesc,
+   RocksDBKeyedStateBackend backend) {
+
+   super(columnFamily, namespaceSerializer, stateDesc, backend);
+
+   this.userKeySerializer = stateDesc.getKeySerializer();
+   this.userValueSerializer = stateDesc.getValueSerializer();
+
+   writeOptions = new WriteOptions();
+   writeOptions.setDisableWAL(true);
+   }
+
+   // 

+   //  MapState Implementation
+   // 

+
+   @Override
+   public UV get(UK userKey) throws IOException {
+   try {
+   

[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState

2017-02-20 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3336#discussion_r102126863
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
 ---
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import 
org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * {@link MapState} implementation that stores state in RocksDB.
+ * 
+ * {@link RocksDBStateBackend} must ensure that we set the
+ * {@link org.rocksdb.StringAppendOperator} on the column family that we use for our state since
+ * we use the {@code merge()} call.
+ *
+ * @param <K>  The type of the key.
+ * @param <N>  The type of the namespace.
+ * @param <UK> The type of the keys in the map state.
+ * @param <UV> The type of the values in the map state.
+ */
+public class RocksDBMapState<K, N, UK, UV>
+   extends AbstractRocksDBState<K, N, MapState<UK, UV>, MapStateDescriptor<UK, UV>, Map<UK, UV>>
+   implements InternalMapState<N, UK, UV> {
+
+   /** Serializer for the keys and values */
+   private final TypeSerializer<UK> userKeySerializer;
+   private final TypeSerializer<UV> userValueSerializer;
+
+   /**
+* We disable writes to the write-ahead-log here. We can't have these in the base class
+* because JNI segfaults for some reason if they are.
+*/
+   private final WriteOptions writeOptions;
+
+   /**
+* Creates a new {@code RocksDBMapState}.
+*
+* @param namespaceSerializer The serializer for the namespace.
+* @param stateDesc The state identifier for the state.
+*/
+   public RocksDBMapState(ColumnFamilyHandle columnFamily,
+   TypeSerializer<N> namespaceSerializer,
+   MapStateDescriptor<UK, UV> stateDesc,
+   RocksDBKeyedStateBackend<K> backend) {
+
+   super(columnFamily, namespaceSerializer, stateDesc, backend);
+
+   this.userKeySerializer = stateDesc.getKeySerializer();
+   this.userValueSerializer = stateDesc.getValueSerializer();
+
+   writeOptions = new WriteOptions();
+   writeOptions.setDisableWAL(true);
+   }
+
+   // ------------------------------------------------------------------------
+   //  MapState Implementation
+   // ------------------------------------------------------------------------
+
+   @Override
+   public UV get(UK userKey) throws IOException {
+   try {
+   byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+   byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);
+
+   return (rawValueBytes == null ? null : 

[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState

2017-02-20 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3336#discussion_r102125445
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
 ---
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * {@link MapState} implementation that stores state in RocksDB.
+ * 
+ * {@link RocksDBStateBackend} must ensure that we set the
+ * {@link org.rocksdb.StringAppendOperator} on the column family that we use for our state since
+ * we use the {@code merge()} call.
+ *
+ * @param <K>  The type of the key.
+ * @param <N>  The type of the namespace.
+ * @param <UK> The type of the keys in the map state.
+ * @param <UV> The type of the values in the map state.
+ */
+public class RocksDBMapState<K, N, UK, UV>
+   extends AbstractRocksDBState<K, N, MapState<UK, UV>, MapStateDescriptor<UK, UV>, Map<UK, UV>>
+   implements InternalMapState<N, UK, UV> {
+
+   /** Serializer for the keys and values */
+   private final TypeSerializer<UK> userKeySerializer;
+   private final TypeSerializer<UV> userValueSerializer;
+
+   /**
+* We disable writes to the write-ahead-log here. We can't have these in the base class
+* because JNI segfaults for some reason if they are.
+*/
+   private final WriteOptions writeOptions;
+
+   /**
+* Creates a new {@code RocksDBMapState}.
+*
+* @param namespaceSerializer The serializer for the namespace.
+* @param stateDesc The state identifier for the state.
+*/
+   public RocksDBMapState(ColumnFamilyHandle columnFamily,
+   TypeSerializer<N> namespaceSerializer,
+   MapStateDescriptor<UK, UV> stateDesc,
+   RocksDBKeyedStateBackend<K> backend) {
+
+   super(columnFamily, namespaceSerializer, stateDesc, backend);
+
+   this.userKeySerializer = stateDesc.getKeySerializer();
+   this.userValueSerializer = stateDesc.getValueSerializer();
+
+   writeOptions = new WriteOptions();
+   writeOptions.setDisableWAL(true);
+   }
+
+   // ------------------------------------------------------------------------
+   //  MapState Implementation
+   // ------------------------------------------------------------------------
+
+   @Override
+   public UV get(UK userKey) throws IOException {
+   try {
+   byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+   byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);
+
+   return (rawValueBytes == null ? null : 

[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState

2017-02-20 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3336#discussion_r102125062
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
 ---
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * {@link MapState} implementation that stores state in RocksDB.
+ * 
+ * {@link RocksDBStateBackend} must ensure that we set the
+ * {@link org.rocksdb.StringAppendOperator} on the column family that we use for our state since
+ * we use the {@code merge()} call.
+ *
+ * @param <K>  The type of the key.
+ * @param <N>  The type of the namespace.
+ * @param <UK> The type of the keys in the map state.
+ * @param <UV> The type of the values in the map state.
+ */
+public class RocksDBMapState<K, N, UK, UV>
+   extends AbstractRocksDBState<K, N, MapState<UK, UV>, MapStateDescriptor<UK, UV>, Map<UK, UV>>
+   implements InternalMapState<N, UK, UV> {
+
+   /** Serializer for the keys and values */
+   private final TypeSerializer<UK> userKeySerializer;
+   private final TypeSerializer<UV> userValueSerializer;
+
+   /**
+* We disable writes to the write-ahead-log here. We can't have these in the base class
+* because JNI segfaults for some reason if they are.
+*/
+   private final WriteOptions writeOptions;
--- End diff --

To be honest, I have no idea why we can't put `writeOptions` in the base class. 
We put it in `AbstractRocksDBState` and have not come across any problems in our 
production environment. 

Maybe @aljoscha is more familiar with the problem.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4856) Add MapState for keyed streams

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875297#comment-15875297
 ] 

ASF GitHub Bot commented on FLINK-4856:
---

Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3336#discussion_r102125062
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
 ---
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * {@link MapState} implementation that stores state in RocksDB.
+ * 
+ * {@link RocksDBStateBackend} must ensure that we set the
+ * {@link org.rocksdb.StringAppendOperator} on the column family that we use for our state since
+ * we use the {@code merge()} call.
+ *
+ * @param <K>  The type of the key.
+ * @param <N>  The type of the namespace.
+ * @param <UK> The type of the keys in the map state.
+ * @param <UV> The type of the values in the map state.
+ */
+public class RocksDBMapState<K, N, UK, UV>
+   extends AbstractRocksDBState<K, N, MapState<UK, UV>, MapStateDescriptor<UK, UV>, Map<UK, UV>>
+   implements InternalMapState<N, UK, UV> {
+
+   /** Serializer for the keys and values */
+   private final TypeSerializer<UK> userKeySerializer;
+   private final TypeSerializer<UV> userValueSerializer;
+
+   /**
+* We disable writes to the write-ahead-log here. We can't have these in the base class
+* because JNI segfaults for some reason if they are.
+*/
+   private final WriteOptions writeOptions;
--- End diff --

To be honest, I have no idea why we can't put `writeOptions` in the base class. 
We put it in `AbstractRocksDBState` and have not come across any problems in our 
production environment. 

Maybe @aljoscha is more familiar with the problem.


> Add MapState for keyed streams
> --
>
> Key: FLINK-4856
> URL: https://issues.apache.org/jira/browse/FLINK-4856
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Many states in keyed streams are organized as key-value pairs. Currently, 
> these states are implemented by storing the entire map in a ValueState or a 
> ListState. This implementation, however, is very costly because all entries have 
> to be serialized/deserialized when a single entry is updated. To improve the 
> efficiency of these states, MapStates are urgently needed. 
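
The cost argument in the description can be illustrated with a small stand-alone sketch. It uses only the JDK, not Flink: `MapStateCostSketch` and both methods are hypothetical names, and the byte layout is an assumption chosen for illustration rather than Flink's serialization format. It compares the bytes written when one entry changes in a map stored as a single value against the bytes written when each entry is stored on its own.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone sketch; none of these names are Flink APIs.
final class MapStateCostSketch {

    // Map kept as one value: changing one entry re-serializes every entry.
    static int bytesForWholeMapUpdate(Map<String, Long> map, String key, long value) {
        map.put(key, value);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(map.size());
            for (Map.Entry<String, Long> entry : map.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeLong(entry.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // One stored record per map entry: only the touched entry is serialized.
    static int bytesForSingleEntryUpdate(String key, long value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(key);
            out.writeLong(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        Map<String, Long> counts = new HashMap<>();
        for (int i = 0; i < 1000; i++) {
            counts.put("user-" + i, 0L);
        }
        System.out.println("whole map:    " + bytesForWholeMapUpdate(counts, "user-7", 1L) + " bytes");
        System.out.println("single entry: " + bytesForSingleEntryUpdate("user-7", 1L) + " bytes");
    }
}
```

In this sketch the whole-map cost grows with the number of entries while the per-entry cost does not, which is the gap the issue asks MapState to close.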





[jira] [Commented] (FLINK-5414) Bump up Calcite version to 1.11

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875296#comment-15875296
 ] 

ASF GitHub Bot commented on FLINK-5414:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3338#discussion_r102125031
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
 ---
@@ -155,15 +155,15 @@ class ExpressionReductionTest extends TableTestBase {
   "DataSetCalc",
   batchTableNode(0),
   term("select",
-"13 AS _c0",
+"CAST(13) AS _c0",
--- End diff --

Just played around a little bit. I think the problem is that the advanced 
types are not properly canonized. With the following diff, all tests in 
`ArrayTypeTest` pass:

```
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -133,12 +133,18 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
   override def createTypeWithNullability(
 relDataType: RelDataType,
 nullable: Boolean)
-  : RelDataType = relDataType match {
-case composite: CompositeRelDataType =>
-  // at the moment we do not care about nullability
-  composite
-case _ =>
-  super.createTypeWithNullability(relDataType, nullable)
+  : RelDataType = {
+val t = relDataType match {
+  case composite: CompositeRelDataType =>
+// at the moment we do not care about nullability
+composite
+  case array: ArrayRelDataType =>
+val elementType = canonize(createTypeWithNullability(array.getComponentType, nullable))
+new ArrayRelDataType(array.typeInfo, elementType, nullable)
+  case _ =>
+super.createTypeWithNullability(relDataType, nullable)
+}
+canonize(t)
   }
 }
```

GroupWindowTest is still failing as it misses an identity projection. I'm 
wondering why `ProjectRemoveRule.INSTANCE` did not kick in...
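
For readers unfamiliar with canonizing, here is a minimal stand-alone sketch of the idea in plain Java. It is not Calcite or Flink code: `CanonizeSketch`, `ArrayType` and `TypeFactory` are hypothetical names, and the assumption is that some code compares type objects by reference, so two equal but separately constructed types would be treated as different unless the factory always returns one canonical instance.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-alone sketch of type canonization (interning); not Calcite code.
final class CanonizeSketch {

    // Hypothetical array type descriptor: element type name plus nullability.
    static final class ArrayType {
        final String elementType;
        final boolean nullable;

        ArrayType(String elementType, boolean nullable) {
            this.elementType = elementType;
            this.nullable = nullable;
        }

        @Override
        public boolean equals(Object other) {
            if (!(other instanceof ArrayType)) {
                return false;
            }
            ArrayType that = (ArrayType) other;
            return nullable == that.nullable && elementType.equals(that.elementType);
        }

        @Override
        public int hashCode() {
            return Objects.hash(elementType, nullable);
        }
    }

    // Hypothetical factory that hands out one canonical instance per distinct type.
    static final class TypeFactory {
        private final Map<ArrayType, ArrayType> canonical = new HashMap<>();

        // Returns the instance registered first for an equal type, registering this one if none exists.
        ArrayType canonize(ArrayType type) {
            ArrayType existing = canonical.putIfAbsent(type, type);
            return existing == null ? type : existing;
        }

        // Builds the requested variant and canonizes it before returning.
        ArrayType withNullability(ArrayType type, boolean nullable) {
            return canonize(new ArrayType(type.elementType, nullable));
        }
    }

    public static void main(String[] args) {
        TypeFactory factory = new TypeFactory();
        ArrayType base = factory.canonize(new ArrayType("INTEGER", false));
        ArrayType first = factory.withNullability(base, true);
        ArrayType second = factory.withNullability(base, true);
        System.out.println("canonized instances identical: " + (first == second));
        System.out.println("fresh instance identical: " + (new ArrayType("INTEGER", true) == first));
    }
}
```

The diff above follows the same pattern: the newly created `ArrayRelDataType` is passed through `canonize` before it is returned.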


> Bump up Calcite version to 1.11
> ---
>
> Key: FLINK-5414
> URL: https://issues.apache.org/jira/browse/FLINK-5414
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Jark Wu
>
> The upcoming Calcite release 1.11 has a lot of stability fixes and new 
> features. We should update it for the Table API.
> E.g. we can hopefully merge FLINK-4864





[jira] [Updated] (FLINK-5860) Replace all the file creating from java.io.tmpdir with TemporaryFolder

2017-02-20 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui updated FLINK-5860:
-
Description: 
Search for `System.getProperty("java.io.tmpdir")` across the whole Flink project. 
The search returns a list of unit tests. Replace all file creation under 
`java.io.tmpdir` with TemporaryFolder.

Who can fix this problem thoroughly?
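
As a rough illustration of the requested change, the sketch below uses only the JDK. JUnit's `TemporaryFolder` rule is not used here, so that the example stays self-contained; the class `TempDirSketch`, its method names and the `flink-test-` prefix are hypothetical. The idea is the same as with the rule: each test gets a fresh, uniquely named directory that is deleted afterwards, instead of writing fixed file names straight into `java.io.tmpdir`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical stand-alone sketch; in a JUnit 4 test the TemporaryFolder rule would play this role.
final class TempDirSketch {

    // Creates a fresh, uniquely named directory (by default below java.io.tmpdir).
    static Path newTempDir() {
        try {
            return Files.createTempDirectory("flink-test-");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Deletes the directory and everything below it, children before parents.
    static void deleteRecursively(Path root) {
        try (Stream<Path> paths = Files.walk(root)) {
            paths.sorted(Comparator.reverseOrder()).forEach(path -> {
                try {
                    Files.delete(path);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = newTempDir();
        try {
            // The file name no longer needs to be globally unique, because the directory is.
            Path file = Files.write(dir.resolve("jarcreatortest.jar"), new byte[] {1, 2, 3});
            System.out.println("wrote " + file);
        } finally {
            deleteRecursively(dir);
        }
        System.out.println("cleaned up: " + !Files.exists(dir));
    }
}
```

Two tests that run concurrently with this pattern no longer collide on a shared path such as `jarcreatortest.jar`, and nothing is left behind after a run.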

```

$ grep -ri 'System.getProperty("java.io.tmpdir")' .
./flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java: env.setStateBackend(new FsStateBackend("file:///" + System.getProperty("java.io.tmpdir") + "/flink/backend"));
./flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java: File tempDir = new File(System.getProperty("java.io.tmpdir"));
./flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java: File tempDir = new File(System.getProperty("java.io.tmpdir"));
./flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java: File tempDir = new File(System.getProperty("java.io.tmpdir"));
./flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java: return getMockEnvironment(new File[] { new File(System.getProperty("java.io.tmpdir")) });
./flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java: public static final String DEFAULT_TASK_MANAGER_TMP_PATH = System.getProperty("java.io.tmpdir");
./flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java: final String tempPath = System.getProperty("java.io.tmpdir");
./flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java: final File tempDir = new File(System.getProperty("java.io.tmpdir"));
./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: File tempDir = new File(System.getProperty("java.io.tmpdir"));
./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: File tempDir = new File(System.getProperty("java.io.tmpdir"));
./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java: final String outDir = params.get("output", System.getProperty("java.io.tmpdir"));
./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java: final String tmpDir = System.getProperty("java.io.tmpdir");
./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java: final String outPath = System.getProperty("java.io.tmpdir");
./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java: public static final String FLINK_PYTHON_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "flink_plan";
./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java: public static final String FLINK_TMP_DATA_DIR = System.getProperty("java.io.tmpdir") + File.separator + "flink_data";
./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java: FLINK_HDFS_PATH = "file:" + System.getProperty("java.io.tmpdir") + File.separator + "flink";
./flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java: baseDir = new File(System.getProperty("java.io.tmpdir"));
./flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java: return System.getProperty("java.io.tmpdir");
./flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java: System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());


[jira] [Created] (FLINK-5860) Replace all the file creating from java.io.tmpdir with TemporaryFolder

2017-02-20 Thread shijinkui (JIRA)
shijinkui created FLINK-5860:


 Summary: Replace all the file creating from java.io.tmpdir with 
TemporaryFolder
 Key: FLINK-5860
 URL: https://issues.apache.org/jira/browse/FLINK-5860
 Project: Flink
  Issue Type: Test
  Components: Tests
Reporter: shijinkui


Search for `System.getProperty("java.io.tmpdir")` across the whole Flink project. 
The search returns a list of unit tests. Replace all file creation under 
`java.io.tmpdir` with TemporaryFolder.

Who can fix this problem thoroughly?


[jira] [Commented] (FLINK-5830) OutOfMemoryError during notify final state in TaskExecutor may cause job stuck

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875284#comment-15875284
 ] 

ASF GitHub Bot commented on FLINK-5830:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/3360
  
@StephanEwen, thank you for the quick review! 

It is a good idea to add a uniform way of handling this to the utils, so we can 
use it anywhere.

I will fix it according to your suggestions later today.


> OutOfMemoryError during notify final state in TaskExecutor may cause job stuck
> --
>
> Key: FLINK-5830
> URL: https://issues.apache.org/jira/browse/FLINK-5830
> Project: Flink
>  Issue Type: Bug
>Reporter: zhijiang
>Assignee: zhijiang
>
> The scenario is as follows:
> The {{JobMaster}} tries to cancel all executions when it processes a failed 
> execution, and the task executor has already acknowledged the cancel RPC message.
> When the final state is notified in the {{TaskExecutor}}, an OOM occurs in 
> {{AkkaRpcActor}}; the error is caught and only logged, so the final state 
> is never sent.
> The {{JobMaster}} therefore cannot receive the final state and trigger the restart 
> strategy.
> One solution is to catch the {{OutOfMemoryError}} and rethrow it, which 
> shuts down the {{ActorSystem}} and causes the {{TaskExecutor}} to exit. 
> The {{JobMaster}} is then notified of the {{TaskExecutor}} 
> failure and fails all the tasks, which triggers the restart successfully.
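
The idea in the description above could be sketched roughly as follows. This is only an illustration: the class `FatalErrors` and its methods `rethrowIfOutOfMemory` and `runGuarded` are hypothetical names, not taken from the Flink code base or from the pull request. Ordinary failures are handed to a callback (for example, a logger), while an `OutOfMemoryError` is rethrown so the surrounding process can fail fast instead of swallowing the error.

```java
import java.util.function.Consumer;

/** Hypothetical helper for illustration only; not Flink's actual API. */
final class FatalErrors {

    private FatalErrors() {}

    /** Rethrows t if it is an OutOfMemoryError; otherwise returns normally. */
    static void rethrowIfOutOfMemory(Throwable t) {
        if (t instanceof OutOfMemoryError) {
            throw (OutOfMemoryError) t;
        }
    }

    /**
     * Runs the action. Ordinary failures are passed to onFailure (e.g. to be
     * logged), but an OutOfMemoryError propagates to the caller.
     */
    static void runGuarded(Runnable action, Consumer<Throwable> onFailure) {
        try {
            action.run();
        } catch (Throwable t) {
            rethrowIfOutOfMemory(t);
            onFailure.accept(t);
        }
    }
}
```

Keeping the check in one small utility method means every catch-all block can apply the same rule with a single call.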





[GitHub] flink issue #3360: [FLINK-5830][Distributed Coordination] Handle OutOfMemory...

2017-02-20 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/3360
  
@StephanEwen, thank you for the quick review! 

It is a good idea to add a uniform way of handling this to the utils, so we can 
use it anywhere.

I will fix it according to your suggestions later today.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5836) Race condition between slot offering and task deployment

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875280#comment-15875280
 ] 

ASF GitHub Bot commented on FLINK-5836:
---

GitHub user wenlong88 opened a pull request:

https://github.com/apache/flink/pull/3371

[FLINK-5836] Fix race condition between offer slot and submit task

The solution is the same as the one Till described in JIRA: activate the 
slots when reserving them on the `TaskExecutor`, before offering them to the `JobManager`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wenlong88/flink jira-5836

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3371.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3371


commit 35042f29e055a7f83b7c4d79e4c72673711dfd78
Author: wenlong.lwl 
Date:   2017-01-06T08:32:08Z

Fix race condition between offer slot and submit task




> Race condition between slot offering and task deployment
> 
>
> Key: FLINK-5836
> URL: https://issues.apache.org/jira/browse/FLINK-5836
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.3.0
>Reporter: Till Rohrmann
>Assignee: Wenlong Lyu
>  Labels: flip-6
>
> The Flip-6 code has a race condition when offering slots to a {{JobManager}} 
> which directly deploys tasks to the offered slots. In such a situation it is 
> possible that the deploy call overtakes the acknowledge message for the slot 
> offering. As a result, the slots are not marked yet as active and the 
> deployment will fail.
> I propose to fix this problem by first activating all offered slots before 
> sending the slot offer message to the {{JobManager}}. Consequently, we'll 
> deactivate and free slots which haven't been accepted by the {{JobManager}} 
> once we've received the offering acknowledge message.
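
The ordering proposed above can be illustrated with a small, self-contained sketch. The class `SlotTableSketch` and all of its methods are hypothetical and are not the Flip-6 `TaskExecutor` code; the sketch only shows that activating slots before the offer lets an early deploy call succeed, and that slots not accepted in the acknowledgement are freed afterwards.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical slot bookkeeping for illustration; not Flink's actual slot table. */
final class SlotTableSketch {

    enum State { FREE, ALLOCATED, ACTIVE }

    private final Map<Integer, State> slots = new HashMap<>();

    /** Reserves a slot for a job. */
    void allocate(int slotId) {
        slots.put(slotId, State.ALLOCATED);
    }

    /** Step 1: mark all allocated slots active BEFORE the offer message is sent. */
    Set<Integer> activateForOffer() {
        Set<Integer> offered = new HashSet<>();
        for (Map.Entry<Integer, State> entry : slots.entrySet()) {
            if (entry.getValue() == State.ALLOCATED) {
                entry.setValue(State.ACTIVE);
                offered.add(entry.getKey());
            }
        }
        return offered;
    }

    /** A deployment only succeeds on an active slot. */
    boolean deploy(int slotId) {
        return slots.get(slotId) == State.ACTIVE;
    }

    /** Step 2: on acknowledgement, free every offered slot that was not accepted. */
    void onOfferAcknowledged(Set<Integer> offered, Set<Integer> accepted) {
        for (int slotId : offered) {
            if (!accepted.contains(slotId)) {
                slots.put(slotId, State.FREE);
            }
        }
    }

    State stateOf(int slotId) {
        return slots.get(slotId);
    }
}
```

With this ordering, a deploy call that overtakes the acknowledgement still finds its slot active; the cost is that rejected slots stay active until the acknowledgement arrives.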





[GitHub] flink pull request #3371: [FLINK-5836] Fix race condition between offer slot...

2017-02-20 Thread wenlong88
GitHub user wenlong88 opened a pull request:

https://github.com/apache/flink/pull/3371

[FLINK-5836] Fix race condition between offer slot and submit task

The solution is the same as the one Till described in JIRA: activate the 
slots when reserving them on the `TaskExecutor`, before offering them to the `JobManager`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wenlong88/flink jira-5836

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3371.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3371


commit 35042f29e055a7f83b7c4d79e4c72673711dfd78
Author: wenlong.lwl 
Date:   2017-01-06T08:32:08Z

Fix race condition between offer slot and submit task






[jira] [Created] (FLINK-5859) support partition pruning on Table API & SQL

2017-02-20 Thread godfrey he (JIRA)
godfrey he created FLINK-5859:
-

 Summary: support partition pruning on Table API & SQL
 Key: FLINK-5859
 URL: https://issues.apache.org/jira/browse/FLINK-5859
 Project: Flink
  Issue Type: New Feature
  Components: Table API & SQL
Reporter: godfrey he
Assignee: godfrey he


Many data sources are partitioned storage systems, e.g. HDFS or Druid, and many 
queries only need to read a small subset of the total data. We can use partition 
information to prune, i.e. skip over, files irrelevant to the user's queries. This 
can noticeably reduce both query optimization time and execution time, especially 
for a large partitioned table.
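
As a rough illustration of the idea, the sketch below filters a list of partitions by a predicate on the partition value before any file is read. The class `PartitionPruningSketch`, the `Partition` type, and the `date` partition column are assumptions made for this example; they are not part of the Table API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical illustration of partition pruning; not the Table API. */
final class PartitionPruningSketch {

    private PartitionPruningSketch() {}

    /** A partition identified by its partition-column value and its storage path. */
    static final class Partition {
        final String date;
        final String path;

        Partition(String date, String path) {
            this.date = date;
            this.path = path;
        }
    }

    /** Returns the paths of the partitions whose value satisfies the query predicate. */
    static List<String> prune(List<Partition> partitions, Predicate<String> datePredicate) {
        List<String> paths = new ArrayList<>();
        for (Partition partition : partitions) {
            if (datePredicate.test(partition.date)) {
                paths.add(partition.path);
            }
        }
        return paths;
    }
}
```

A query with a filter such as `date >= '2017-02-20'` would then only scan the paths returned by `prune`, instead of every partition of the table.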






[jira] [Commented] (FLINK-5546) java.io.tmpdir setted as project build directory in surefire plugin

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875277#comment-15875277
 ] 

ASF GitHub Bot commented on FLINK-5546:
---

Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/3190
  
Hi @StephanEwen, I have re-submitted this pull request based on the current master 
branch, which has FLINK-5817 merged.


> java.io.tmpdir setted as project build directory in surefire plugin
> ---
>
> Key: FLINK-5546
> URL: https://issues.apache.org/jira/browse/FLINK-5546
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
> Environment: CentOS 7.2
>Reporter: Syinchwun Leo
>Assignee: shijinkui
> Fix For: 1.2.1
>
>
> When multiple Linux users run the tests at the same time, the flink-runtime module may 
> fail. User A creates /tmp/cacheFile, and User B then has no permission to 
> access the folder.  
> Failed tests: 
> FileCacheDeleteValidationTest.setup:79 Error initializing the test: 
> /tmp/cacheFile (Permission denied)
> Tests in error: 
> IOManagerTest.channelEnumerator:54 » Runtime Could not create storage 
> director...
> Tests run: 1385, Failures: 1, Errors: 1, Skipped: 8
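
The collision described above comes from two users sharing one fixed path under `/tmp`. The sketch below is not the change proposed in this issue (which concerns the `java.io.tmpdir` setting of the surefire plugin); it only illustrates a related test-side technique, using the hypothetical helper `TestTempDirs.newCacheDir`, in which each run creates a uniquely named directory under whatever `java.io.tmpdir` points to.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical test helper for illustration; not part of Flink. */
final class TestTempDirs {

    private TestTempDirs() {}

    /** Creates a fresh, uniquely named directory under java.io.tmpdir. */
    static Path newCacheDir() {
        Path base = Paths.get(System.getProperty("java.io.tmpdir"));
        try {
            // createTempDirectory appends a random suffix, so two runs never share a path
            return Files.createTempDirectory(base, "cacheFile-");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the directory is created by the user running the test, it is owned by that user, so the permission error shown in the test output above would not occur for this path.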





[GitHub] flink issue #3190: [FLINK-5546][build] java.io.tmpdir setted as project buil...

2017-02-20 Thread shijinkui
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/3190
  
Hi @StephanEwen, I have re-submitted this pull request based on the current master 
branch, which has FLINK-5817 merged.




[jira] [Commented] (FLINK-5795) Improve “UDTF" to support constructor with parameter.

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875273#comment-15875273
 ] 

ASF GitHub Bot commented on FLINK-5795:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/3330
  
Looks good to me. Waiting for a +1 from another committer.


> Improve “UDTF" to support constructor with parameter.
> -
>
> Key: FLINK-5795
> URL: https://issues.apache.org/jira/browse/FLINK-5795
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>






[GitHub] flink issue #3330: [FLINK-5795][TableAPI] Improve UDTF to support constr...

2017-02-20 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/3330
  
Looks good to me. Waiting for a +1 from another committer.




[jira] [Updated] (FLINK-5858) Support multiple sinks in same execution DAG

2017-02-20 Thread godfrey he (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he updated FLINK-5858:
--
Description: 
When the writeToSink method is called to write a Table (with a TableSource) to a 
TableSink, the Table is translated to a DataSet or DataStream. If writeToSink is 
called more than once (to write to different sinks), the Table is also 
translated more than once, and the final execution graph is split into separate 
DAGs. For example:

{code:title=Example.scala|borderStyle=solid}
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val csvTableSource = new CsvTableSource(
  "/tmp/words",
  Array("first", "id", "score", "last"),
  Array(
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.INT_TYPE_INFO,
    BasicTypeInfo.DOUBLE_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO
  ),
  fieldDelim = "#"
)

tEnv.registerTableSource("csv_source", csvTableSource)
val resultTable = tEnv.scan("csv_source")
  .groupBy('first)
  .select('first, 'score.sum)

resultTable.writeToSink(new CsvTableSink("/tmp/wordcount1"))
resultTable.writeToSink(new CsvTableSink("/tmp/wordcount2"))

println(tEnv.explain(resultTable))
{code}

Results:

== Abstract Syntax Tree ==
LogicalProject(first=[$0], TMP_1=[$1])
  LogicalAggregate(group=[{0}], TMP_0=[SUM($1)])
    LogicalProject(first=[$0], score=[$2])
      LogicalTableScan(table=[[csv_source]])

== Optimized Logical Plan ==
DataSetAggregate(groupBy=[first], select=[first, SUM(score) AS TMP_0])
  BatchTableSourceScan(table=[[csv_source]], fields=[first, score])

== Physical Execution Plan ==
{color:red}
Stage 6 : Data Source
{color}
    content : collect elements with CollectionInputFormat
    Partitioning : RANDOM_PARTITIONED

    Stage 5 : Map
        content : prepare select: (first, SUM(score) AS TMP_0)
        ship_strategy : Forward
        exchange_mode : PIPELINED
        driver_strategy : Map
        Partitioning : RANDOM_PARTITIONED

        Stage 4 : GroupCombine
            content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
            ship_strategy : Forward
            exchange_mode : PIPELINED
            driver_strategy : Sorted Combine
            Partitioning : RANDOM_PARTITIONED

            Stage 3 : GroupReduce
                content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
                ship_strategy : Hash Partition on [0]
                exchange_mode : PIPELINED
                driver_strategy : Sorted Group Reduce
                Partitioning : RANDOM_PARTITIONED

                Stage 2 : Map
                    content : to: Row(f0: String, f1: Double)
                    ship_strategy : Forward
                    exchange_mode : PIPELINED
                    driver_strategy : Map
                    Partitioning : RANDOM_PARTITIONED

                    Stage 1 : Map
                        content : Map at emitDataSet(CsvTableSink.scala:67)
                        ship_strategy : Forward
                        exchange_mode : PIPELINED
                        driver_strategy : Map
                        Partitioning : RANDOM_PARTITIONED

                        Stage 0 : Data Sink
                            content : TextOutputFormat (/tmp/wordcount1) - UTF-8
                            ship_strategy : Forward
                            exchange_mode : PIPELINED
                            Partitioning : RANDOM_PARTITIONED

{color:red}
Stage 13 : Data Source
{color}
    content : collect elements with CollectionInputFormat
    Partitioning : RANDOM_PARTITIONED

    Stage 12 : Map
        content : prepare select: (first, SUM(score) AS TMP_0)
        ship_strategy : Forward
        exchange_mode : PIPELINED
        driver_strategy : Map
        Partitioning : RANDOM_PARTITIONED

        Stage 11 : GroupCombine
            content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
            ship_strategy : Forward
            exchange_mode : PIPELINED
            driver_strategy : Sorted Combine
            Partitioning : RANDOM_PARTITIONED

            Stage 10 : 

[jira] [Created] (FLINK-5858) Support multiple sinks in same execution DAG

2017-02-20 Thread godfrey he (JIRA)
godfrey he created FLINK-5858:
-

 Summary: Support multiple sinks in same execution DAG
 Key: FLINK-5858
 URL: https://issues.apache.org/jira/browse/FLINK-5858
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: godfrey he


When the writeToSink method is called to write a Table (with a TableSource) to a 
TableSink, the Table is translated to a DataSet or DataStream. If writeToSink is 
called more than once (to write to different sinks), the Table is also 
translated more than once, and the final execution graph is split into separate 
DAGs. For example:

{code:title=Example.scala|borderStyle=solid}
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val csvTableSource = new CsvTableSource(
  "/tmp/words",
  Array("first", "id", "score", "last"),
  Array(
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.INT_TYPE_INFO,
    BasicTypeInfo.DOUBLE_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO
  ),
  fieldDelim = "#"
)

tEnv.registerTableSource("csv_source", csvTableSource)
val resultTable = tEnv.scan("csv_source")
  .groupBy('first)
  .select('first, 'score.sum)

resultTable.writeToSink(new CsvTableSink("/tmp/wordcount1"))
resultTable.writeToSink(new CsvTableSink("/tmp/wordcount2"))

println(tEnv.explain(resultTable))
{code}

result:

== Abstract Syntax Tree ==
LogicalProject(first=[$0], TMP_1=[$1])
  LogicalAggregate(group=[{0}], TMP_0=[SUM($1)])
    LogicalProject(first=[$0], score=[$2])
      LogicalTableScan(table=[[csv_source]])

== Optimized Logical Plan ==
DataSetAggregate(groupBy=[first], select=[first, SUM(score) AS TMP_0])
  BatchTableSourceScan(table=[[csv_source]], fields=[first, score])

== Physical Execution Plan ==
{color:red}
Stage 6 : Data Source
{color}
    content : collect elements with CollectionInputFormat
    Partitioning : RANDOM_PARTITIONED

    Stage 5 : Map
        content : prepare select: (first, SUM(score) AS TMP_0)
        ship_strategy : Forward
        exchange_mode : PIPELINED
        driver_strategy : Map
        Partitioning : RANDOM_PARTITIONED

        Stage 4 : GroupCombine
            content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
            ship_strategy : Forward
            exchange_mode : PIPELINED
            driver_strategy : Sorted Combine
            Partitioning : RANDOM_PARTITIONED

            Stage 3 : GroupReduce
                content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
                ship_strategy : Hash Partition on [0]
                exchange_mode : PIPELINED
                driver_strategy : Sorted Group Reduce
                Partitioning : RANDOM_PARTITIONED

                Stage 2 : Map
                    content : to: Row(f0: String, f1: Double)
                    ship_strategy : Forward
                    exchange_mode : PIPELINED
                    driver_strategy : Map
                    Partitioning : RANDOM_PARTITIONED

                    Stage 1 : Map
                        content : Map at emitDataSet(CsvTableSink.scala:67)
                        ship_strategy : Forward
                        exchange_mode : PIPELINED
                        driver_strategy : Map
                        Partitioning : RANDOM_PARTITIONED

                        Stage 0 : Data Sink
                            content : TextOutputFormat (/tmp/wordcount1) - UTF-8
                            ship_strategy : Forward
                            exchange_mode : PIPELINED
                            Partitioning : RANDOM_PARTITIONED

{color:red}
Stage 13 : Data Source
{color}
    content : collect elements with CollectionInputFormat
    Partitioning : RANDOM_PARTITIONED

    Stage 12 : Map
        content : prepare select: (first, SUM(score) AS TMP_0)
        ship_strategy : Forward
        exchange_mode : PIPELINED
        driver_strategy : Map
        Partitioning : RANDOM_PARTITIONED

        Stage 11 : GroupCombine
            content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
            ship_strategy : Forward
            exchange_mode 

[jira] [Created] (FLINK-5857) Recycle idle containers in time for yarn mode

2017-02-20 Thread shuai.xu (JIRA)
shuai.xu created FLINK-5857:
---

 Summary: Recycle idle containers in time for yarn mode
 Key: FLINK-5857
 URL: https://issues.apache.org/jira/browse/FLINK-5857
 Project: Flink
  Issue Type: Bug
  Components: YARN
Reporter: shuai.xu
Assignee: shuai.xu


When we run a Flink batch job like MapReduce, the container of a map task may 
stay idle for a long time after the map has finished. We need a strategy to 
recycle these containers to reduce resource usage.
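
One possible strategy, sketched below under stated assumptions, is an idle timeout: a container that has been idle for longer than a configured interval is handed back. The class `IdleContainerTracker`, its methods, and the timeout parameter are hypothetical and only illustrate the bookkeeping; they are not part of Flink's YARN integration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical idle-timeout bookkeeping for illustration; not Flink's YARN code. */
final class IdleContainerTracker {

    private final long idleTimeoutMillis;
    private final Map<String, Long> idleSince = new HashMap<>();

    IdleContainerTracker(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    /** Records when a container became idle (keeps the earliest timestamp). */
    void markIdle(String containerId, long nowMillis) {
        idleSince.putIfAbsent(containerId, nowMillis);
    }

    /** A container that receives work again is no longer a recycling candidate. */
    void markBusy(String containerId) {
        idleSince.remove(containerId);
    }

    /** Returns and forgets all containers idle for at least the timeout. */
    List<String> releaseExpired(long nowMillis) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = idleSince.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= idleTimeoutMillis) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }
}
```

The caller would invoke `releaseExpired` periodically and return the listed containers to the cluster manager.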





[jira] [Comment Edited] (FLINK-5836) Race condition between slot offering and task deployment

2017-02-20 Thread Biao Liu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875234#comment-15875234
 ] 

Biao Liu edited comment on FLINK-5836 at 2/21/17 2:26 AM:
--

[~wenlong.lwl] has already been working on it, reassign to him.


was (Author: sleepy):
[~wenlong.lwl] was already working on this, reassign to him.

> Race condition between slot offering and task deployment
> 
>
> Key: FLINK-5836
> URL: https://issues.apache.org/jira/browse/FLINK-5836
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.3.0
>Reporter: Till Rohrmann
>Assignee: Wenlong Lyu
>  Labels: flip-6
>
> The Flip-6 code has a race condition when offering slots to a {{JobManager}} 
> which directly deploys tasks to the offered slots. In such a situation it is 
> possible that the deploy call overtakes the acknowledge message for the slot 
> offering. As a result, the slots are not marked yet as active and the 
> deployment will fail.
> I propose to fix this problem by first activating all offered slots before 
> sending the slot offer message to the {{JobManager}}. Consequently, we'll 
> deactivate and free slots which haven't been accepted by the {{JobManager}} 
> once we've received the offering acknowledge message.





[jira] [Created] (FLINK-5856) Need return redundant containers to yarn for yarn mode

2017-02-20 Thread shuai.xu (JIRA)
shuai.xu created FLINK-5856:
---

 Summary: Need return redundant containers to yarn for yarn mode
 Key: FLINK-5856
 URL: https://issues.apache.org/jira/browse/FLINK-5856
 Project: Flink
  Issue Type: Bug
  Components: YARN
Reporter: shuai.xu
Assignee: shuai.xu


In Flink's YARN mode, the RM requests containers from YARN according to the 
requirements of the JM. However, the AMRMClientAsync used with YARN doesn't guarantee 
that the number of containers returned exactly equals the number requested. 
So the Flink RM needs to record the number of containers it requested and return 
the redundant ones to YARN.
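
The bookkeeping described above can be sketched as a simple counter. The class `ContainerRequestCounter` and its methods are hypothetical names for this example and do not use the YARN client API: the counter remembers how many containers are still outstanding and reports how many of a newly allocated batch exceed that number.

```java
/** Hypothetical request bookkeeping for illustration; does not use the YARN client API. */
final class ContainerRequestCounter {

    private int pending;

    /** Records that count more containers were requested. */
    void request(int count) {
        pending += count;
    }

    /**
     * Called when containers are allocated. Returns how many of them are
     * surplus and should be handed back to the cluster manager.
     */
    int onAllocated(int allocated) {
        int accepted = Math.min(allocated, pending);
        pending -= accepted;
        return allocated - accepted;
    }

    int pending() {
        return pending;
    }
}
```

For example, after requesting three containers, an allocation of two leaves one request pending, and a later allocation of four yields three surplus containers to return.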





[jira] [Commented] (FLINK-5836) Race condition between slot offering and task deployment

2017-02-20 Thread Biao Liu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875234#comment-15875234
 ] 

Biao Liu commented on FLINK-5836:
-

[~wenlong.lwl] was already working on this, reassign to him.

> Race condition between slot offering and task deployment
> 
>
> Key: FLINK-5836
> URL: https://issues.apache.org/jira/browse/FLINK-5836
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.3.0
>Reporter: Till Rohrmann
>Assignee: Biao Liu
>  Labels: flip-6
>
> The Flip-6 code has a race condition when offering slots to a {{JobManager}} 
> which directly deploys tasks to the offered slots. In such a situation it is 
> possible that the deploy call overtakes the acknowledge message for the slot 
> offering. As a result, the slots are not marked yet as active and the 
> deployment will fail.
> I propose to fix this problem by first activating all offered slots before 
> sending the slot offer message to the {{JobManager}}. Consequently, we'll 
> deactivate and free slots which haven't been accepted by the {{JobManager}} 
> once we've received the offering acknowledge message.





[jira] [Assigned] (FLINK-5836) Race condition between slot offering and task deployment

2017-02-20 Thread Biao Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu reassigned FLINK-5836:
---

Assignee: Wenlong Lyu  (was: Biao Liu)

> Race condition between slot offering and task deployment
> 
>
> Key: FLINK-5836
> URL: https://issues.apache.org/jira/browse/FLINK-5836
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.3.0
>Reporter: Till Rohrmann
>Assignee: Wenlong Lyu
>  Labels: flip-6
>
> The Flip-6 code has a race condition when offering slots to a {{JobManager}} 
> which directly deploys tasks to the offered slots. In such a situation it is 
> possible that the deploy call overtakes the acknowledge message for the slot 
> offering. As a result, the slots are not marked yet as active and the 
> deployment will fail.
> I propose to fix this problem by first activating all offered slots before 
> sending the slot offer message to the {{JobManager}}. Consequently, we'll 
> deactivate and free slots which haven't been accepted by the {{JobManager}} 
> once we've received the offering acknowledge message.





[jira] [Commented] (FLINK-5441) Directly allow SQL queries on a Table

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875232#comment-15875232
 ] 

ASF GitHub Bot commented on FLINK-5441:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/3107
  
Hi @twalthr, `env.sql(s"SELECT * FROM $table JOIN $otherTable")` is a 
nice approach. But how should we derive the table's name and register it in the env? 


> Directly allow SQL queries on a Table
> -
>
> Key: FLINK-5441
> URL: https://issues.apache.org/jira/browse/FLINK-5441
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Jark Wu
>
> Right now a user has to register a table before it can be used in SQL 
> queries. In order to allow more fluent programming we propose calling SQL 
> directly on a table. An underscore can be used to reference the current table:
> {code}
> myTable.sql("SELECT a, b, c FROM _ WHERE d = 12")
> {code}
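
One way the underscore reference could be resolved is to register the table under a generated name and substitute that name into the query. The sketch below is only an assumption about how this might work: `InlineSqlSketch`, the `_tmp_table_` prefix, and the registry map are hypothetical, and the textual replacement is deliberately naive (it would also rewrite a standalone `_` inside a string literal, which a real implementation would have to handle in the SQL parser).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;

/** Hypothetical illustration; not the Table API and not a real SQL rewrite. */
final class InlineSqlSketch {

    private final Map<String, Object> registry = new HashMap<>();
    private final AtomicInteger counter = new AtomicInteger();

    /**
     * Registers the table under a generated unique name and replaces every
     * standalone underscore in the query with that name.
     */
    String rewrite(Object table, String query) {
        String name = "_tmp_table_" + counter.getAndIncrement();
        registry.put(name, table);
        // \b_\b matches "_" only when it is not part of a longer identifier such as a_b
        return query.replaceAll("\\b_\\b", Matcher.quoteReplacement(name));
    }

    boolean isRegistered(String name) {
        return registry.containsKey(name);
    }
}
```

Generating the name internally keeps the user from having to pick and register a table name by hand.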





[GitHub] flink issue #3107: [FLINK-5441] [table] Directly allow SQL queries on a Tabl...

2017-02-20 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/3107
  
Hi @twalthr, `env.sql(s"SELECT * FROM $table JOIN $otherTable")` is a 
nice approach. But how should we derive the table's name and register it in the env? 




[jira] [Commented] (FLINK-5414) Bump up Calcite version to 1.11

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875207#comment-15875207
 ] 

ASF GitHub Bot commented on FLINK-5414:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3338#discussion_r102117236
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
 ---
@@ -155,15 +155,15 @@ class ExpressionReductionTest extends TableTestBase {
   "DataSetCalc",
   batchTableNode(0),
   term("select",
-"13 AS _c0",
+"CAST(13) AS _c0",
--- End diff --

Is it possible to avoid changing the default nullability while adopting 
Calcite 1.11? Let me try it out as well.


> Bump up Calcite version to 1.11
> ---
>
> Key: FLINK-5414
> URL: https://issues.apache.org/jira/browse/FLINK-5414
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Jark Wu
>
> The upcoming Calcite release 1.11 has a lot of stability fixes and new 
> features. We should update it for the Table API.
> E.g. we can hopefully merge FLINK-4864





[GitHub] flink pull request #3338: [FLINK-5414] [table] Bump up Calcite version to 1....

2017-02-20 Thread haohui
Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3338#discussion_r102117236
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
 ---
@@ -155,15 +155,15 @@ class ExpressionReductionTest extends TableTestBase {
   "DataSetCalc",
   batchTableNode(0),
   term("select",
-"13 AS _c0",
+"CAST(13) AS _c0",
--- End diff --

Is it possible to avoid changing the default nullability while adopting 
Calcite 1.11? Let me try it out as well.




[jira] [Commented] (FLINK-4326) Flink start-up scripts should optionally start services on the foreground

2017-02-20 Thread Michi Mutsuzaki (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875143#comment-15875143
 ] 

Michi Mutsuzaki commented on FLINK-4326:


flink-console.sh doesn't define log_setting, and I'm getting a warning like 
this:

{noformat}
% ./bin/flink-console.sh jobmanager --configDir conf --executionMode cluster
Starting jobmanager as a console application on host x1.
log4j:WARN No appenders could be found for logger 
(org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
info.
{noformat}

Maybe it makes sense to provide log4j/logback config files that log to the console, 
and define log_setting to point to these config files?

> Flink start-up scripts should optionally start services on the foreground
> -
>
> Key: FLINK-4326
> URL: https://issues.apache.org/jira/browse/FLINK-4326
> Project: Flink
>  Issue Type: Improvement
>  Components: Startup Shell Scripts
>Affects Versions: 1.0.3
>Reporter: Elias Levy
>
> This has previously been mentioned in the mailing list, but has not been 
> addressed.  Flink start-up scripts start the job and task managers in the 
> background.  This makes it difficult to integrate Flink with most process 
> supervisory tools and init systems, including Docker.  One can get around 
> this by hacking the scripts or manually starting the right classes via Java, 
> but it is a brittle solution.
> In addition to starting the daemons in the foreground, the start-up scripts 
> should use exec instead of running the commands, so as to avoid forks.  Many 
> supervisory tools assume the PID of the process to be monitored is that of 
> the process it first executes, and fork chains make it difficult for the 
> supervisor to figure out what process to monitor.  Specifically, 
> jobmanager.sh and taskmanager.sh should exec flink-daemon.sh, and 
> flink-daemon.sh should exec java.





[jira] [Commented] (FLINK-4422) Convert all time interval measurements to System.nanoTime()

2017-02-20 Thread Jin Mingjian (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875127#comment-15875127
 ] 

Jin Mingjian commented on FLINK-4422:
-

[~StephanEwen] thanks for re-assigning. Your suggestion is great. PRs will come 
soon :)

> Convert all time interval measurements to System.nanoTime()
> ---
>
> Key: FLINK-4422
> URL: https://issues.apache.org/jira/browse/FLINK-4422
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Jin Mingjian
>Priority: Minor
>
> In contrast to {{System.currentTimeMillis()}}, {{System.nanoTime()}} is 
> monotonic. To measure delays and time intervals, {{System.nanoTime()}} is 
> hence reliable, while {{System.currentTimeMillis()}} is not.
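
The interval measurement described above could look roughly like the sketch below. The class ElapsedTimeSketch and the method measureNanos are hypothetical names for illustration and are not part of Flink; the only real API used is System.nanoTime(), whose values are meaningful only as differences.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration; not a Flink class.
final class ElapsedTimeSketch {

    /** Runs the action and returns how long it took, in nanoseconds. */
    static long measureNanos(Runnable action) {
        final long start = System.nanoTime();
        action.run();
        // Subtract two readings instead of interpreting one of them:
        // nanoTime() has an arbitrary origin and is unrelated to wall-clock time.
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        long nanos = measureNanos(() -> {
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        System.out.println("elapsed ms: " + TimeUnit.NANOSECONDS.toMillis(nanos));
    }
}
```

A wall-clock adjustment (for example an NTP correction) during the measured action could distort a System.currentTimeMillis() based interval, which is the failure mode the issue wants to avoid.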





[jira] [Assigned] (FLINK-5692) Add an Option to Deactivate Kryo Fallback for Serializers

2017-02-20 Thread Jin Mingjian (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jin Mingjian reassigned FLINK-5692:
---

Assignee: Jin Mingjian

> Add an Option to Deactivate Kryo Fallback for Serializers
> -
>
> Key: FLINK-5692
> URL: https://issues.apache.org/jira/browse/FLINK-5692
> Project: Flink
>  Issue Type: New Feature
>  Components: Type Serialization System
>Affects Versions: 1.2.0
>Reporter: Stephan Ewen
>Assignee: Jin Mingjian
>  Labels: easyfix, starter
>
> Some users want to prevent Flink's serializers from using Kryo, as it can easily 
> become a hotspot in serialization.
> For those users, it would help if there were a flag to "deactivate generic 
> types". Those users could then see where types are used that default to Kryo 
> and change these types (make them POJOs, Value types, or write custom 
> serializers).
> There are two ways to approach that:
>   1. (Simple) Make {{GenericTypeInfo}} throw an exception whenever it would 
> create a Kryo Serializer (when the respective flag is set in the 
> {{ExecutionConfig}})
>   2. Have a static flag on the {{TypeExtractor}} to throw an exception 
> whenever it would create a {{GenericTypeInfo}}. This approach has the 
> downside of introducing some static configuration to the TypeExtractor, but 
> may be more helpful because it throws exceptions in the programs at points 
> where the types are used (not where the serializers are created, which may be 
> much later).
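
Approach 1 could be sketched roughly as below. Every name here (GenericTypeFallbackSketch, Config, disableGenericTypes, createSerializer) is a hypothetical stand-in chosen for illustration; none of it is taken from Flink's ExecutionConfig or GenericTypeInfo, and the serializer is represented by a plain string.

```java
// Hypothetical stand-ins for illustration; these are not Flink's classes.
final class GenericTypeFallbackSketch {

    /** Mimics a configuration object that carries the proposed flag. */
    static final class Config {
        private boolean genericTypesDisabled;

        void disableGenericTypes() {
            genericTypesDisabled = true;
        }

        boolean hasGenericTypesDisabled() {
            return genericTypesDisabled;
        }
    }

    /** Mimics the point where a generic (Kryo-backed) serializer would be created. */
    static String createSerializer(Class<?> type, Config config) {
        if (config.hasGenericTypesDisabled()) {
            // Fail loudly instead of silently falling back to the generic serializer.
            throw new UnsupportedOperationException(
                "Generic types are disabled, but " + type.getName()
                    + " would be handled by the generic serializer.");
        }
        return "generic-serializer-for-" + type.getSimpleName();
    }

    public static void main(String[] args) {
        Config config = new Config();
        System.out.println(createSerializer(StringBuilder.class, config));
        config.disableGenericTypes();
        try {
            createSerializer(StringBuilder.class, config);
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

As the description notes, a check at serializer creation fires later than a check in the type extraction step, so the stack trace may not point at the place where the type was introduced.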





[GitHub] flink issue #3302: [FLINK-5710] Add ProcTime() function to indicate StreamSQ...

2017-02-20 Thread haohui
Github user haohui commented on the issue:

https://github.com/apache/flink/pull/3302
  
FYI: #3370 is the commit we use internally for this feature. Please feel 
free to take it if it helps with implementing this PR.




[jira] [Commented] (FLINK-5710) Add ProcTime() function to indicate StreamSQL

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875061#comment-15875061
 ] 

ASF GitHub Bot commented on FLINK-5710:
---

Github user haohui commented on the issue:

https://github.com/apache/flink/pull/3302
  
FYI: #3370 is the commit we use internally for this feature. Please feel 
free to take it if it helps with implementing this PR.


> Add ProcTime() function to indicate StreamSQL
> -
>
> Key: FLINK-5710
> URL: https://issues.apache.org/jira/browse/FLINK-5710
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Stefano Bortoli
>Assignee: Stefano Bortoli
>Priority: Minor
>
> procTime() is a parameterless scalar function that just indicates processing 
> time mode





[jira] [Commented] (FLINK-5710) Add ProcTime() function to indicate StreamSQL

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875060#comment-15875060
 ] 

ASF GitHub Bot commented on FLINK-5710:
---

GitHub user haohui opened a pull request:

https://github.com/apache/flink/pull/3370

[FLINK-5710] Add ProcTime() function to indicate StreamSQL.

This is the commit we used internally -- There are no unit tests associated 
with this PR. It simply serves as a reference point for #3302.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/haohui/flink FLINK-5710

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3370.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3370


commit 7aaa5008c7b49ce48e01d40dc4a04a6211eaf79b
Author: Haohui Mai 
Date:   2017-02-20T21:13:58Z

[FLINK-5710] Add ProcTime() function to indicate StreamSQL.




> Add ProcTime() function to indicate StreamSQL
> -
>
> Key: FLINK-5710
> URL: https://issues.apache.org/jira/browse/FLINK-5710
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Stefano Bortoli
>Assignee: Stefano Bortoli
>Priority: Minor
>
> procTime() is a parameterless scalar function that just indicates processing 
> time mode





[GitHub] flink pull request #3370: [FLINK-5710] Add ProcTime() function to indicate S...

2017-02-20 Thread haohui
GitHub user haohui opened a pull request:

https://github.com/apache/flink/pull/3370

[FLINK-5710] Add ProcTime() function to indicate StreamSQL.

This is the commit we used internally -- There are no unit tests associated 
with this PR. It simply serves as a reference point for #3302.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/haohui/flink FLINK-5710

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3370.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3370


commit 7aaa5008c7b49ce48e01d40dc4a04a6211eaf79b
Author: Haohui Mai 
Date:   2017-02-20T21:13:58Z

[FLINK-5710] Add ProcTime() function to indicate StreamSQL.






[jira] [Created] (FLINK-5855) Unprotected access to pendingFilesPerCheckpoint in BucketingSink

2017-02-20 Thread Ted Yu (JIRA)
Ted Yu created FLINK-5855:
-

 Summary: Unprotected access to pendingFilesPerCheckpoint in 
BucketingSink
 Key: FLINK-5855
 URL: https://issues.apache.org/jira/browse/FLINK-5855
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu
Priority: Minor


{code}
handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);

synchronized (restoredState.pendingFilesPerCheckpoint) {
  restoredState.pendingFilesPerCheckpoint.clear();
{code}
Lock on pendingFilesPerCheckpoint should be obtained prior to the call to 
handlePendingFilesForPreviousCheckpoints().
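
The ordering requested above could be sketched as follows. PendingFilesLockSketch, State and handlePendingFiles are hypothetical stand-ins written for this illustration and are not the BucketingSink code; the point is only that the lock is taken before the map is read, and held until it is cleared.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the restore path; not the BucketingSink code.
final class PendingFilesLockSketch {

    static final class State {
        final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
    }

    /** Stand-in for the handler: counts the pending files it was given. */
    static int handlePendingFiles(Map<Long, List<String>> pending) {
        int handled = 0;
        for (List<String> files : pending.values()) {
            handled += files.size();
        }
        return handled;
    }

    static int restore(State restoredState) {
        // Acquire the lock first, so that reading and clearing the map
        // happen as one step with respect to other synchronized users.
        synchronized (restoredState.pendingFilesPerCheckpoint) {
            int handled = handlePendingFiles(restoredState.pendingFilesPerCheckpoint);
            restoredState.pendingFilesPerCheckpoint.clear();
            return handled;
        }
    }

    public static void main(String[] args) {
        State state = new State();
        state.pendingFilesPerCheckpoint.put(1L, new ArrayList<>(List.of("part-0", "part-1")));
        System.out.println("handled: " + restore(state));
    }
}
```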





[jira] [Updated] (FLINK-4770) Migrate core options

2017-02-20 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen updated FLINK-4770:

Description: 
The core options contain everything that is specific to
  - job
  - cross TaskManager / JobManager

> Migrate core options
> 
>
> Key: FLINK-4770
> URL: https://issues.apache.org/jira/browse/FLINK-4770
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataSet API, DataStream API
>Reporter: Stephan Ewen
>
> The core options contain everything that is specific to
>   - job
>   - cross TaskManager / JobManager





[jira] [Updated] (FLINK-4770) Migrate core options

2017-02-20 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen updated FLINK-4770:

Summary: Migrate core options  (was: Migrate Job Execution configuration 
options)

> Migrate core options
> 
>
> Key: FLINK-4770
> URL: https://issues.apache.org/jira/browse/FLINK-4770
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataSet API, DataStream API
>Reporter: Stephan Ewen
>






[jira] [Commented] (FLINK-5851) Renaming AsyncCollector into ResultPromise/ResultFuture

2017-02-20 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874957#comment-15874957
 ] 

Stephan Ewen commented on FLINK-5851:
-

My preference would be to stay with the {{(Completable)Future}} naming scheme.
For some reason, the term {{Promise}} never resonated with me, especially since 
completing it with an exception seems totally valid, but "breaks" the promise.

> Renaming AsyncCollector into ResultPromise/ResultFuture
> ---
>
> Key: FLINK-5851
> URL: https://issues.apache.org/jira/browse/FLINK-5851
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
> Fix For: 1.3.0
>
>
> Currently, the async I/O API gives an {{AsyncCollector}} to an 
> {{AsyncFunction}} implementation. The name does not really reflect what the 
> {{AsyncCollector}} does since it does not collect but is actually a one time 
> completable future. Therefore, I propose to rename the {{AsyncCollector}} 
> into {{ResultPromise}} or {{ResultFuture}}. This is API changing.
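
To illustrate the "one time completable future" behaviour mentioned above, here is a small sketch that uses java.util.concurrent.CompletableFuture as a stand-in. ResultFutureSketch and asyncInvoke are hypothetical names and do not reproduce Flink's AsyncFunction or AsyncCollector interfaces.

```java
import java.util.Locale;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration; not Flink's async I/O API.
final class ResultFutureSketch {

    /** Completes the given future exactly once, with a value or an exception. */
    static void asyncInvoke(String input, CompletableFuture<String> resultFuture) {
        if (input.isEmpty()) {
            resultFuture.completeExceptionally(new IllegalArgumentException("empty input"));
        } else {
            resultFuture.complete(input.toUpperCase(Locale.ROOT));
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = new CompletableFuture<>();
        asyncInvoke("flink", ok);
        System.out.println(ok.join());

        // A second completion attempt is ignored and reports false.
        System.out.println(ok.complete("again"));

        CompletableFuture<String> failed = new CompletableFuture<>();
        asyncInvoke("", failed);
        System.out.println(failed.isCompletedExceptionally());
    }
}
```

Nothing is collected here: the object handed to the function is completed once and further attempts have no effect, which is the mismatch with the name "collector" that the issue points out.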





[jira] [Assigned] (FLINK-5843) Website/docs missing Cache-Control HTTP header, can serve stale data

2017-02-20 Thread Patrick Lucas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Patrick Lucas reassigned FLINK-5843:


Assignee: Patrick Lucas
Priority: Minor  (was: Major)

> Website/docs missing Cache-Control HTTP header, can serve stale data
> 
>
> Key: FLINK-5843
> URL: https://issues.apache.org/jira/browse/FLINK-5843
> Project: Flink
>  Issue Type: Bug
>  Components: Project Website
>Reporter: Patrick Lucas
>Assignee: Patrick Lucas
>Priority: Minor
>
> When Flink 1.2.0 was released, I found that the [Flink downloads 
> page|https://flink.apache.org/downloads.html] was out-of-date until I forced 
> my browser to refresh the page. Upon investigation, I found that the 
> principal pages of the website are served with only the following headers 
> that relate to caching: Date, Last-Modified, and ETag.
> Since there is no Cache-Control header (or the older Expires or Pragma 
> headers), browsers are left to their own heuristics as to how long to cache 
> this content, which varies from browser to browser. In some browsers, this 
> heuristic is 10% of the difference between Date and Last-Modified headers. I 
> take this to mean that, if the content were last modified 90 days ago, and I 
> last accessed it 5 days ago, my browser will serve a cached response for a 
> further 3.5 days (10% * (90 days - 5 days) = 8.5 days, 5 days have elapsed 
> leaving 3.5 days).
> I'm not sure who at the ASF we should talk to about this, but I recommend we 
> add the following header to any responses served from the Flink project 
> website or official documentation website\[1]:
> {code}Cache-Control: max-age=0, must-revalidate{code}
> (Note this will only make browsers revalidate their caches; if the ETag of the 
> cached content matches what the server still has, the server will return 304 
> Not Modified and omit the actual content)
> \[1] Both the website hosted at flink.apache.org and the documentation hosted 
> at ci.apache.org are affected.
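
The arithmetic in the description can be checked with the sketch below. It assumes the 10% heuristic exactly as stated in the description (real browsers may differ), and HeuristicFreshnessSketch with its methods is a hypothetical helper written for this illustration.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper that mirrors the 10% heuristic described above.
final class HeuristicFreshnessSketch {

    /** Freshness lifetime: 10% of the gap between the Date and Last-Modified headers. */
    static Duration freshnessLifetime(Instant date, Instant lastModified) {
        return Duration.between(lastModified, date).dividedBy(10);
    }

    /** Time left before the cached response would be revalidated, never negative. */
    static Duration remaining(Instant date, Instant lastModified, Instant now) {
        Duration age = Duration.between(date, now);
        Duration left = freshnessLifetime(date, lastModified).minus(age);
        return left.isNegative() ? Duration.ZERO : left;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2017-02-20T00:00:00Z");
        Instant date = now.minus(Duration.ofDays(5));          // response fetched 5 days ago
        Instant lastModified = now.minus(Duration.ofDays(90)); // content modified 90 days ago

        System.out.println("lifetime (days): "
            + freshnessLifetime(date, lastModified).toHours() / 24.0);
        System.out.println("remaining (days): "
            + remaining(date, lastModified, now).toHours() / 24.0);
    }
}
```

With the numbers from the description, the lifetime is 204 hours (8.5 days) and 84 hours (3.5 days) remain after 5 days have elapsed.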





[jira] [Commented] (FLINK-5854) Introduce some Flink-specific base Exception types

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874935#comment-15874935
 ] 

ASF GitHub Bot commented on FLINK-5854:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3368
  
It is arguable whether exceptions should ever have a constructor without a 
message; I simply did that here for convenience. I have no strong feelings 
about removing the zero argument constructors.


> Introduce some Flink-specific base Exception types
> --
>
> Key: FLINK-5854
> URL: https://issues.apache.org/jira/browse/FLINK-5854
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Going through the code, there are a lot of places where exception handling 
> could be done a bit nicer, for example 
>   - Some methods do not declare exceptions at all in their signatures. They 
> simply catch all and wrap it in a {{RuntimeException}}.
>   - Some places declare, overly generically, that they throw {{Exception}}, even 
> though they could very specifically type the exceptions they throw.
> I suggest introducing two new basic exceptions that at least help document 
> a bit more what goes wrong:
>   - {{FlinkException}} as a base class for checked exceptions that indicate 
> that something related to using Flink went wrong. Letting a method throw 
> {{FlinkException}} rather than {{Exception}} already helps to exclude all 
> of Java's runtime exceptions, which indicate programming errors rather than 
> situations that should be recovered from.
>   - {{FlinkUncheckedException}} as a Flink-specific subclass of 
> {{RuntimeException}}. That one can come in handy in places where no 
> exceptions were declared, for example when reusing an interface that does not 
> declare exceptions.
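
The two proposed base types could be sketched as below. The nested classes are simplified stand-ins that reuse the names from the description; they are not the classes from the pull request, and restoreState and asRunnable are hypothetical methods that exist only to show how each type would be used.

```java
import java.io.IOException;

// Simplified stand-ins for the proposed exception types; not the PR's code.
final class FlinkExceptionSketch {

    /** Checked base type: something related to using Flink went wrong. */
    static class FlinkException extends Exception {
        private static final long serialVersionUID = 1L;

        FlinkException(String message) {
            super(message);
        }
    }

    /** Unchecked counterpart for places that cannot declare exceptions. */
    static class FlinkUncheckedException extends RuntimeException {
        private static final long serialVersionUID = 1L;

        FlinkUncheckedException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Declares specific checked types instead of a blanket "throws Exception". */
    static String restoreState(String handle) throws IOException, FlinkException {
        if (handle.isEmpty()) {
            throw new FlinkException("No state found for an empty handle");
        }
        return "state:" + handle;
    }

    /** Runnable declares no exceptions, so checked failures are wrapped. */
    static Runnable asRunnable(String handle) {
        return () -> {
            try {
                restoreState(handle);
            } catch (IOException | FlinkException e) {
                throw new FlinkUncheckedException("Restoring state failed", e);
            }
        };
    }

    public static void main(String[] args) throws Exception {
        System.out.println(restoreState("checkpoint-7"));
        try {
            asRunnable("").run();
        } catch (FlinkUncheckedException e) {
            System.out.println(e.getMessage() + ": " + e.getCause().getMessage());
        }
    }
}
```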





[jira] [Commented] (FLINK-5854) Introduce some Flink-specific base Exception types

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874933#comment-15874933
 ] 

ASF GitHub Bot commented on FLINK-5854:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3368
  
@zentol There are many places in the runtime that declare `throws 
Exception`, for example virtually all of the state handling code. This always 
came from the desire to throw `IOException` plus something that expresses that 
non-I/O stuff related to Flink went wrong. The result was a `throws Exception`, 
which also means that you have to catch `Exception`, which you often don't want 
(because this includes `RuntimeException` and you typically want runtime 
exceptions to bubble up a bit further, since they denote bugs by encouraged 
design).

The only place where `throws Exception` really makes sense to me is for 
`MapFunction` and the likes, to allow them to propagate any type of exception 
and let recovery handle them.


> Introduce some Flink-specific base Exception types
> --
>
> Key: FLINK-5854
> URL: https://issues.apache.org/jira/browse/FLINK-5854
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Going through the code, there are a lot of places where exception handling 
> could be done a bit nicer, for example 
>   - Some methods do not declare exceptions at all in their signatures. They 
> simply catch all and wrap it in a {{RuntimeException}}.
>   - Some places declare, overly generically, that they throw {{Exception}}, even 
> though they could very specifically type the exceptions they throw.
> I suggest introducing two new basic exceptions that at least help document 
> a bit more what goes wrong:
>   - {{FlinkException}} as a base class for checked exceptions that indicate 
> that something related to using Flink went wrong. Letting a method throw 
> {{FlinkException}} rather than {{Exception}} already helps to exclude all 
> of Java's runtime exceptions, which indicate programming errors rather than 
> situations that should be recovered from.
>   - {{FlinkUncheckedException}} as a Flink-specific subclass of 
> {{RuntimeException}}. That one can come in handy in places where no 
> exceptions were declared, for example when reusing an interface that does not 
> declare exceptions.





[GitHub] flink issue #3368: [FLINK-5854] [core] Add base Flink Exception classes

2017-02-20 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3368
  
It is arguable whether exceptions should ever have a constructor without a 
message; I simply did that here for convenience. I have no strong feelings 
about removing the zero argument constructors.




[GitHub] flink issue #3368: [FLINK-5854] [core] Add base Flink Exception classes

2017-02-20 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3368
  
@zentol There are many places in the runtime that declare `throws 
Exception`, for example virtually all of the state handling code. This always 
came from the desire to throw `IOException` plus something that expresses that 
non-I/O stuff related to Flink went wrong. The result was a `throws Exception`, 
which also means that you have to catch `Exception`, which you often don't want 
(because this includes `RuntimeException` and you typically want runtime 
exceptions to bubble up a bit further, since they denote bugs by encouraged 
design).

The only place where `throws Exception` really makes sense to me is for 
`MapFunction` and the likes, to allow them to propagate any type of exception 
and let recovery handle them.
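
The difference described in this comment can be seen in a small sketch. CatchScopeSketch, its nested FlinkException and the broad/narrow methods are hypothetical and exist only for this illustration; both methods simulate a bug by throwing an IllegalStateException.

```java
import java.io.IOException;

// Hypothetical illustration of how the declared type shapes the catch block.
final class CatchScopeSketch {

    static class FlinkException extends Exception {
        private static final long serialVersionUID = 1L;

        FlinkException(String message) {
            super(message);
        }
    }

    /** Broad signature: callers are forced to catch Exception. */
    static void broad() throws Exception {
        throw new IllegalStateException("bug");
    }

    /** Narrow signature: callers only handle the expected failure types. */
    static void narrow() throws IOException, FlinkException {
        throw new IllegalStateException("bug");
    }

    static String callBroad() {
        try {
            broad();
            return "ok";
        } catch (Exception e) {
            // The programming error is swallowed together with real failures.
            return "recovered from " + e.getClass().getSimpleName();
        }
    }

    static String callNarrow() {
        try {
            narrow();
            return "ok";
        } catch (IOException | FlinkException e) {
            return "recovered from " + e.getClass().getSimpleName();
        }
        // An IllegalStateException is not caught here and bubbles up to the caller.
    }

    public static void main(String[] args) {
        System.out.println(callBroad());
        try {
            callNarrow();
        } catch (IllegalStateException e) {
            System.out.println("bubbled up: " + e.getMessage());
        }
    }
}
```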




[GitHub] flink issue #3368: [FLINK-5854] [core] Add base Flink Exception classes

2017-02-20 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3368
  
Could you name 1 or 2 examples of situations where you think it is 
appropriate to throw a `FlinkException`? Would invalid arguments (like a 
String being null) be a reason to do so?




[jira] [Commented] (FLINK-5854) Introduce some Flink-specific base Exception types

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874926#comment-15874926
 ] 

ASF GitHub Bot commented on FLINK-5854:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3368
  
Could you name 1 or 2 examples of situations where you think it is 
appropriate to throw a `FlinkException`? Would invalid arguments (like a 
String being null) be a reason to do so?


> Introduce some Flink-specific base Exception types
> --
>
> Key: FLINK-5854
> URL: https://issues.apache.org/jira/browse/FLINK-5854
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Going through the code, there are a lot of places where exception handling 
> could be done a bit nicer, for example 
>   - Some methods do not declare exceptions at all in their signatures. They 
> simply catch all and wrap it in a {{RuntimeException}}.
>   - Some places declare, overly generically, that they throw {{Exception}}, even 
> though they could very specifically type the exceptions they throw.
> I suggest introducing two new basic exceptions that at least help document 
> a bit more what goes wrong:
>   - {{FlinkException}} as a base class for checked exceptions that indicate 
> that something related to using Flink went wrong. Letting a method throw 
> {{FlinkException}} rather than {{Exception}} already helps to exclude all 
> of Java's runtime exceptions, which indicate programming errors rather than 
> situations that should be recovered from.
>   - {{FlinkUncheckedException}} as a Flink-specific subclass of 
> {{RuntimeException}}. That one can come in handy in places where no 
> exceptions were declared, for example when reusing an interface that does not 
> declare exceptions.





[GitHub] flink pull request #3368: [FLINK-5854] [core] Add base Flink Exception class...

2017-02-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3368#discussion_r102072659
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/util/FlinkRuntimeException.java ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * Base class of all Flink-specific unchecked exceptions.
+ */
+@Public
+public class FlinkRuntimeException extends RuntimeException {
+
+   private static final long serialVersionUID = 193141189399279147L;
+
+   /**
+* Creates a new exception with a null message and null cause. 
+*/
+   public FlinkRuntimeException() {
--- End diff --

Is there a reasonable use-case for an exception without an error message or 
cause?




[jira] [Commented] (FLINK-5854) Introduce some Flink-specific base Exception types

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874919#comment-15874919
 ] 

ASF GitHub Bot commented on FLINK-5854:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3368#discussion_r102072659
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/util/FlinkRuntimeException.java ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * Base class of all Flink-specific unchecked exceptions.
+ */
+@Public
+public class FlinkRuntimeException extends RuntimeException {
+
+   private static final long serialVersionUID = 193141189399279147L;
+
+   /**
+* Creates a new exception with a null message and null cause. 
+*/
+   public FlinkRuntimeException() {
--- End diff --

Is there a reasonable use-case for an exception without an error message or 
cause?


> Introduce some Flink-specific base Exception types
> --
>
> Key: FLINK-5854
> URL: https://issues.apache.org/jira/browse/FLINK-5854
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Going through the code, there are a lot of places where exception handling 
> could be done a bit nicer, for example 
>   - Some methods do not declare exceptions at all in their signatures. They 
> simply catch all and wrap it in a {{RuntimeException}}.
>   - Some places declare, overly generically, that they throw {{Exception}}, even 
> though they could very specifically type the exceptions they throw.
> I suggest introducing two new basic exceptions that at least help document 
> a bit more what goes wrong:
>   - {{FlinkException}} as a base class for checked exceptions that indicate 
> that something related to using Flink went wrong. Letting a method throw 
> {{FlinkException}} rather than {{Exception}} already helps to exclude all 
> of Java's runtime exceptions, which indicate programming errors rather than 
> situations that should be recovered from.
>   - {{FlinkUncheckedException}} as a Flink-specific subclass of 
> {{RuntimeException}}. That one can come in handy in places where no 
> exceptions were declared, for example when reusing an interface that does not 
> declare exceptions.





[GitHub] flink pull request #3368: [FLINK-5854] [core] Add base Flink Exception class...

2017-02-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3368#discussion_r102072425
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/util/DynamicCodeLoadingException.java 
---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * An exception that is thrown if the dynamic instantiation of code fails.
+ * 
+ * This exception is supposed to "sum up" the zoo of exceptions typically thrown around
+ * dynamic code loading and instantiations:
+ * 
+ * {@code
+ * try {
+ * Class.forName(classname).asSubclass(TheType.class).newInstance();
+ * }
+ * catch (ClassNotFoundException | ClassCastException | InstantiationException | IllegalAccessException e) {
+ * throw new DynamicCodeLoadingException(e);
+ * }
+ * }
+ */
+@Public
+public class DynamicCodeLoadingException extends FlinkException {
+
+   private static final long serialVersionUID = -25138443817255490L;
+
+   /**
+* Creates a new exception with the given message and cause
+*
+* @param message The exception message
+* @param cause The exception that caused this exception
+*/
+   public DynamicCodeLoadingException(String message, Throwable cause) {
--- End diff --

Would it make sense to make this constructor more explicit in the type of 
exceptions it accepts? (i.e. one constructor each for the exceptions that are 
typically thrown in situations that we want to cover)

It would probably just be bloat, but maybe it would prevent misuse of this 
exception.
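
For reference, the pattern from the Javadoc under review could be wrapped roughly as in the sketch below, using the single (message, cause) constructor shown in the diff. DynamicLoadingSketch and instantiate are hypothetical, and the nested DynamicCodeLoadingException is a simplified stand-in rather than the class from the pull request.

```java
// Hypothetical sketch; the nested exception is a simplified stand-in.
final class DynamicLoadingSketch {

    static class DynamicCodeLoadingException extends Exception {
        private static final long serialVersionUID = 1L;

        DynamicCodeLoadingException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Loads and instantiates a class by name, folding all failures into one type. */
    static <T> T instantiate(String className, Class<T> type)
            throws DynamicCodeLoadingException {
        try {
            return Class.forName(className)
                .asSubclass(type)
                .getDeclaredConstructor()
                .newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            // ReflectiveOperationException covers ClassNotFoundException,
            // InstantiationException, IllegalAccessException and related types.
            throw new DynamicCodeLoadingException(
                "Could not instantiate " + className + " as " + type.getName(), e);
        }
    }

    public static void main(String[] args) {
        try {
            System.out.println(instantiate("java.util.ArrayList", java.util.List.class));
            instantiate("no.such.Clazz", Object.class);
        } catch (DynamicCodeLoadingException e) {
            System.out.println(e.getMessage() + " (" + e.getCause().getClass().getSimpleName() + ")");
        }
    }
}
```

With a general Throwable cause, nothing stops a caller from passing an unrelated exception, which is the misuse the comment above is concerned about; one constructor per expected cause type would rule that out at the cost of more code.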




[jira] [Commented] (FLINK-5854) Introduce some Flink-specific base Exception types

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874916#comment-15874916
 ] 

ASF GitHub Bot commented on FLINK-5854:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3368#discussion_r102072425
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/util/DynamicCodeLoadingException.java 
---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * An exception that is thrown if the dynamic instantiation of code fails.
+ * 
+ * This exception is supposed to "sum up" the zoo of exceptions typically thrown around
+ * dynamic code loading and instantiations:
+ * 
+ * {@code
+ * try {
+ * Class.forName(classname).asSubclass(TheType.class).newInstance();
+ * }
+ * catch (ClassNotFoundException | ClassCastException | InstantiationException | IllegalAccessException e) {
+ * throw new DynamicCodeLoadingException(e);
+ * }
+ * }
+ */
+@Public
+public class DynamicCodeLoadingException extends FlinkException {
+
+   private static final long serialVersionUID = -25138443817255490L;
+
+   /**
+* Creates a new exception with the given message and cause
+*
+* @param message The exception message
+* @param cause The exception that caused this exception
+*/
+   public DynamicCodeLoadingException(String message, Throwable cause) {
--- End diff --

Would it make sense to make this constructor more explicit in the type of 
exceptions it accepts? (i.e. one constructor each for the exceptions that are 
typically thrown in situations that we want to cover)

It would probably just be bloat, but maybe it would prevent misuse of this 
exception.


> Introduce some Flink-specific base Exception types
> --
>
> Key: FLINK-5854
> URL: https://issues.apache.org/jira/browse/FLINK-5854
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Going through the code, there are a lot of places where exception handling 
> could be done a bit more cleanly, for example:
>   - Some methods do not declare exceptions at all in their signatures. They 
> simply catch everything and wrap it in a {{RuntimeException}}.
>   - Some places declare, overly generically, that they throw {{Exception}}, 
> even though they could type the exceptions they throw very specifically.
> I suggest introducing two new basic exceptions that at least help document 
> a bit more what goes wrong:
>   - {{FlinkException}} as a base class for checked exceptions that indicate 
> that something related to using Flink went wrong. Letting a method throw 
> {{FlinkException}} rather than {{Exception}} already helps to exclude 
> Java's runtime exceptions, which indicate programming errors rather than 
> situations that should be recovered from.
>   - {{FlinkUncheckedException}} as a Flink-specific subclass of 
> {{RuntimeException}}. It can come in handy in places where no 
> exceptions were declared, for example when reusing an interface that does not 
> declare exceptions.





[GitHub] flink pull request #3368: [FLINK-5854] [core] Add base Flink Exception class...

2017-02-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3368#discussion_r102071979
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/util/DynamicCodeLoadingException.java 
---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * An exception that is thrown if the dynamic instantiation of code fails.
+ * 
+ * This exception is supposed to "sum up" the zoo of exceptions typically thrown around
+ * dynamic code loading and instantiations:
+ * 
+ * {@code
+ * try {
+ * Class.forName(classname).asSubclass(TheType.class).newInstance();
+ * }
+ * catch (ClassNotFoundException | ClassCastException | InstantiationException | IllegalAccessException e) {
+ * throw new DynamicCodeLoadingException(e);
--- End diff --

There is no constructor that matches this line of the Javadoc.
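
For illustration, the mismatch could be resolved by adding a cause-only constructor, so that the Javadoc snippet compiles as written. The sketch below is an assumption about one possible fix, not what the pull request ended up doing: `FlinkException` is stubbed locally, and `load` and `wrappedCause` are hypothetical helper names.

```java
// Stand-in for the FlinkException base class, stubbed so the sketch compiles alone.
class FlinkException extends Exception {
    private static final long serialVersionUID = 1L;

    FlinkException(String message, Throwable cause) {
        super(message, cause);
    }

    FlinkException(Throwable cause) {
        super(cause);
    }
}

class DynamicCodeLoadingException extends FlinkException {
    private static final long serialVersionUID = 1L;

    // The constructor present in the diff under review.
    DynamicCodeLoadingException(String message, Throwable cause) {
        super(message, cause);
    }

    // Assumed addition: the cause-only overload that the Javadoc example calls.
    DynamicCodeLoadingException(Throwable cause) {
        super(cause);
    }
}

class CauseOnlyConstructorSketch {

    // Same shape as the Javadoc example: multi-catch plus the cause-only constructor.
    @SuppressWarnings("deprecation") // Class.newInstance() mirrors the Javadoc example
    static Object load(String className) throws DynamicCodeLoadingException {
        try {
            return Class.forName(className).asSubclass(Object.class).newInstance();
        } catch (ClassNotFoundException | ClassCastException
                | InstantiationException | IllegalAccessException e) {
            throw new DynamicCodeLoadingException(e);
        }
    }

    // Returns the class name of the wrapped cause, or null if loading succeeded.
    static String wrappedCause(String className) {
        try {
            load(className);
            return null;
        } catch (DynamicCodeLoadingException e) {
            return e.getCause().getClass().getName();
        }
    }

    public static void main(String[] args) {
        System.out.println(wrappedCause("java.lang.StringBuilder"));
        System.out.println(wrappedCause("no.such.Class"));
    }
}
```

The alternative is to leave the class untouched and change the Javadoc to pass a message, for example `new DynamicCodeLoadingException("Could not load " + classname, e)`.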




[jira] [Commented] (FLINK-5854) Introduce some Flink-specific base Exception types

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874911#comment-15874911
 ] 

ASF GitHub Bot commented on FLINK-5854:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3368#discussion_r102071979
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/util/DynamicCodeLoadingException.java 
---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * An exception that is thrown if the dynamic instantiation of code fails.
+ * 
+ * This exception is supposed to "sum up" the zoo of exceptions typically thrown around
+ * dynamic code loading and instantiations:
+ * 
+ * {@code
+ * try {
+ * Class.forName(classname).asSubclass(TheType.class).newInstance();
+ * }
+ * catch (ClassNotFoundException | ClassCastException | InstantiationException | IllegalAccessException e) {
+ * throw new DynamicCodeLoadingException(e);
--- End diff --

There is no constructor that matches this line of the Javadoc.




[GitHub] flink issue #2903: [FLINK-5074] [runtime] add a zookeeper based running job ...

2017-02-20 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2903
  
I think this looks good, thanks!
Merging this...




[jira] [Commented] (FLINK-5074) Implement a RunningJobRegistry based on Zookeeper

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874889#comment-15874889
 ] 

ASF GitHub Bot commented on FLINK-5074:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2903
  
I think this looks good, thanks!
Merging this...


> Implement a RunningJobRegistry based on Zookeeper 
> --
>
> Key: FLINK-5074
> URL: https://issues.apache.org/jira/browse/FLINK-5074
> Project: Flink
>  Issue Type: Task
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> For FLIP-6, ZookeeperHaServices has been implemented, but it does not yet 
> support getRunningJobsRegistry. We therefore need to implement a 
> ZooKeeper-based running job registry.





[GitHub] flink pull request #3369: [FLINK-5831] [webui] order, search and filter metr...

2017-02-20 Thread nellboy
GitHub user nellboy opened a pull request:

https://github.com/apache/flink/pull/3369

[FLINK-5831] [webui] order, search and filter metrics

Metrics are now sorted and searchable when selecting which metric graphs to 
display in the web UI.
![screen shot 2017-02-20 at 16 09 
58](https://cloud.githubusercontent.com/assets/39847/23136003/fc02da1a-f79a-11e6-90c0-8da1742d42c1.png)
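
The ordering and search behaviour described above boils down to a filter followed by a sort. The following is a language-neutral illustration written in Java, not the web UI code from this pull request; `selectMetrics` is a hypothetical helper and the metric ids are example values only.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

class MetricSelectorSketch {

    // Returns the metric ids that contain the query (case-insensitive), sorted by id.
    static List<String> selectMetrics(Collection<String> metricIds, String query) {
        String needle = query.trim().toLowerCase(Locale.ROOT);
        return metricIds.stream()
                .filter(id -> id.toLowerCase(Locale.ROOT).contains(needle))
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("numRecordsOut", "currentLowWatermark", "numRecordsIn");
        System.out.println(selectMetrics(ids, ""));        // empty query keeps every id
        System.out.println(selectMetrics(ids, "records")); // only ids containing "records"
    }
}
```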


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/nellboy/flink webui/metrics-filtering

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3369.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3369


commit 730e7b6d3b35855d93049e48850ca5919f7db968
Author: paul 
Date:   2017-02-20T09:28:14Z

[FLINK-5831] [webui] sort metrics by id

commit 45f5c565d9a805f78c40ed8ad7cc497aaf5c438f
Author: paul 
Date:   2017-02-20T17:27:13Z

[FLINK-5831] [webui] order, search and filter metrics






[jira] [Commented] (FLINK-5831) Sort metrics in metric selector and add search box

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874853#comment-15874853
 ] 

ASF GitHub Bot commented on FLINK-5831:
---

GitHub user nellboy opened a pull request:

https://github.com/apache/flink/pull/3369

[FLINK-5831] [webui] order, search and filter metrics

Metrics are now sorted and searchable when selecting which metric graphs to 
display in the web UI.
![screen shot 2017-02-20 at 16 09 
58](https://cloud.githubusercontent.com/assets/39847/23136003/fc02da1a-f79a-11e6-90c0-8da1742d42c1.png)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/nellboy/flink webui/metrics-filtering

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3369.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3369


commit 730e7b6d3b35855d93049e48850ca5919f7db968
Author: paul 
Date:   2017-02-20T09:28:14Z

[FLINK-5831] [webui] sort metrics by id

commit 45f5c565d9a805f78c40ed8ad7cc497aaf5c438f
Author: paul 
Date:   2017-02-20T17:27:13Z

[FLINK-5831] [webui] order, search and filter metrics




> Sort metrics in metric selector and add search box
> --
>
> Key: FLINK-5831
> URL: https://issues.apache.org/jira/browse/FLINK-5831
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Robert Metzger
> Attachments: dropDown.png
>
>
> The JobManager UI makes it hard to select metrics using the drop down menu.
> First of all, it would be nice to sort all entries. Also, a search box on top 
> of the drop down would make it much easier to find the metrics.





[GitHub] flink pull request #3368: [FLINK-5854] [core] Add base Flink Exception class...

2017-02-20 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/3368

[FLINK-5854] [core] Add base Flink Exception classes

This pull request adds two exception base classes: `FlinkException` and 
`FlinkRuntimeException`.
They are useful in improving the way certain parts of the code handle 
exceptions.

  - `FlinkException` is a base class for checked exceptions that indicate 
that something related to using Flink went wrong. It is helpful because 
letting a method throw `FlinkException` rather than `Exception` already 
excludes Java's runtime exceptions, which indicate programming errors rather 
than situations that should be recovered from.
  - `FlinkRuntimeException` as a Flink-specific subclass of 
`RuntimeException` comes in handy in places where no exceptions were declared, 
for example when reusing an interface that does not declare exceptions.

**Important: This does not mean we should just declare `FlinkException` 
everywhere and throw and catch `FlinkException` and `FlinkRuntimeException` 
arbitrarily. Exception handling remains a careful and conscious task.**

This also adds the `DynamicCodeLoadingException` subclass of 
`FlinkException` as an example.
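
To make the intended division of labour concrete, here is a minimal sketch under stated assumptions: both base classes are stubbed locally rather than taken from this pull request, and `parseParallelism` and `parallelismSupplier` are hypothetical names. The checked type appears in a method signature, while the unchecked type is used behind `java.util.function.Supplier`, an interface whose method declares no exceptions.

```java
import java.util.function.Supplier;

// Stand-ins for the two base classes described above, stubbed for this sketch.
class FlinkException extends Exception {
    private static final long serialVersionUID = 1L;

    FlinkException(String message, Throwable cause) {
        super(message, cause);
    }
}

class FlinkRuntimeException extends RuntimeException {
    private static final long serialVersionUID = 1L;

    FlinkRuntimeException(String message, Throwable cause) {
        super(message, cause);
    }
}

class BaseExceptionsSketch {

    // Checked variant: the signature documents that parsing the setting can fail.
    static int parseParallelism(String value) throws FlinkException {
        try {
            return Integer.parseInt(value.trim());
        } catch (NumberFormatException e) {
            throw new FlinkException("Invalid parallelism: " + value, e);
        }
    }

    // Supplier.get() declares no exceptions, so the checked failure is wrapped.
    static Supplier<Integer> parallelismSupplier(String value) {
        return () -> {
            try {
                return parseParallelism(value);
            } catch (FlinkException e) {
                throw new FlinkRuntimeException("Could not supply parallelism", e);
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(parallelismSupplier("4").get());
        try {
            parallelismSupplier("four").get();
        } catch (FlinkRuntimeException e) {
            System.out.println(e.getMessage() + " <- " + e.getCause().getMessage());
        }
    }
}
```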

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink exceptions

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3368.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3368


commit 1bed2d20a5ccfae4ae7bdfadaaf03fcbe1dba449
Author: Stephan Ewen 
Date:   2017-02-17T15:24:35Z

[FLINK-] [core] Add base Flink Exception classes






[jira] [Commented] (FLINK-5854) Introduce some Flink-specific base Exception types

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874852#comment-15874852
 ] 

ASF GitHub Bot commented on FLINK-5854:
---

GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/3368

[FLINK-5854] [core] Add base Flink Exception classes

This pull request adds two exception base classes: `FlinkException` and 
`FlinkRuntimeException`.
They are useful in improving the way certain parts of the code handle 
exceptions.

  - `FlinkException` is a base class for checked exceptions that indicate 
that something related to using Flink went wrong. It is helpful because 
letting a method throw `FlinkException` rather than `Exception` already 
excludes Java's runtime exceptions, which indicate programming errors rather 
than situations that should be recovered from.
  - `FlinkRuntimeException` as a Flink-specific subclass of 
`RuntimeException` comes in handy in places where no exceptions were declared, 
for example when reusing an interface that does not declare exceptions.

**Important: This does not mean we should just declare `FlinkException` 
everywhere and throw and catch `FlinkException` and `FlinkRuntimeException` 
arbitrarily. Exception handling remains a careful and conscious task.**

This also adds the `DynamicCodeLoadingException` subclass of 
`FlinkException` as an example.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink exceptions

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3368.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3368


commit 1bed2d20a5ccfae4ae7bdfadaaf03fcbe1dba449
Author: Stephan Ewen 
Date:   2017-02-17T15:24:35Z

[FLINK-] [core] Add base Flink Exception classes






[jira] [Commented] (FLINK-5831) Sort metrics in metric selector and add search box

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874850#comment-15874850
 ] 

ASF GitHub Bot commented on FLINK-5831:
---

Github user nellboy closed the pull request at:

https://github.com/apache/flink/pull/3362




[GitHub] flink pull request #3362: [FLINK-5831] [webui] order, search and filter metr...

2017-02-20 Thread nellboy
Github user nellboy closed the pull request at:

https://github.com/apache/flink/pull/3362




[jira] [Created] (FLINK-5854) Introduce some Flink-specific base Exception types

2017-02-20 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-5854:
---

 Summary: Introduce some Flink-specific base Exception types
 Key: FLINK-5854
 URL: https://issues.apache.org/jira/browse/FLINK-5854
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.3.0


Going through the code, there are a lot of places where exception handling 
could be done a bit more cleanly, for example:
  - Some methods do not declare exceptions at all in their signatures. They 
simply catch everything and wrap it in a {{RuntimeException}}.
  - Some places declare, overly generically, that they throw {{Exception}}, 
even though they could type the exceptions they throw very specifically.

I suggest introducing two new basic exceptions that at least help document a 
bit more what goes wrong:

  - {{FlinkException}} as a base class for checked exceptions that indicate 
that something related to using Flink went wrong. Letting a method throw 
{{FlinkException}} rather than {{Exception}} already helps to exclude 
Java's runtime exceptions, which indicate programming errors rather than 
situations that should be recovered from.

  - {{FlinkUncheckedException}} as a Flink-specific subclass of 
{{RuntimeException}}. It can come in handy in places where no exceptions 
were declared, for example when reusing an interface that does not declare 
exceptions.






[jira] [Commented] (FLINK-5819) Improve metrics reporting

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874840#comment-15874840
 ] 

ASF GitHub Bot commented on FLINK-5819:
---

GitHub user nellboy opened a pull request:

https://github.com/apache/flink/pull/3367

[FLINK-5819] [webui] implements numeric option on metrics graphs

[FLINK-5819] [webui] implements numeric option on metrics graphs
![pasted image at 2017_02_17 
13_56](https://cloud.githubusercontent.com/assets/39847/23135607/937a75da-f799-11e6-9dbb-6e0c877f615b.png)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/nellboy/flink webui/numeric-metrics

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3367.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3367


commit 3d05e2dfdf0bef516cfdc0e298e37e2583a8187c
Author: paul 
Date:   2017-02-20T17:20:45Z

[FLINK-5819] [webui] implements numeric option on metrics graphs




> Improve metrics reporting
> -
>
> Key: FLINK-5819
> URL: https://issues.apache.org/jira/browse/FLINK-5819
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Affects Versions: 1.3.0
>Reporter: Paul Nelligan
>  Labels: web-ui
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> When displaying individual metrics for a vertex / node of a job in the web UI, 
> it is desirable to add an option to display each metric either as a numeric 
> value or as a chart.





[GitHub] flink pull request #3367: [FLINK-5819] [webui] implements numeric option on ...

2017-02-20 Thread nellboy
GitHub user nellboy opened a pull request:

https://github.com/apache/flink/pull/3367

[FLINK-5819] [webui] implements numeric option on metrics graphs

[FLINK-5819] [webui] implements numeric option on metrics graphs
![pasted image at 2017_02_17 
13_56](https://cloud.githubusercontent.com/assets/39847/23135607/937a75da-f799-11e6-9dbb-6e0c877f615b.png)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/nellboy/flink webui/numeric-metrics

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3367.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3367


commit 3d05e2dfdf0bef516cfdc0e298e37e2583a8187c
Author: paul 
Date:   2017-02-20T17:20:45Z

[FLINK-5819] [webui] implements numeric option on metrics graphs






[jira] [Commented] (FLINK-5819) Improve metrics reporting

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874838#comment-15874838
 ] 

ASF GitHub Bot commented on FLINK-5819:
---

Github user nellboy closed the pull request at:

https://github.com/apache/flink/pull/3361




[GitHub] flink pull request #3361: [FLINK-5819] [webui] implements numeric option on ...

2017-02-20 Thread nellboy
Github user nellboy closed the pull request at:

https://github.com/apache/flink/pull/3361




[GitHub] flink pull request #3333: Webui/watermarks

2017-02-20 Thread nellboy
Github user nellboy closed the pull request at:

https://github.com/apache/flink/pull/




[GitHub] flink pull request #3366: Webui/watermarks tab

2017-02-20 Thread nellboy
GitHub user nellboy opened a pull request:

https://github.com/apache/flink/pull/3366

Webui/watermarks tab

Implements a watermarks tab and displays low watermarks on graph nodes.
![screen shot 2017-02-20 at 17 54 
51](https://cloud.githubusercontent.com/assets/39847/23135205/fb8419f8-f797-11e6-8ee3-24bd50296a77.png)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/nellboy/flink webui/watermarks-tab

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3366.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3366


commit 25874c6c60889d0e9dd5c491a6612957763f338f
Author: paul 
Date:   2017-02-16T10:25:45Z

[FLINK-3427] [webui] implements watermarks tab

commit 0bc569635b88cd69d72f8b792862c03570dcdced
Author: paul 
Date:   2017-02-16T10:57:26Z

[FLINK-3427] [webui] display watermarks index only instead of full id

commit 90f727ca1535e2d0be5302efe5b8a834d29f901c
Author: paul 
Date:   2017-02-20T09:54:12Z

[FLINK-3427] [webui] replace watermarks parsing in controller with filters

commit 9fb2d4205b1e59ffe5d538fee5092abb0354ee4c
Author: paul 
Date:   2017-02-20T09:58:35Z

[FLINK-3427] [webui] set watermarks to null on node change

commit 93a7a61c8b5852dbc3c86336b247d7e21421b662
Author: paul 
Date:   2017-02-20T17:08:45Z

[FLINK-3427] [webui] display low watermarks on node graph






[jira] [Updated] (FLINK-5845) CEP: unify key and non-keyed operators

2017-02-20 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas updated FLINK-5845:
--
Description: 
Currently the keyed and non-keyed operators in the CEP library have different 
implementations. This issue targets to unify them into one. 

This new implementation will always be applied on a keyed stream, and in the 
case of non-keyed usecases, the input stream will be keyed on a dummy key, as 
done in the case of the {{DataStream.windowAll()}} method, where the input 
stream is keyed using the {{NullByteKeySelector}}.

This is a first step towards making the CEP operators rescalable.

  was:
Currently the keyed and non-keyed operators in the CEP library have different 
implementations. This issue targets to unify them into one. 

This new implementation will always be applied on a keyed stream, and in the 
case of non-keyed usecases, the input stream will be keyed on a dummy keye, as 
done in the case of the {{DataStream.windowAll()}} method, where the input 
stream is keyed using the {{NullByteKeySelector}}.

This is a first step towards making the CEP operators rescalable.


> CEP: unify key and non-keyed operators
> --
>
> Key: FLINK-5845
> URL: https://issues.apache.org/jira/browse/FLINK-5845
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Affects Versions: 1.3.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.3.0
>
>
> Currently the keyed and non-keyed operators in the CEP library have different 
> implementations. This issue aims to unify them into one. 
> The new implementation will always be applied on a keyed stream; for 
> non-keyed use cases, the input stream will be keyed on a dummy key, as 
> is done in the {{DataStream.windowAll()}} method, where the input 
> stream is keyed using the {{NullByteKeySelector}}.
> This is a first step towards making the CEP operators rescalable.
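
The dummy-key idea described above can be illustrated without any Flink dependency. The following is only a minimal sketch under stated assumptions: `DummyKeySketch`, `constantKey`, and `partitionByKey` are hypothetical names invented for illustration and are not the Flink API. The sketch shows that a key selector returning the same constant for every element places all elements in a single keyed partition, so a keyed operator can serve the non-keyed case.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class DummyKeySketch {

    // Hypothetical stand-in for a constant key selector:
    // every element is mapped to the same key (byte 0).
    static <T> Function<T, Byte> constantKey() {
        return value -> (byte) 0;
    }

    // Groups elements by the selected key, preserving arrival order per key.
    static <T, K> Map<K, List<T>> partitionByKey(List<T> input, Function<T, K> keySelector) {
        Map<K, List<T>> partitions = new LinkedHashMap<>();
        for (T element : input) {
            partitions.computeIfAbsent(keySelector.apply(element), k -> new ArrayList<>())
                      .add(element);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("a", "b", "c");
        // With the constant key, all events end up in one partition.
        Map<Byte, List<String>> partitions = partitionByKey(events, DummyKeySketch.<String>constantKey());
        System.out.println(partitions);
    }
}
```

Because every element shares one key, the keyed code path sees exactly one group, which matches the non-keyed semantics.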





[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874820#comment-15874820
 ] 

ASF GitHub Bot commented on FLINK-4520:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2487
  
@haoch What do you think about Robert's suggestion to move this to Bahir? 
Seems like a reasonable first step to me.


> Integrate Siddhi as a lightweight CEP Library
> -
>
> Key: FLINK-4520
> URL: https://issues.apache.org/jira/browse/FLINK-4520
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Affects Versions: 1.2.0
>Reporter: Hao Chen
>Assignee: Hao Chen
>  Labels: cep, library, patch-available
>
> h1. flink-siddhi proposal
> h2. Abstraction
> Siddhi CEP is a lightweight and easy-to-use open source Complex Event 
> Processing (CEP) engine released as a Java library under the `Apache Software 
> License v2.0`. Siddhi CEP processes events generated by various 
> event sources, analyses them, and emits the appropriate complex events 
> according to user-specified queries. 
> It would be very helpful for Flink users (especially streaming application 
> developers) to have a library that runs Siddhi CEP queries directly in a Flink 
> streaming application.
> * http://wso2.com/products/complex-event-processor/
> * https://github.com/wso2/siddhi
> h2. Features
> * Integrate Siddhi CEP as a stream operator (i.e. 
> `TupleStreamSiddhiOperator`), supporting rich CEP features like
> ** Filter
> ** Join
> ** Aggregation
> ** Group by
> ** Having
> ** Window
> ** Conditions and Expressions
> ** Pattern processing
> ** Sequence processing
> ** Event Tables
> ...
> * Provide an easy-to-use Siddhi CEP API that integrates with the Flink DataStream API (see 
> `SiddhiCEP` and `SiddhiStream`)
> ** Register a Flink DataStream, associating its native type information with 
> the Siddhi Stream Schema; supports POJO, Tuple, primitive types, etc.
> ** Connect single or multiple Flink DataStreams with a Siddhi CEP 
> Execution Plan
> ** Return the output stream as a DataStream with its type inferred 
> from the Siddhi Stream Schema
> * Integrate Siddhi runtime state management with Flink state (see 
> `AbstractSiddhiOperator`)
> * Support Siddhi plugin management to extend CEP functions (see 
> `SiddhiCEP#registerExtension`)
> h2. Test Cases 
> * org.apache.flink.contrib.siddhi.SiddhiCEPITCase: 
> https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java
> h2. Example
> {code}
>  StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>  SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
>  cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class);
>  cep.registerStream("inputStream1", input1, "id", "name", 
> "price","timestamp");
>  cep.registerStream("inputStream2", input2, "id", "name", 
> "price","timestamp");
>  DataStream> output = cep
>   .from("inputStream1").union("inputStream2")
>   .sql(
> "from every s1 = inputStream1[id == 2] "
>  + " -> s2 = inputStream2[id == 3] "
>  + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, "
>  + "s2.name as name_2, custom:plus(s1.price,s2.price) as price "
>  + "insert into outputStream"
>   )
>   .returns("outputStream");
>  env.execute();
> {code}
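
When a query like the one in the example is assembled from several string literals with `+`, it is easy to lose the whitespace between two clauses (for instance between `as price` and `insert into`). One way to guard against that is sketched below. This is only an illustration: `QueryClauses` and `joinClauses` are hypothetical names, not part of Siddhi or Flink, and the clause texts are copied from the example above.

```java
public class QueryClauses {

    // Joins query clauses with exactly one space between them,
    // trimming each clause and skipping empty ones.
    static String joinClauses(String... clauses) {
        StringBuilder query = new StringBuilder();
        for (String clause : clauses) {
            String trimmed = clause.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            if (query.length() > 0) {
                query.append(' ');
            }
            query.append(trimmed);
        }
        return query.toString();
    }

    public static void main(String[] args) {
        String query = joinClauses(
            "from every s1 = inputStream1[id == 2]",
            "-> s2 = inputStream2[id == 3]",
            "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2, "
                + "custom:plus(s1.price,s2.price) as price",
            "insert into outputStream");
        System.out.println(query);
    }
}
```

The resulting string could then be passed wherever the query text is expected; the helper only normalises clause boundaries and does not validate the query itself.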





[jira] [Commented] (FLINK-4499) Introduce findbugs maven plugin

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874818#comment-15874818
 ] 

ASF GitHub Bot commented on FLINK-4499:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2422
  
I think that the increased build times are a blocker for executing this 
with every CI run. Maybe this could become something that we execute nightly 
instead? Not ideal, but better than nothing.



> Introduce findbugs maven plugin
> ---
>
> Key: FLINK-4499
> URL: https://issues.apache.org/jira/browse/FLINK-4499
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Ted Yu
>Assignee: Suneel Marthi
>
> As suggested by Stephan in FLINK-4482, this issue is to add 
> findbugs-maven-plugin into the build process so that we can detect lack of 
> proper locking and other defects automatically.
> We can begin with a small set of rules.





[GitHub] flink issue #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-we...

2017-02-20 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/2487
  
@haoch What do you think about Robert's suggestion to move this to Bahir? 
Seems like a reasonable first step to me.



