[jira] [Commented] (FLINK-6588) Rename NumberOfFullRestarts metric

2017-07-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106833#comment-16106833
 ] 

ASF GitHub Bot commented on FLINK-6588:
---

Github user zjureel closed the pull request at:

https://github.com/apache/flink/pull/4292


> Rename NumberOfFullRestarts metric
> --
>
> Key: FLINK-6588
> URL: https://issues.apache.org/jira/browse/FLINK-6588
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Fang Yong
>
> The metric for the number of full restarts is currently called 
> {{fullRestarts}}. For clarity and consistency purposes I propose to rename it 
> to {{numFullRestarts}}.
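For illustration, registering a restart metric under the proposed name could look like the editor's sketch below; the surrounding class and gauge wiring are assumptions, not the actual Flink source.

{code}
// Sketch only: register the restart count under the proposed name.
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

class RestartMetricsSketch {
    private volatile long fullRestartCount;

    void register(MetricGroup jobMetricGroup) {
        // "numFullRestarts" rather than the old "fullRestarts"
        jobMetricGroup.gauge("numFullRestarts", (Gauge<Long>) () -> fullRestartCount);
    }
}
{code}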



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4292: [FLINK-6588] Rename NumberOfFullRestarts metric

2017-07-30 Thread zjureel
Github user zjureel closed the pull request at:

https://github.com/apache/flink/pull/4292


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-7270) Add support for dynamic properties to cluster entry point

2017-07-30 Thread Fang Yong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fang Yong reassigned FLINK-7270:


Assignee: Fang Yong

> Add support for dynamic properties to cluster entry point
> -
>
> Key: FLINK-7270
> URL: https://issues.apache.org/jira/browse/FLINK-7270
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management
>Affects Versions: 1.3.1
>Reporter: Till Rohrmann
>Assignee: Fang Yong
>Priority: Minor
>  Labels: flip-6
>
> We should respect dynamic properties when starting the {{ClusterEntrypoint}}. 
> This basically means extracting them from the passed command line arguments 
> and then adding them to the loaded {{Configuration}}.
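An editor's sketch of that flow (the entry-point wiring and helper shape are assumptions, not the actual patch):

{code}
// Sketch: parse dynamic properties from the command line and copy them into
// the loaded Configuration explicitly, instead of using a static field.
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;

final class EntrypointConfigSketch {
    static Configuration loadWithDynamicProperties(String[] args) {
        Configuration configuration = GlobalConfiguration.loadConfiguration();
        // e.g. args = {"--taskmanager.numberOfTaskSlots", "4"}
        ParameterTool dynamicProperties = ParameterTool.fromArgs(args);
        dynamicProperties.toMap().forEach(configuration::setString);
        return configuration;
    }
}
{code}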



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7299) Write GenericRecord using AvroOutputFormat

2017-07-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106809#comment-16106809
 ] 

ASF GitHub Bot commented on FLINK-7299:
---

GitHub user soniclavier opened a pull request:

https://github.com/apache/flink/pull/4422

[FLINK-7299][AVRO] Write GenericRecord using AvroOutputFormat

## What is the purpose of the change

Allow writing Avro GenericRecord using AvroOutputFormat. 

## Brief change log

- Added a condition in AvroOutputFormat that checks whether avroValue is an 
instance of GenericRecord and, if so, creates a GenericDatumWriter (sketched below).
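A hedged sketch of that check; the method shape and the reflection-based fallback are the editor's assumptions, not the exact patch:

{code}
// Sketch: pick the Avro DatumWriter based on the runtime type of the value.
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectDatumWriter;

final class DatumWriterChoiceSketch {
    @SuppressWarnings("unchecked")
    static <E> DatumWriter<E> forValue(E avroValue, Schema schema) {
        if (avroValue instanceof GenericRecord) {
            // GenericRecords are written with the schema-driven generic writer
            return (DatumWriter<E>) new GenericDatumWriter<GenericRecord>(schema);
        }
        // otherwise fall back to reflection-based writing
        return new ReflectDatumWriter<>(schema);
    }
}
{code}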

## Verifying this change

This change added tests and can be verified as follows:

 -  Added unit test testGenericRecord() in AvroOutputFormatTest to write 
GenericRecords.


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:  no
  - The serializers: yes
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no

## Documentation

  - Does this pull request introduce a new feature?: no (not a major 
feature)
  - If yes, how is the feature documented? not applicable 



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/soniclavier/flink 
FLINK-7299-GenericRecord-in-AvroOutputFormat

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4422.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4422


commit 1c71ca43bcd5d4733e581f80637b531ba447e9dc
Author: Vishnu Viswanath 
Date:   2017-07-31T04:58:28Z

[FLINK-7299][AVRO] Write GenericRecord using AvroOutputFormat




> Write GenericRecord using AvroOutputFormat
> --
>
> Key: FLINK-7299
> URL: https://issues.apache.org/jira/browse/FLINK-7299
> Project: Flink
>  Issue Type: Improvement
>  Components: Batch Connectors and Input/Output Formats
>Reporter: Vishnu Viswanath
>Assignee: Vishnu Viswanath
>Priority: Minor
>
> Allow AvroOutputFormat to write GenericRecords



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4422: [FLINK-7299][AVRO] Write GenericRecord using AvroO...

2017-07-30 Thread soniclavier
GitHub user soniclavier opened a pull request:

https://github.com/apache/flink/pull/4422

[FLINK-7299][AVRO] Write GenericRecord using AvroOutputFormat

## What is the purpose of the change

Allow writing Avro GenericRecord using AvroOutputFormat. 

## Brief change log

- Added a condition in AvroOutputFormat that checks whether avroValue is an 
instance of GenericRecord and, if so, creates a GenericDatumWriter.

## Verifying this change

This change added tests and can be verified as follows:

 -  Added unit test testGenericRecord() in AvroOutputFormatTest to write 
GenericRecords.


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:  no
  - The serializers: yes
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no

## Documentation

  - Does this pull request introduce a new feature?: no (not a major 
feature)
  - If yes, how is the feature documented? not applicable 



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/soniclavier/flink 
FLINK-7299-GenericRecord-in-AvroOutputFormat

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4422.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4422


commit 1c71ca43bcd5d4733e581f80637b531ba447e9dc
Author: Vishnu Viswanath 
Date:   2017-07-31T04:58:28Z

[FLINK-7299][AVRO] Write GenericRecord using AvroOutputFormat




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-7299) Write GenericRecord using AvroOutputFormat

2017-07-30 Thread Vishnu Viswanath (JIRA)
Vishnu Viswanath created FLINK-7299:
---

 Summary: Write GenericRecord using AvroOutputFormat
 Key: FLINK-7299
 URL: https://issues.apache.org/jira/browse/FLINK-7299
 Project: Flink
  Issue Type: Improvement
  Components: Batch Connectors and Input/Output Formats
Reporter: Vishnu Viswanath
Assignee: Vishnu Viswanath
Priority: Minor


Allow AvroOutputFormat to write GenericRecords



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7298) Records can be cleared when all data in state is invalid

2017-07-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106797#comment-16106797
 ] 

ASF GitHub Bot commented on FLINK-7298:
---

Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/4421
  
Also, I think we need to remove `!validTimestamp` from `while 
(keyIter.hasNext && !validTimestamp) {`, because new records may be inserted 
at the head of the rowMapState; in that case, expired data would otherwise 
remain in the rowMapState forever.
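To make the suggestion concrete, here is an editor's Java rendering of the loop shape under discussion (the actual operator is Scala, and the collection accessor is simplified):

{code}
// Editor's sketch: the scan no longer stops at the first valid timestamp,
// so expired rows sitting behind newer head insertions are still removed.
static void expire(java.util.Collection<Long> rowTimestamps, long expirationTime) {
    java.util.Iterator<Long> keyIter = rowTimestamps.iterator();
    while (keyIter.hasNext()) {        // was: keyIter.hasNext && !validTimestamp
        if (keyIter.next() < expirationTime) {
            keyIter.remove();          // expired rows are removed wherever they sit
        }
        // a valid row no longer terminates the scan
    }
}
{code}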


> Records can be cleared when all data in state is invalid
> 
>
> Key: FLINK-7298
> URL: https://issues.apache.org/jira/browse/FLINK-7298
> Project: Flink
>  Issue Type: Improvement
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>
> In {{ProcTimeWindowInnerJoin}}.{{expireOutTimeRow}}, we do not need to remove 
> records from state when there are no valid records. Instead, we can clear them 
> all at once.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4421: [FLINK-7298] [table] Records can be cleared all at once w...

2017-07-30 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/4421
  
Also, I think we need to remove `!validTimestamp` from `while 
(keyIter.hasNext && !validTimestamp) {`, because new records may be inserted 
at the head of the rowMapState; in that case, expired data would otherwise 
remain in the rowMapState forever.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7298) Records can be cleared when all data in state is invalid

2017-07-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106794#comment-16106794
 ] 

ASF GitHub Bot commented on FLINK-7298:
---

GitHub user hequn8128 opened a pull request:

https://github.com/apache/flink/pull/4421

[FLINK-7298] [table] Records can be cleared all at once when all data in 
state is invalid

In `ProcTimeWindowInnerJoin`.`expireOutTimeRow`, we do not need to remove 
records one by one from state when there are no valid records. Instead, we can 
clear them all at once.
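A minimal sketch of the optimization, assuming Flink's {{MapState}} interface; the validity bookkeeping is elided:

{code}
// Sketch: if no record in state is still valid, clear the whole MapState in
// one call instead of removing entries one by one.
import org.apache.flink.api.common.state.MapState;

final class ExpireSketch {
    static <K, V> void expireOutTimeRow(MapState<K, V> rowMapState,
            boolean anyRecordStillValid) throws Exception {
        if (!anyRecordStillValid) {
            rowMapState.clear();   // one state operation instead of N removes
            return;
        }
        // otherwise remove only the expired entries (elided)
    }
}
{code}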

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hequn8128/flink 7298

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4421.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4421


commit 0d2641c13fa5e9aafca16c0fbdc87fab4abc05ec
Author: 军长 
Date:   2017-07-31T03:14:06Z

[FLINK-7298] [table] Records can be cleared when all data in state is 
invalid




> Records can be cleared when all data in state is invalid
> 
>
> Key: FLINK-7298
> URL: https://issues.apache.org/jira/browse/FLINK-7298
> Project: Flink
>  Issue Type: Improvement
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>
> In {{ProcTimeWindowInnerJoin}}.{{expireOutTimeRow}}, we do not need to remove 
> records from state when there are no valid records. Instead, we can clear them 
> all at once.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4421: [FLINK-7298] [table] Records can be cleared all at...

2017-07-30 Thread hequn8128
GitHub user hequn8128 opened a pull request:

https://github.com/apache/flink/pull/4421

[FLINK-7298] [table] Records can be cleared all at once when all data in 
state is invalid

In `ProcTimeWindowInnerJoin`.`expireOutTimeRow`, we do not need to remove 
records one by one from state when there are no valid records. Instead, we can 
clear them all at once.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hequn8128/flink 7298

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4421.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4421


commit 0d2641c13fa5e9aafca16c0fbdc87fab4abc05ec
Author: 军长 
Date:   2017-07-31T03:14:06Z

[FLINK-7298] [table] Records can be cleared when all data in state is 
invalid




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #4415: [FLINK-7269] Refactor passing of dynamic properties

2017-07-30 Thread zjureel
Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/4415
  
@tillrohrmann Thank you for your suggestion. I have updated the PR template 
and added a test case, thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7269) Refactor passing of dynamic properties

2017-07-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106783#comment-16106783
 ] 

ASF GitHub Bot commented on FLINK-7269:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/4415
  
@tillrohrmann Thank you for your suggestion. I have updated the PR template 
and added a test case, thanks.


> Refactor passing of dynamic properties
> --
>
> Key: FLINK-7269
> URL: https://issues.apache.org/jira/browse/FLINK-7269
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration
>Affects Versions: 1.3.1
>Reporter: Till Rohrmann
>Assignee: Fang Yong
>
> In order to set dynamic properties when loading the {{Configuration}} via 
> {{GlobalConfiguration.loadConfiguration}}, we currently set a static field in 
> {{GlobalConfiguration}} which is read whenever we load the {{Configuration}}.
> I think this is not a good pattern, and I propose to remove this functionality. 
> Instead, we should explicitly add the dynamic properties to the loaded 
> {{Configuration}} at the start of the application.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106770#comment-16106770
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r130264936
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
                    nextVersion,
                    startTimestamp);
            }
+
+           switch (skipStrategy.getStrategy()) {
+               case SKIP_PAST_LAST_EVENT:
+                   if (nextState.isFinal()) {
+                       resultingComputationStates.add(createStartComputationState(computationState, event));
+                   }
+                   break;
+               case SKIP_TO_FIRST:
+                   if (nextState.getName().equals(skipStrategy.getPatternName()) &&
--- End diff --

Should use NFAStateNameHandler.getOriginalNameFromInternal() to compare 
state name.


> Support AFTER MATCH SKIP function in CEP library API
> 
>
> Key: FLINK-7169
> URL: https://issues.apache.org/jira/browse/FLINK-7169
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Yueting Chen
>Assignee: Yueting Chen
> Fix For: 1.4.0
>
>
> In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we 
> need to support AFTER MATCH SKIP function in CEP API.
> There're four options in AFTER MATCH SKIP, listed as follows:
> 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the 
> first row of the current match.
> 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row 
> after the last row of the current match.
> 3. AFTER MATCH SKIP TO FIRST *RPV*: resume pattern matching at the first row 
> that is mapped to the row pattern variable RPV.
> 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row 
> that is mapped to the row pattern variable RPV.
> I think we can introduce a new function to the `CEP` class, which takes a new 
> parameter as AfterMatchSkipStrategy.
> The new API may look like this:
> {code}
> public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern, AfterMatchSkipStrategy afterMatchSkipStrategy)
> {code}
> We can also make `SKIP TO NEXT ROW` the default option, because that is how 
> the CEP library currently behaves.
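A hedged usage sketch of the proposed overload. AfterMatchSkipStrategy is the class proposed above; its factory method name here is an assumption, and only the signature comes from the proposal:

{code}
// Sketch of how the proposed API might be called.
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;

class SkipStrategyUsageSketch {
    static PatternStream<String> apply(DataStream<String> input) {
        Pattern<String, ?> pattern = Pattern.<String>begin("start").next("end");
        // resume matching after the last row of the current match
        return CEP.pattern(input, pattern, AfterMatchSkipStrategy.skipPastLastRow());
    }
}
{code}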



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106772#comment-16106772
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r130264164
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
                    nextVersion,
                    startTimestamp);
            }
+
+           switch (skipStrategy.getStrategy()) {
+               case SKIP_PAST_LAST_EVENT:
+                   if (nextState.isFinal()) {
--- End diff --

Should also consider the situation **Proceed to Final state**.


> Support AFTER MATCH SKIP function in CEP library API
> 
>
> Key: FLINK-7169
> URL: https://issues.apache.org/jira/browse/FLINK-7169
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Yueting Chen
>Assignee: Yueting Chen
> Fix For: 1.4.0
>
>
> In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we 
> need to support AFTER MATCH SKIP function in CEP API.
> There're four options in AFTER MATCH SKIP, listed as follows:
> 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the 
> first row of the current match.
> 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row 
> after the last row of the current match.
> 3. AFTER MATCH SKIP TO FIRST *RPV*: resume pattern matching at the first row 
> that is mapped to the row pattern variable RPV.
> 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row 
> that is mapped to the row pattern variable RPV.
> I think we can introduce a new function to the `CEP` class, which takes a new 
> parameter as AfterMatchSkipStrategy.
> The new API may look like this:
> {code}
> public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern, AfterMatchSkipStrategy afterMatchSkipStrategy)
> {code}
> We can also make `SKIP TO NEXT ROW` the default option, because that is how 
> the CEP library currently behaves.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106769#comment-16106769
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r130266394
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
                    nextVersion,
                    startTimestamp);
            }
+
+           switch (skipStrategy.getStrategy()) {
+               case SKIP_PAST_LAST_EVENT:
+                   if (nextState.isFinal()) {
+                       resultingComputationStates.add(createStartComputationState(computationState, event));
+                   }
+                   break;
+               case SKIP_TO_FIRST:
+                   if (nextState.getName().equals(skipStrategy.getPatternName()) &&
+                           !nextState.getName().equals(currentState.getName())) {
+                       ComputationState startComputationState = createStartComputationState(computationState, event);
+                       if (callLevel > 0) {
+                           throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query");
+                       }
+                       // feed current matched event to the state.
+                       Collection computationStates = computeNextStates(startComputationState, event, timestamp, callLevel++);
+                       resultingComputationStates.addAll(computationStates);
+                   } else if (previousState == null && currentState.getName().equals(skipStrategy.getPatternName())) {
+                       throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query");
+                   }
+                   break;
+               case SKIP_TO_LAST:
+                   if (currentState.getName().equals(skipStrategy.getPatternName()) &&
+                           !nextState.getName().equals(currentState.getName())) {
+                       ComputationState startComputationState = createStartComputationState(computationState, event);
+                       if (callLevel > 0) {
+                           throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query");
+                       }
+                       // feed current matched event to the state.
+                       Collection computationStates = computeNextStates(startComputationState, event, timestamp, callLevel++);
+                       resultingComputationStates.addAll(computationStates);
+                   }
+                   break;
+           }
                break;
            }
        }

-       if (computationState.isStartState()) {
-           int totalBranches = calculateIncreasingSelfState(
-               outgoingEdges.getTotalIgnoreBranches(),
-               outgoingEdges.getTotalTakeBranches());
-
-           DeweyNumber startVersion = computationState.getVersion().increase(totalBranches);
-           ComputationState startState = ComputationState.createStartState(this, computationState.getState(), startVersion);
-

[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106771#comment-16106771
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r130265354
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
                    nextVersion,
                    startTimestamp);
            }
+
+           switch (skipStrategy.getStrategy()) {
+               case SKIP_PAST_LAST_EVENT:
+                   if (nextState.isFinal()) {
+                       resultingComputationStates.add(createStartComputationState(computationState, event));
+                   }
+                   break;
+               case SKIP_TO_FIRST:
+                   if (nextState.getName().equals(skipStrategy.getPatternName()) &&
+                           !nextState.getName().equals(currentState.getName())) {
+                       ComputationState startComputationState = createStartComputationState(computationState, event);
+                       if (callLevel > 0) {
--- End diff --

Why is the callLevel needed?


> Support AFTER MATCH SKIP function in CEP library API
> 
>
> Key: FLINK-7169
> URL: https://issues.apache.org/jira/browse/FLINK-7169
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Yueting Chen
>Assignee: Yueting Chen
> Fix For: 1.4.0
>
>
> In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we 
> need to support AFTER MATCH SKIP function in CEP API.
> There're four options in AFTER MATCH SKIP, listed as follows:
> 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the 
> first row of the current match.
> 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row 
> after the last row of the current match.
> 3. AFTER MATCH SKIP TO FIRST *RPV*: resume pattern matching at the first row 
> that is mapped to the row pattern variable RPV.
> 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row 
> that is mapped to the row pattern variable RPV.
> I think we can introduce a new function to the `CEP` class, which takes a new 
> parameter as AfterMatchSkipStrategy.
> The new API may look like this:
> {code}
> public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern, AfterMatchSkipStrategy afterMatchSkipStrategy)
> {code}
> We can also make `SKIP TO NEXT ROW` the default option, because that is how 
> the CEP library currently behaves.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

2017-07-30 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r130264164
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
                    nextVersion,
                    startTimestamp);
            }
+
+           switch (skipStrategy.getStrategy()) {
+               case SKIP_PAST_LAST_EVENT:
+                   if (nextState.isFinal()) {
--- End diff --

Should also consider the situation **Proceed to Final state**.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

2017-07-30 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r130266394
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
                    nextVersion,
                    startTimestamp);
            }
+
+           switch (skipStrategy.getStrategy()) {
+               case SKIP_PAST_LAST_EVENT:
+                   if (nextState.isFinal()) {
+                       resultingComputationStates.add(createStartComputationState(computationState, event));
+                   }
+                   break;
+               case SKIP_TO_FIRST:
+                   if (nextState.getName().equals(skipStrategy.getPatternName()) &&
+                           !nextState.getName().equals(currentState.getName())) {
+                       ComputationState startComputationState = createStartComputationState(computationState, event);
+                       if (callLevel > 0) {
+                           throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query");
+                       }
+                       // feed current matched event to the state.
+                       Collection computationStates = computeNextStates(startComputationState, event, timestamp, callLevel++);
+                       resultingComputationStates.addAll(computationStates);
+                   } else if (previousState == null && currentState.getName().equals(skipStrategy.getPatternName())) {
+                       throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query");
+                   }
+                   break;
+               case SKIP_TO_LAST:
+                   if (currentState.getName().equals(skipStrategy.getPatternName()) &&
+                           !nextState.getName().equals(currentState.getName())) {
+                       ComputationState startComputationState = createStartComputationState(computationState, event);
+                       if (callLevel > 0) {
+                           throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query");
+                       }
+                       // feed current matched event to the state.
+                       Collection computationStates = computeNextStates(startComputationState, event, timestamp, callLevel++);
+                       resultingComputationStates.addAll(computationStates);
+                   }
+                   break;
+           }
                break;
            }
        }

-       if (computationState.isStartState()) {
-           int totalBranches = calculateIncreasingSelfState(
-               outgoingEdges.getTotalIgnoreBranches(),
-               outgoingEdges.getTotalTakeBranches());
-
-           DeweyNumber startVersion = computationState.getVersion().increase(totalBranches);
-           ComputationState startState = ComputationState.createStartState(this, computationState.getState(), startVersion);
-           resultingComputationStates.add(startState);
+       if (computationState.isStartState() &&
+               skipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_NEXT_EVENT) {
--- End diff --

If 

[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

2017-07-30 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r130264936
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
                    nextVersion,
                    startTimestamp);
            }
+
+           switch (skipStrategy.getStrategy()) {
+               case SKIP_PAST_LAST_EVENT:
+                   if (nextState.isFinal()) {
+                       resultingComputationStates.add(createStartComputationState(computationState, event));
+                   }
+                   break;
+               case SKIP_TO_FIRST:
+                   if (nextState.getName().equals(skipStrategy.getPatternName()) &&
--- End diff --

Should use NFAStateNameHandler.getOriginalNameFromInternal() to compare 
state name.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

2017-07-30 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r130265354
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
                    nextVersion,
                    startTimestamp);
            }
+
+           switch (skipStrategy.getStrategy()) {
+               case SKIP_PAST_LAST_EVENT:
+                   if (nextState.isFinal()) {
+                       resultingComputationStates.add(createStartComputationState(computationState, event));
+                   }
+                   break;
+               case SKIP_TO_FIRST:
+                   if (nextState.getName().equals(skipStrategy.getPatternName()) &&
+                           !nextState.getName().equals(currentState.getName())) {
+                       ComputationState startComputationState = createStartComputationState(computationState, event);
+                       if (callLevel > 0) {
--- End diff --

Why is the callLevel needed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-7298) Records can be cleared when all data in state is invalid

2017-07-30 Thread Hequn Cheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng updated FLINK-7298:
---
Description: In {{ProcTimeWindowInnerJoin}}.{{expireOutTimeRow}}, we need 
not to remove records from state when there is no valid records. Instead, we 
can clear them all at once.  (was: In 
{{ProcTimeWindowInnerJoin}}.{{expireOutTimeRow}}, we need not to remove records 
from state when there is no valid records. Instead, we can clear 
 them all at once.)

> Records can be cleared when all data in state is invalid
> 
>
> Key: FLINK-7298
> URL: https://issues.apache.org/jira/browse/FLINK-7298
> Project: Flink
>  Issue Type: Improvement
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>
> In {{ProcTimeWindowInnerJoin}}.{{expireOutTimeRow}}, we need not to remove 
> records from state when there is no valid records. Instead, we can clear them 
> all at once.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-7298) Records can be cleared when all data in state is invalid

2017-07-30 Thread Hequn Cheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng reassigned FLINK-7298:
--

Assignee: Hequn Cheng

> Records can be cleared when all data in state is invalid
> 
>
> Key: FLINK-7298
> URL: https://issues.apache.org/jira/browse/FLINK-7298
> Project: Flink
>  Issue Type: Improvement
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>
> In {{ProcTimeWindowInnerJoin}}.{{expireOutTimeRow}}, we need not to remove 
> records from state when there is no valid records. Instead, we can clear 
>  them all at once.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7298) Records can be cleared when all data in state is invalid

2017-07-30 Thread Hequn Cheng (JIRA)
Hequn Cheng created FLINK-7298:
--

 Summary: Records can be cleared when all data in state is invalid
 Key: FLINK-7298
 URL: https://issues.apache.org/jira/browse/FLINK-7298
 Project: Flink
  Issue Type: Improvement
Reporter: Hequn Cheng


In {{ProcTimeWindowInnerJoin}}.{{expireOutTimeRow}}, we need not to remove 
records from state when there is no valid records. Instead, we can clear 
 them all at once.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6493) Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals()

2017-07-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106728#comment-16106728
 ] 

ASF GitHub Bot commented on FLINK-6493:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/4328
  
Thanks @tedyu. It is a good idea and the code has been updated based on 
your suggestion. Please check it out again ~ 


> Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals()
> -
>
> Key: FLINK-6493
> URL: https://issues.apache.org/jira/browse/FLINK-6493
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
> Fix For: 1.4.0
>
>
> {code}
> && ((partitionStateSerializer == null && ((Snapshot) 
> obj).getPartitionStateSerializer() == null)
>   || partitionStateSerializer.equals(((Snapshot) 
> obj).getPartitionStateSerializer()))
> && ((partitionStateSerializerConfigSnapshot == null && ((Snapshot) 
> obj).getPartitionStateSerializerConfigSnapshot() == null)
>   || partitionStateSerializerConfigSnapshot.equals(((Snapshot) 
> obj).getPartitionStateSerializerConfigSnapshot()));
> {code}
> The null check for partitionStateSerializer / 
> partitionStateSerializerConfigSnapshot is combined with another clause via ||, 
> so when the field is null but the other Snapshot's field is not, the subsequent 
> partitionStateSerializer.equals() call throws an NPE.
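One standard way to make both checks null-safe, shown here as an illustration of the fix direction rather than the merged patch, is {{java.util.Objects#equals}}:

{code}
import java.util.Objects;

// Objects.equals covers both the null/null and null/non-null cases that the
// combined clauses above miss. Field names mirror the snippet; types simplified.
final class SnapshotEqualsSketch {
    private Object partitionStateSerializer;
    private Object partitionStateSerializerConfigSnapshot;

    boolean serializersEqual(SnapshotEqualsSketch other) {
        return Objects.equals(partitionStateSerializer, other.partitionStateSerializer)
            && Objects.equals(partitionStateSerializerConfigSnapshot,
                other.partitionStateSerializerConfigSnapshot);
    }
}
{code}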



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4328: [FLINK-6493] Fix ineffective null check in RegisteredOper...

2017-07-30 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/4328
  
Thanks @tedyu. It is a good idea and the code has been updated based on 
your suggestion. Please check it out again ~ 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7239) GroupWindow and OverWindow aggregates do not check correctly for updating input

2017-07-30 Thread Hequn Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106724#comment-16106724
 ] 

Hequn Cheng commented on FLINK-7239:


Hi [~fhueske], the RetractionRules will make sure that 
{{DataStreamGroupWindowAggregate}} and {{DataStreamOverAggregate}} cannot 
receive purely-update messages (updates without retraction). If they did, IMO 
we would need to fix the RetractionRules, not the other way around. 

As for the {{DataStreamWindowJoin}}, we need to set needsUpdatesAsRetraction to 
true, considering that it cannot handle purely-update messages, and check that 
the input does not have retraction messages. : )

> GroupWindow and OverWindow aggregates do not check correctly for updating 
> input
> ---
>
> Key: FLINK-7239
> URL: https://issues.apache.org/jira/browse/FLINK-7239
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.1, 1.4.0
>Reporter: Fabian Hueske
>
> The streaming table operators for group-window and over-window aggregates 
> only support append-only input, i.e., no updates.
> However, the {{DataStreamGroupWindowAggregate}} and {{DataStreamOverAggregate}} 
> RelNodes only check that the input does not have retraction messages, which 
> is only one possible encoding of updates.
> We should fix the RelNodes to check whether the input is append-only using the 
> {{UpdatingPlanChecker}}, similar to {{DataStreamWindowJoin}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7213) Introduce state management by OperatorID in TaskManager

2017-07-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106672#comment-16106672
 ] 

ASF GitHub Bot commented on FLINK-7213:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4353
  
BTW, one alternative I was once considering for the scale-down case is 
merging state handles that are backed by different physical files into one 
logical state handle, using something based on `MultiStreamStateHandle`. That 
would require minor changes in how the backends currently iterate the handles 
and some calculation of virtual offsets near the `StateAssignmentOperation`, 
mapping the old physical file offsets to the new logical offsets in the stream 
that gives a consecutive logical view over the files. Then, the whole code 
would never again deal with this detail. I wonder if this is worth the effort?
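Roughly, the offset mapping described above could look like the following editor's sketch; all names are hypothetical:

{code}
// Sketch: assign each physical handle a start offset in one consecutive
// logical stream, so readers see a single merged view over several files.
import java.util.ArrayList;
import java.util.List;

final class VirtualOffsetSketch {
    static final class MappedHandle {
        final long logicalStart;   // where this handle begins in the merged view
        final long length;
        MappedHandle(long logicalStart, long length) {
            this.logicalStart = logicalStart;
            this.length = length;
        }
    }

    static List<MappedHandle> mapOffsets(List<Long> physicalLengths) {
        List<MappedHandle> mapped = new ArrayList<>();
        long offset = 0;
        for (long len : physicalLengths) {
            mapped.add(new MappedHandle(offset, len));
            offset += len;   // the next handle starts where this one ends
        }
        return mapped;
    }
}
{code}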


> Introduce state management by OperatorID in TaskManager
> ---
>
> Key: FLINK-7213
> URL: https://issues.apache.org/jira/browse/FLINK-7213
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Flink-5892 introduced the job manager / checkpoint coordinator part of 
> managing state on the operator level instead of the task level by introducing 
> explicit operator_id -> state mappings. However, this explicit mapping was 
> not introduced in the task manager side, so the explicit mapping is still 
> converted into a mapping that suits the implicit operator chain order.
> We should also introduce explicit operator ids to state management on the 
> task manager.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4353: [FLINK-7213] Introduce state management by OperatorID in ...

2017-07-30 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4353
  
BTW, one alternative I was once considering for the scale-down case is 
merging state handles that are backed by different physical files into one 
logical state handle, using something based on `MultiStreamStateHandle`. That 
would require minor changes in how the backends currently iterate the handles 
and some calculation of virtual offsets near the `StateAssignmentOperation`, 
mapping the old physical file offsets to the new logical offsets in the stream 
that gives a consecutive logical view over the files. Then, the whole code 
would never again deal with this detail. I wonder if this is worth the effort?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7213) Introduce state management by OperatorID in TaskManager

2017-07-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106669#comment-16106669
 ] 

ASF GitHub Bot commented on FLINK-7213:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4353
  
I think that idea is problematic because, in the rescaling case, all the 
collections can have different sizes. For example, there can be 5 managed keyed 
state handles, 7 managed operator state handles, and zero state handles for 
the raw state. How would you split that up between the 
`OperatorSubtaskStates` in your set? As it is, `OperatorSubtaskState` contains 
the complete state for an operator subtask, which I think is a good thing. Also, 
at some point there *might* be a reason to report more than one handle 
already on snapshotting.


> Introduce state management by OperatorID in TaskManager
> ---
>
> Key: FLINK-7213
> URL: https://issues.apache.org/jira/browse/FLINK-7213
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Flink-5892 introduced the job manager / checkpoint coordinator part of 
> managing state on the operator level instead of the task level by introducing 
> explicit operator_id -> state mappings. However, this explicit mapping was 
> not introduced in the task manager side, so the explicit mapping is still 
> converted into a mapping that suits the implicit operator chain order.
> We should also introduce explicit operator ids to state management on the 
> task manager.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4353: [FLINK-7213] Introduce state management by OperatorID in ...

2017-07-30 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4353
  
I think that idea is problematic because, in the rescaling case, all the 
collections can have different sizes. For example, there can be 5 managed keyed 
state handles, 7 managed operator state handles, and zero state handles for 
the raw state. How would you split that up between the 
`OperatorSubtaskStates` in your set? As it is, `OperatorSubtaskState` contains 
the complete state for an operator subtask, which I think is a good thing. Also, 
at some point there *might* be a reason to report more than one handle 
already on snapshotting.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-4534) Lack of synchronization in BucketingSink#restoreState()

2017-07-30 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-4534:
--
Description: 
Iteration over state.bucketStates is protected by synchronization in other 
methods, except for the following in restoreState():

{code}
for (BucketState bucketState : state.bucketStates.values()) {
{code}
and following in close():
{code}
for (Map.Entry entry : 
state.bucketStates.entrySet()) {
  closeCurrentPartFile(entry.getValue());
{code}
w.r.t. bucketState.pendingFilesPerCheckpoint, there is a similar issue starting 
at line 752:
{code}
  Set pastCheckpointIds = 
bucketState.pendingFilesPerCheckpoint.keySet();
  LOG.debug("Moving pending files to final location on restore.");
  for (Long pastCheckpointId : pastCheckpointIds) {
{code}

  was:
Iteration over state.bucketStates is protected by synchronization in other 
methods, except for the following in restoreState():
{code}
for (BucketState bucketState : state.bucketStates.values()) {
{code}
and following in close():
{code}
for (Map.Entry entry : 
state.bucketStates.entrySet()) {
  closeCurrentPartFile(entry.getValue());
{code}
w.r.t. bucketState.pendingFilesPerCheckpoint , there is similar issue starting 
line 752:
{code}
  Set pastCheckpointIds = 
bucketState.pendingFilesPerCheckpoint.keySet();
  LOG.debug("Moving pending files to final location on restore.");
  for (Long pastCheckpointId : pastCheckpointIds) {
{code}


> Lack of synchronization in BucketingSink#restoreState()
> ---
>
> Key: FLINK-4534
> URL: https://issues.apache.org/jira/browse/FLINK-4534
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>
> Iteration over state.bucketStates is protected by synchronization in other 
> methods, except for the following in restoreState():
> {code}
> for (BucketState bucketState : state.bucketStates.values()) {
> {code}
> and following in close():
> {code}
> for (Map.Entry entry : 
> state.bucketStates.entrySet()) {
>   closeCurrentPartFile(entry.getValue());
> {code}
> w.r.t. bucketState.pendingFilesPerCheckpoint, there is a similar issue 
> starting at line 752:
> {code}
>   Set pastCheckpointIds = 
> bucketState.pendingFilesPerCheckpoint.keySet();
>   LOG.debug("Moving pending files to final location on restore.");
>   for (Long pastCheckpointId : pastCheckpointIds) {
> {code}
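A minimal self-contained illustration of the fix direction, locking on the shared map as the other methods do (names simplified; not the actual patch):

{code}
import java.util.HashMap;
import java.util.Map;

// Iterate the shared bucket-state map only while holding its monitor, matching
// the synchronization used by the other methods.
final class SyncIterationSketch {
    private final Map<String, Object> bucketStates = new HashMap<>();

    void restoreStateSketch() {
        synchronized (bucketStates) {
            for (Object bucketState : bucketStates.values()) {
                // ... move pending files, close part files, etc. ...
            }
        }
    }
}
{code}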



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()

2017-07-30 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-5486:
--
Description: 
Here is related code:
{code}
  
handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);

  synchronized (bucketState.pendingFilesPerCheckpoint) {
bucketState.pendingFilesPerCheckpoint.clear();
  }
{code}

The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside 
the synchronization block. Otherwise during the processing of 
handlePendingFilesForPreviousCheckpoints(), some entries of the map may be 
cleared.

  was:
Here is related code:

{code}
  
handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);

  synchronized (bucketState.pendingFilesPerCheckpoint) {
bucketState.pendingFilesPerCheckpoint.clear();
  }
{code}

The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside 
the synchronization block. Otherwise during the processing of 
handlePendingFilesForPreviousCheckpoints(), some entries of the map may be 
cleared.


> Lack of synchronization in BucketingSink#handleRestoredBucketState()
> 
>
> Key: FLINK-5486
> URL: https://issues.apache.org/jira/browse/FLINK-5486
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: mingleizhang
> Fix For: 1.3.2, 1.3.3
>
>
> Here is related code:
> {code}
>   
> handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
>   synchronized (bucketState.pendingFilesPerCheckpoint) {
> bucketState.pendingFilesPerCheckpoint.clear();
>   }
> {code}
> The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside 
> the synchronization block. Otherwise during the processing of 
> handlePendingFilesForPreviousCheckpoints(), some entries of the map may be 
> cleared.
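The fix direction, sketched as a fragment with the names from the snippet above:

{code}
// Both the handling and the clear happen under one lock, so no map entries
// can be removed while handlePendingFilesForPreviousCheckpoints() runs.
synchronized (bucketState.pendingFilesPerCheckpoint) {
    handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
    bucketState.pendingFilesPerCheckpoint.clear();
}
{code}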



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-7115) test instability in MiniClusterITCase.runJobWithMultipleJobManagers

2017-07-30 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-7115.
--
   Resolution: Fixed
Fix Version/s: 1.4.0

Closing this issue for the moment because it should have been fixed by 
FLINK-7279. If not, then we have to reopen this issue.

> test instability in MiniClusterITCase.runJobWithMultipleJobManagers
> ---
>
> Key: FLINK-7115
> URL: https://issues.apache.org/jira/browse/FLINK-7115
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>  Labels: test-stability
> Fix For: 1.4.0
>
>
> In a test run with unrelated changes, I happened to have one case of 
> {{MiniClusterITCase}} hanging:
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/250775808/log.txt?X-Amz-Expires=30=20170706T151556Z=AWS4-HMAC-SHA256=AKIAJRYRXRSVGNKPKO5A/20170706/us-east-1/s3/aws4_request=host=5b7c512137c7cbd82dcb77a98aeadc3d761b7055bea6d8f07ad6b786dc196f37
> {code}
> ==
> Maven produced no output for 300 seconds.
> ==
> ==
> The following Java processes are running (JPS)
> ==
> 12852 Jps
> 9166 surefirebooter1705381973603203163.jar
> 4966 Launcher
> ==
> Printing stack trace of Java process 12865
> ==
> 12865: No such process
> ==
> Printing stack trace of Java process 9166
> ==
> 2017-07-06 15:05:52
> Full thread dump OpenJDK 64-Bit Server VM (24.131-b00 mixed mode):
> "Attach Listener" daemon prio=10 tid=0x7fc520b1a000 nid=0x3266 waiting on 
> condition [0x]
>java.lang.Thread.State: RUNNABLE
> "flink-akka.actor.default-dispatcher-9" daemon prio=10 tid=0x7fc520b0e800 
> nid=0x23fd waiting on condition [0x7fc51abcb000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0x0007a0ca2c78> (a 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
>   at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> "flink-akka.actor.default-dispatcher-8" daemon prio=10 tid=0x7fc520bb9800 
> nid=0x23fc waiting on condition [0x7fc51aaca000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0x0007a0ca2c78> (a 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
>   at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> "Flink-MetricRegistry-1" prio=10 tid=0x7fc520ba7800 nid=0x23f9 waiting on 
> condition [0x7fc51a4c4000]
>java.lang.Thread.State: TIMED_WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0x0007a09699c8> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>   at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1092)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
>   at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> "flink-akka.actor.default-dispatcher-7" daemon prio=10 tid=0x7fc520b9d800 
> nid=0x23f7 waiting on condition [0x7fc51a6c6000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  

[GitHub] flink pull request #4416: [FLINK-7279][minicluster] fix a deadlock between T...

2017-07-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4416


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (FLINK-7279) MiniCluster can deadlock at shut down

2017-07-30 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-7279.
--
   Resolution: Fixed
Fix Version/s: 1.4.0

Fixed via 49acd09ec19b5da3f6e9861d658728ec1306e3d5

> MiniCluster can deadlock at shut down
> -
>
> Key: FLINK-7279
> URL: https://issues.apache.org/jira/browse/FLINK-7279
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Nico Kruber
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> The {{MiniCluster}} can deadlock if the fatal error handler is called 
> while the {{MiniCluster}} shuts down. The reason is that the shutdown 
> happens under a lock which is also required by the fatal error handler. If 
> the {{MiniCluster}} then tries to shut down the underlying RPC service, which 
> waits for all actors to terminate, it will never complete because one actor 
> is still waiting for the lock.
> One solution would be to ignore the fatal error handler calls if the 
> {{MiniCluster}} is shutting down.
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/257811319/log.txt
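A minimal sketch of the suggested solution (names hypothetical):

{code}
// Ignore fatal-error notifications once shutdown has begun, so the handler
// never blocks on the lock held by the shutdown path.
final class ShutdownAwareHandlerSketch {
    private volatile boolean shuttingDown;

    void onFatalError(Throwable cause) {
        if (shuttingDown) {
            return;   // the MiniCluster is going down anyway
        }
        // ... regular fatal-error handling under the shutdown lock ...
    }

    void shutDown() {
        shuttingDown = true;
        // ... terminate RPC services, actors, etc. ...
    }
}
{code}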



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7279) MiniCluster can deadlock at shut down

2017-07-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106586#comment-16106586
 ] 

ASF GitHub Bot commented on FLINK-7279:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4416


> MiniCluster can deadlock at shut down
> -
>
> Key: FLINK-7279
> URL: https://issues.apache.org/jira/browse/FLINK-7279
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Nico Kruber
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> The {{MiniCluster}} can deadlock if the fatal error handler is called 
> while the {{MiniCluster}} shuts down. The reason is that the shutdown 
> happens under a lock which is also required by the fatal error handler. If 
> the {{MiniCluster}} then tries to shut down the underlying RPC service, which 
> waits for all actors to terminate, it will never complete because one actor 
> is still waiting for the lock.
> One solution would be to ignore the fatal error handler calls if the 
> {{MiniCluster}} is shutting down.
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/257811319/log.txt



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7080) Deploy Yarn cluster with job

2017-07-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106581#comment-16106581
 ] 

ASF GitHub Bot commented on FLINK-7080:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4284


> Deploy Yarn cluster with job
> 
>
> Key: FLINK-7080
> URL: https://issues.apache.org/jira/browse/FLINK-7080
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management, YARN
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> In order to start a yarn per-job cluster, we have to start a Yarn application 
> running Flink which includes the job to be executed. One way to do it is to 
> upload the serialized form of the {{JobGraph}} as a file. The Yarn entry 
> point can then read the {{JobGraph}} from this file and start the 
> {{JobManager}}.
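Since {{JobGraph}} is {{java.io.Serializable}}, the upload step can be sketched with plain Java serialization; the path handling and wiring here are illustrative assumptions:

{code}
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import org.apache.flink.runtime.jobgraph.JobGraph;

// Sketch: write the serialized JobGraph to a file that is shipped with the
// Yarn application; the entry point later reads it back with readObject().
final class JobGraphFileSketch {
    static void write(JobGraph jobGraph, String path) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(path))) {
            out.writeObject(jobGraph);
        }
    }
}
{code}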



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7080) Deploy Yarn cluster with job

2017-07-30 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-7080.

   Resolution: Done
Fix Version/s: 1.4.0

Added via a954ea113bc29a4480af579387c6e9b81bd76f85

> Deploy Yarn cluster with job
> 
>
> Key: FLINK-7080
> URL: https://issues.apache.org/jira/browse/FLINK-7080
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management, YARN
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> In order to start a Yarn per-job cluster, we have to start a Yarn application 
> that runs Flink and includes the job to be executed. One way to do this is to 
> upload the serialized form of the {{JobGraph}} as a file. The Yarn entry 
> point can then read the {{JobGraph}} from this file and start the 
> {{JobManager}}.





[GitHub] flink pull request #4284: [FLINK-7080] [yarn] Add Yarn per job mode deployme...

2017-07-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4284




[jira] [Commented] (FLINK-7080) Deploy Yarn cluster with job

2017-07-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16106578#comment-16106578
 ] 

ASF GitHub Bot commented on FLINK-7080:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4284
  
Travis passed. Merging this PR.


> Deploy Yarn cluster with job
> 
>
> Key: FLINK-7080
> URL: https://issues.apache.org/jira/browse/FLINK-7080
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management, YARN
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> In order to start a Yarn per-job cluster, we have to start a Yarn application 
> that runs Flink and includes the job to be executed. One way to do this is to 
> upload the serialized form of the {{JobGraph}} as a file. The Yarn entry 
> point can then read the {{JobGraph}} from this file and start the 
> {{JobManager}}.





[GitHub] flink issue #4284: [FLINK-7080] [yarn] Add Yarn per job mode deployment

2017-07-30 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4284
  
Travis passed. Merging this PR.




[GitHub] flink issue #4380: Time sort with offset/fetch without retraction

2017-07-30 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4380
  
Hi @rtudoran, 

I had the impression that your main motivation was to fill the syntactic 
gaps of SQL on streams and not so much to support certain kinds of use cases. 

The semantics of a SQL query are given by the SQL specification and not up 
for discussion. If you want to support a certain behavior, we need to see how 
this can be expressed in standard-compliant SQL and then think about the 
implementation. The semantics of `ORDER BY rowtime ASC FETCH 10 ROWS ONLY` are 
fixed: the query returns the ten *first* result rows (first because they have 
the lowest timestamps).

However, I think what you are trying to achieve is represented by `ORDER BY 
rowtime DESC FETCH 10 ROWS ONLY`. This query returns the ten *last* rows of 
the result (last because they have the highest timestamps). Obviously, we need 
retraction to handle this case, because the ten last rows will change over 
time. `ORDER BY rowtime DESC OFFSET 10` is also not very useful, because it 
holds back the 10 last result rows. However, we could support it to fill the 
gap in the syntax and execute the query instead of saying "Sorry, can't do 
that".

So the question now is, how do we proceed with this PR? Do we want to add 
the `ORDER BY rowtime ASC OFFSET / FETCH` functionality as simple additions to 
the existing operators (as I said I think it can be done with very few lines if 
we do some refactoring) to fill the syntactic gap or not?

Regardless of whether we add this or not, we should work on the `ORDER BY 
rowtime DESC OFFSET / FETCH` case.

What do you think?




[jira] [Commented] (FLINK-7245) Enhance the operators to support holding back watermarks

2017-07-30 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16106399#comment-16106399
 ] 

Xingcan Cui commented on FLINK-7245:


Thanks for the answer and the suggestion, [~fhueske]. I'll keep focusing on it.

> Enhance the operators to support holding back watermarks
> 
>
> Key: FLINK-7245
> URL: https://issues.apache.org/jira/browse/FLINK-7245
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>
> Currently the watermarks are applied and emitted by the 
> {{AbstractStreamOperator}} instantly. 
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>   if (timeServiceManager != null) {
>   timeServiceManager.advanceWatermark(mark);
>   }
>   output.emitWatermark(mark);
> }
> {code}
> Some calculation results (with timestamp fields) triggered by these 
> watermarks (e.g., join or aggregate results) may be regarded as late by 
> the downstream operators, since their timestamps must be less than or equal 
> to the watermarks that triggered them. 
> This issue aims to add another "working mode", which supports holding back 
> watermarks, to the current operators. These watermarks should be blocked and 
> stored by the operators until all the corresponding newly generated results 
> have been emitted.
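
A sketch of what such a mode could look like. {{holdBackWatermarks}}, 
{{hasPendingResults()}} and {{heldBackWatermark}} are hypothetical names; a 
real operator would decide based on its own pending results when the 
watermark may be forwarded.

{code:java}
public void processWatermark(Watermark mark) throws Exception {
    // still advance the timers so that the pending results get computed
    if (timeServiceManager != null) {
        timeServiceManager.advanceWatermark(mark);
    }
    if (holdBackWatermarks && hasPendingResults()) {
        // block the watermark until all results it triggered are emitted
        heldBackWatermark = mark;
    } else {
        output.emitWatermark(mark);
    }
}

// called once all results belonging to the held-back watermark are emitted
private void flushHeldBackWatermark() {
    if (heldBackWatermark != null) {
        output.emitWatermark(heldBackWatermark);
        heldBackWatermark = null;
    }
}
{code}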





[jira] [Commented] (FLINK-7245) Enhance the operators to support holding back watermarks

2017-07-30 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16106397#comment-16106397
 ] 

Fabian Hueske commented on FLINK-7245:
--

Hi [~xccui], thanks for the questions. I think you got it mostly right. 
I would propose to continue this discussion on a separate JIRA about changing 
the handling of time indicators. 
This JIRA is about holding back watermarks, which is not only useful for the 
Table API / SQL.

I will open a JIRA to change the time indicator handling soon.
Thanks, Fabian

> Enhance the operators to support holding back watermarks
> 
>
> Key: FLINK-7245
> URL: https://issues.apache.org/jira/browse/FLINK-7245
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>
> Currently the watermarks are applied and emitted by the 
> {{AbstractStreamOperator}} instantly. 
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>   if (timeServiceManager != null) {
>   timeServiceManager.advanceWatermark(mark);
>   }
>   output.emitWatermark(mark);
> }
> {code}
> Some calculation results (with timestamp fields) triggered by these 
> watermarks (e.g., join or aggregate results) may be regarded as late by 
> the downstream operators, since their timestamps must be less than or equal 
> to the watermarks that triggered them. 
> This issue aims to add another "working mode", which supports holding back 
> watermarks, to the current operators. These watermarks should be blocked and 
> stored by the operators until all the corresponding newly generated results 
> have been emitted.





[jira] [Commented] (FLINK-7179) ProjectableTableSource interface isn't compatible with BoundedOutOfOrdernessTimestampExtractor

2017-07-30 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16106389#comment-16106389
 ] 

Fabian Hueske commented on FLINK-7179:
--

I think a {{ProjectableTableSource}} knows that it provides a virtual time 
indicator column, so it should be able to identify that a field index 
points to a virtual attribute. 
Hence, I think it should be possible to handle this case correctly, but I agree 
that this is not easy and requires knowledge of how things work internally.

We are currently discussing to change the handling and representation of time 
indicators. In a nutshell, we are thinking of treating them as regular Row 
fields and not as virtual columns anymore. See this 
[thread|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E]
 for some background info.
I am not sure what we can do about the situation right now, or whether it is 
worth investing a lot of time into it if the time indicator representation is 
changed anyway.

What do you think [~suez1224], [~hpeter]?


> ProjectableTableSource interface isn't compatible with 
> BoundedOutOfOrdernessTimestampExtractor
> 
>
> Key: FLINK-7179
> URL: https://issues.apache.org/jira/browse/FLINK-7179
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.1
>Reporter: Zhenqiu Huang
>Assignee: Zhenqiu Huang
>
> In the implementation of windows in streaming SQL, 
> BoundedOutOfOrdernessTimestampExtractor is designed to extract the row time 
> from each row. It assumes by default that the ts field is present in the data 
> stream. On the other hand, ProjectableTableSource is designed to support 
> projection push-down. If a query does not reference any rowtime-related 
> field, the extractor can't function correctly. 
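
For context, projection push-down goes through 
{{ProjectableTableSource#projectFields(int[])}}. A sketch of how a source 
with a virtual time-indicator column would need to guard against virtual 
indices; all names besides the {{projectFields}} method are made up.

{code:java}
import java.util.Arrays;

// Sketch with made-up names: a source whose schema ends with a virtual
// rowtime column at index `virtualRowtimeIndex`.
public class MyTableSource {

    private final int virtualRowtimeIndex;
    private final int[] physicalFields;

    public MyTableSource(int virtualRowtimeIndex, int[] physicalFields) {
        this.virtualRowtimeIndex = virtualRowtimeIndex;
        this.physicalFields = physicalFields;
    }

    // called for projection push-down; `fields` may contain the index of
    // the virtual column, which has no physical counterpart and must not
    // be pushed into the underlying source
    public MyTableSource projectFields(int[] fields) {
        int[] physical = Arrays.stream(fields)
                .filter(f -> f != virtualRowtimeIndex)
                .toArray();
        return new MyTableSource(virtualRowtimeIndex, physical);
    }
}
{code}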





[jira] [Commented] (FLINK-7291) Incorrect return value of getArity() in ListTypeInfo and MapTypeInfo

2017-07-30 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16106367#comment-16106367
 ] 

Fabian Hueske commented on FLINK-7291:
--

I think you are right that {{getArity()}} was introduced for composite types 
(not sure why it wasn't added to CompositeType). So, this is probably not an 
issue, because the method is (hopefully) only called for composite types.
However, IIRC the method should return the number of record fields a type has 
(without nesting). All primitive types and array types implement the method by 
returning 1. So, we should go for consistency here, IMO.

The {{getTotalFields()}} method is a bigger issue because it is recursively 
called by composite types on their field types (composite or not) to compute 
the number of nested fields (e.g., in the constructor of PojoTypeInfo or 
TupleTypeInfoBase).

> Incorrect return value of getArity() in ListTypeInfo and MapTypeInfo
> 
>
> Key: FLINK-7291
> URL: https://issues.apache.org/jira/browse/FLINK-7291
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.3.1, 1.4.0
>Reporter: Fabian Hueske
>
> The implementation of {{getArity()}} in {{ListTypeInfo}} and {{MapTypeInfo}} 
> returns 0 but should return 1.
> The implementation of {{MapTypeInfo.getTotalFields()}} is also incorrect and 
> must return 1 instead of 2.
> The JavaDocs of {{TypeInformation.getArity()}} and 
> {{TypeInformation.getTotalFields()}} should be extended as well to avoid 
> future confusion about these methods. {{getArity()}} returns the arity of the 
> current type: for atomic types, this is 1; for composite types, it is the 
> number of non-nested fields (i.e., the number of fields on the first nesting 
> level).
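
A sketch of the fix described above; both types then count as a single 
non-nested field, consistent with the primitive and array types:

{code:java}
// in ListTypeInfo and MapTypeInfo: a List<T> or Map<K, V> value occupies
// exactly one (non-nested) field of the enclosing record
@Override
public int getArity() {
    return 1; // was: 0
}

@Override
public int getTotalFields() {
    return 1; // MapTypeInfo previously returned 2
}
{code}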





[jira] [Comment Edited] (FLINK-6333) Utilize Bloomfilters in RocksDb

2017-07-30 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16095490#comment-16095490
 ] 

Ted Yu edited comment on FLINK-6333 at 7/30/17 7:33 AM:


Looks like rocksdb issue 1964 can be closed .


was (Author: yuzhih...@gmail.com):
Looks like rocksdb issue 1964 can be closed.

> Utilize Bloomfilters in RocksDb
> ---
>
> Key: FLINK-6333
> URL: https://issues.apache.org/jira/browse/FLINK-6333
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Ted Yu
>
> Bloom Filters would speed up RocksDb lookups.
> When we upgrade to RocksDb 5.2.1+, we would be able to do:
> {code}
>   new BlockBasedTableConfig()
>   .setBlockCacheSize(blockCacheSize)
>   .setBlockSize(blockSize)
>   .setFilter(new BloomFilter())
> {code}
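
For illustration, a sketch of how the option could be wired into the RocksDB 
state backend through an {{OptionsFactory}} once the dependency is upgraded; 
the cache and block sizes are placeholders.

{code:java}
import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class BloomFilterOptionsFactory implements OptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        return currentOptions; // no DB-level changes needed for this
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        return currentOptions.setTableFormatConfig(
                new BlockBasedTableConfig()
                        .setBlockCacheSize(64 * 1024 * 1024) // placeholder: 64 MB
                        .setBlockSize(16 * 1024)             // placeholder: 16 KB
                        .setFilter(new BloomFilter()));      // enable Bloom filters
    }
}
{code}

The factory would then be registered with 
{{RocksDBStateBackend#setOptions(new BloomFilterOptionsFactory())}}.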





[jira] [Commented] (FLINK-6493) Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals()

2017-07-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16106355#comment-16106355
 ] 

ASF GitHub Bot commented on FLINK-6493:
---

Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4328#discussion_r130235370
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
 ---
@@ -175,14 +175,21 @@ public boolean equals(Object obj) {
return false;
}
 
+   Snapshot snapshot;
+
+   if (obj instanceof Snapshot) {
+   snapshot = (Snapshot)obj;
+   } else {
--- End diff --

nit: if you invert the condition (handling the non-Snapshot case first), the 
declaration and assignment of snapshot can be done on one line.


> Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals()
> -
>
> Key: FLINK-6493
> URL: https://issues.apache.org/jira/browse/FLINK-6493
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
> Fix For: 1.4.0
>
>
> {code}
> && ((partitionStateSerializer == null && ((Snapshot) 
> obj).getPartitionStateSerializer() == null)
>   || partitionStateSerializer.equals(((Snapshot) 
> obj).getPartitionStateSerializer()))
> && ((partitionStateSerializerConfigSnapshot == null && ((Snapshot) 
> obj).getPartitionStateSerializerConfigSnapshot() == null)
>   || partitionStateSerializerConfigSnapshot.equals(((Snapshot) 
> obj).getPartitionStateSerializerConfigSnapshot()));
> {code}
> The null check for partitionStateSerializer / 
> partitionStateSerializerConfigSnapshot is combined with another clause via ||, 
> so the serializer can still be dereferenced while it is null.
> This may lead to an NPE in the partitionStateSerializer.equals() call.
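
Combining the review suggestion above with a null-safe comparison, the 
relevant part of {{equals()}} could look like this (a sketch; 
{{java.util.Objects#equals}} treats two nulls as equal and never dereferences 
a null reference):

{code:java}
@Override
public boolean equals(Object obj) {
    if (obj == this) {
        return true;
    }
    // review nit applied: handle the non-Snapshot case first, so that the
    // declaration and assignment of `snapshot` fit on one line
    if (!(obj instanceof Snapshot)) {
        return false;
    }
    Snapshot snapshot = (Snapshot) obj;

    // java.util.Objects.equals() is null-safe: it returns true if both
    // sides are null, instead of throwing an NPE
    return Objects.equals(partitionStateSerializer,
                snapshot.getPartitionStateSerializer())
        && Objects.equals(partitionStateSerializerConfigSnapshot,
                snapshot.getPartitionStateSerializerConfigSnapshot());
}
{code}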





[GitHub] flink pull request #4328: [FLINK-6493] Fix ineffective null check in Registe...

2017-07-30 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4328#discussion_r130235370
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
 ---
@@ -175,14 +175,21 @@ public boolean equals(Object obj) {
return false;
}
 
+   Snapshot snapshot;
+
+   if (obj instanceof Snapshot) {
+   snapshot = (Snapshot)obj;
+   } else {
--- End diff --

nit: if you invert the condition (handling the non-Snapshot case first), the 
declaration and assignment of snapshot can be done on one line.




[jira] [Updated] (FLINK-6838) RescalingITCase fails in master branch

2017-07-30 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-6838:
--
Description: 
{code}
Tests in error:
  RescalingITCase.testSavepointRescalingInKeyedState[1] » JobExecution Job 
execu...
  RescalingITCase.testSavepointRescalingWithKeyedAndNonPartitionedState[1] » 
JobExecution
{code}
Both failed with similar cause:
{code}

testSavepointRescalingInKeyedState[1](org.apache.flink.test.checkpointing.RescalingITCase)
  Time elapsed: 4.813 sec  <<< ERROR!
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
  at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933)
  at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
  at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
  at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
  at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
  at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException: 
java.lang.Exception: Could not materialize checkpoint 4 for operator Flat Map 
-> Sink: Unnamed (1/2).
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:967)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 4 for operator 
Flat Map -> Sink: Unnamed (1/2).
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:967)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Cannot 
register Closeable, registry is already closed. Closing argument.
  at java.util.concurrent.FutureTask.report(FutureTask.java:122)
  at java.util.concurrent.FutureTask.get(FutureTask.java:192)
  at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Cannot register Closeable, registry is already 
closed. Closing argument.
  at 
org.apache.flink.util.AbstractCloseableRegistry.registerClosable(AbstractCloseableRegistry.java:66)
  at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.openCheckpointStream(RocksDBKeyedStateBackend.java:495)
  at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.openIOHandle(RocksDBKeyedStateBackend.java:394)
  at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.openIOHandle(RocksDBKeyedStateBackend.java:390)
  at 
org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:67)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
{code}

  was:
{code}
Tests in error: