[jira] [Commented] (FLINK-4037) Introduce ArchivedExecutionGraph without any user classes

2016-06-15 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15331982#comment-15331982
 ] 

Stefan Richter commented on FLINK-4037:
---

I think introducing an {{ArchivedExecutionGraph}} to maintain the relevant 
information from the {{ExecutionGraph}} of finished jobs is a little more 
involved. For example, the web interface has to deal with objects from 
{{ArchivedExecutionGraph}} as well as {{ExecutionGraph}} to display finished 
and in-flight jobs.

To achieve this, we could, for example:
* Make {{ExecutionGraph}} a subclass of {{ArchivedExecutionGraph}}. However, 
this is probably not the best idea, as it violates the Liskov substitution 
principle.
* Introduce a common interface for both {{ExecutionGraph}} and 
{{ArchivedExecutionGraph}}. This interface provides the means for the web 
interface to extract information for display (a sketch of such a view follows 
below).

The method {{prepareForArchiving()}} could convert ExecutionGraphs into 
ArchivedExecutionGraphs. We should ensure that asynchronous requests by the web 
interface are routed to a valid ExecutionGraph until the conversion is complete 
and the object that acts as the model for the web interface has been 
substituted.

Furthermore, we need to identify all references in ExecutionGraph that could 
hold objects of user-provided classes (e.g. accumulators, metrics, ...), 
stringify their information (see {{JobConfigHandler.handleRequest()}}), and 
release the references so that the user classloader can be garbage-collected. 
Corresponding parts of the mentioned stringification have to be pushed into the 
ExecutionGraph implementations. The web interface needs to be changed to 
extract the information through the interface.
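
For illustration, a minimal sketch of what such a shared, read-only view could 
look like; the names here are hypothetical, not Flink's actual API:

{code}
// Hypothetical sketch only -- illustrative names, not Flink's actual API.
// The web interface would depend solely on this read-only view, which both
// ExecutionGraph and ArchivedExecutionGraph implement. All user-provided data
// is exposed as strings, so the view holds no references to user classes.
import java.util.Map;

interface AccessExecutionGraphView {

    String getJobId();

    String getJobName();

    String getState();

    // already stringified, so no user classloader is needed to read them
    Map<String, String> getAccumulatorsStringified();
}
{code}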

> Introduce ArchivedExecutionGraph without any user classes
> -
>
> Key: FLINK-4037
> URL: https://issues.apache.org/jira/browse/FLINK-4037
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Robert Metzger
>Assignee: Stefan Richter
>
> As a follow up to FLINK-4011: In order to allow the JobManager to unload all 
> classes from a finished job, we need to convert the ExecutionGraph (and some 
> attached objects like the ExecutionConfig) into a stringified version, not 
> containing any user classes.
> The web frontend can show strings only anyways.





[jira] [Comment Edited] (FLINK-4037) Introduce ArchivedExecutionGraph without any user classes

2016-06-15 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15331982#comment-15331982
 ] 

Stefan Richter edited comment on FLINK-4037 at 6/15/16 4:14 PM:


I think introducing an {{ArchivedExecutionGraph}} to maintain the relevant 
information from the {{ExecutionGraph}} of finished jobs is a little more 
involved. For example, the web interface has to deal with objects from 
{{ArchivedExecutionGraph}} as well as {{ExecutionGraph}} to display finished 
and in-flight jobs.

To achieve this, we could, for example:
* Make {{ExecutionGraph}} a subclass of {{ArchivedExecutionGraph}}. However, 
this is probably not the best idea, as it violates the Liskov substitution 
principle.
* Introduce a common interface for both {{ExecutionGraph}} and 
{{ArchivedExecutionGraph}}. This interface provides the means for the web 
interface to extract information for display.

The method {{prepareForArchiving()}} could convert ExecutionGraphs into 
ArchivedExecutionGraphs. We should ensure that asynchronous requests by the web 
interface are routed to a valid ExecutionGraph until the conversion is complete 
and the object that acts as the model for the web interface has been 
substituted (a sketch of this substitution follows below). Maybe there is a 
cleaner way of introducing a real life cycle for ExecutionGraphs?

Furthermore, we need to identify all references in ExecutionGraph that could 
hold objects of user-provided classes (e.g. accumulators, metrics, ...), 
stringify their information (see {{JobConfigHandler.handleRequest()}}), and 
release the references so that the user classloader can be garbage-collected. 
Corresponding parts of the mentioned stringification have to be pushed into the 
ExecutionGraph implementations. The web interface needs to be changed to 
extract the information through the interface.
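
To illustrate the substitution, a minimal sketch with hypothetical names (not 
Flink code):

{code}
// Hypothetical sketch -- not Flink code. Web handlers always read through a
// single atomic reference, so they see either the live ExecutionGraph or,
// once archiving has completed, the ArchivedExecutionGraph.
import java.util.concurrent.atomic.AtomicReference;

class GraphHolder<T> {

    private final AtomicReference<T> current;

    GraphHolder(T liveGraph) {
        this.current = new AtomicReference<>(liveGraph);
    }

    T get() {                        // called by asynchronous web requests
        return current.get();
    }

    void substitute(T archivedGraph) {
        current.set(archivedGraph);  // atomic swap once conversion is complete
    }
}
{code}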


was (Author: srichter):
I think introducing an {{ArchivedExecutionGraph}} to maintain the relevant 
information from the {{ExecutionGraph}} of finished jobs is a little more 
involved. For example, the web interface has to deal with objects from 
{{ArchivedExecutionGraph}} as well as {{ExecutionGraph}} to display finished 
and in-flight jobs.

To achieve this, we could, for example:
* Make {{ExecutionGraph}} a subclass of {{ArchivedExecutionGraph}}. However, 
this is probably not the best idea, as it violates the Liskov substitution 
principle.
* Introduce a common interface for both {{ExecutionGraph}} and 
{{ArchivedExecutionGraph}}. This interface provides the means for the web 
interface to extract information for display.

The method {{prepareForArchiving()}} could convert ExecutionGraphs into 
ArchivedExecutionGraphs. We should ensure that asynchronous requests by the web 
interface are routed to a valid ExecutionGraph until the conversion is complete 
and the object that acts as the model for the web interface has been 
substituted.

Furthermore, we need to identify all references in ExecutionGraph that could 
hold objects of user-provided classes (e.g. accumulators, metrics, ...), 
stringify their information (see {{JobConfigHandler.handleRequest()}}), and 
release the references so that the user classloader can be garbage-collected. 
Corresponding parts of the mentioned stringification have to be pushed into the 
ExecutionGraph implementations. The web interface needs to be changed to 
extract the information through the interface.

> Introduce ArchivedExecutionGraph without any user classes
> -
>
> Key: FLINK-4037
> URL: https://issues.apache.org/jira/browse/FLINK-4037
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Robert Metzger
>Assignee: Stefan Richter
>
> As a follow up to FLINK-4011: In order to allow the JobManager to unload all 
> classes from a finished job, we need to convert the ExecutionGraph (and some 
> attached objects like the ExecutionConfig) into a stringified version, not 
> containing any user classes.
> The web frontend can show strings only anyways.





[jira] [Updated] (FLINK-4037) Introduce ArchivedExecutionGraph without any user classes

2016-06-16 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-4037:
--
Assignee: (was: Stefan Richter)

> Introduce ArchivedExecutionGraph without any user classes
> -
>
> Key: FLINK-4037
> URL: https://issues.apache.org/jira/browse/FLINK-4037
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Robert Metzger
>
> As a follow up to FLINK-4011: In order to allow the JobManager to unload all 
> classes from a finished job, we need to convert the ExecutionGraph (and some 
> attached objects like the ExecutionConfig) into a stringified version, not 
> containing any user classes.
> The web frontend can show strings only anyways.





[jira] [Created] (FLINK-4083) Use ClosureCleaner for Join where and equalTo

2016-06-16 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-4083:
-

 Summary: Use ClosureCleaner for Join where and equalTo
 Key: FLINK-4083
 URL: https://issues.apache.org/jira/browse/FLINK-4083
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.0.3
Reporter: Stefan Richter
Assignee: Stefan Richter
Priority: Minor


When specifying a key selector in the where or equalTo clause of a Join, the 
closure cleaner is not used. Same problem as FLINK-4078.

{code}
.join(ds)
    .where(new KeySelector<CustomType, Integer>() {
        @Override
        public Integer getKey(CustomType value) {
            return value.myInt;
        }
    })
    .equalTo(new KeySelector<CustomType, Integer>() {
        @Override
        public Integer getKey(CustomType value) throws Exception {
            return value.myInt;
        }
    });
{code}

The problem is that the KeySelector is an anonymous inner class and, as such, 
has a reference to the outer object. Normally, this would be rectified by the 
closure cleaner, but the cleaner is not used in CoGroup.where(). A possible 
user-side workaround is sketched below.
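
A sketch of such a workaround, assuming the {{CustomType}} from the snippet 
above: a static nested class avoids the hidden reference to the enclosing 
instance entirely, so the closure cleaner is not needed for it.

{code}
import org.apache.flink.api.java.functions.KeySelector;

public class JoinKeys {

    // a static nested class captures no reference to an enclosing instance
    public static final class MyIntKeySelector implements KeySelector<CustomType, Integer> {
        @Override
        public Integer getKey(CustomType value) {
            return value.myInt;
        }
    }
}

// usage: ds1.join(ds2).where(new JoinKeys.MyIntKeySelector())
//                     .equalTo(new JoinKeys.MyIntKeySelector());
{code}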





[jira] [Issue Comment Deleted] (FLINK-4083) Use ClosureCleaner for Join where and equalTo

2016-06-16 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-4083:
--
Comment: was deleted

(was: Same problem.)

> Use ClosureCleaner for Join where and equalTo
> -
>
> Key: FLINK-4083
> URL: https://issues.apache.org/jira/browse/FLINK-4083
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.0.3
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Minor
>
> When specifying a key selector in the where or equalTo clause of a Join, the 
> closure cleaner is not used. Same problem as FLINK-4078.
> {code}
> .join(ds)
>     .where(new KeySelector<CustomType, Integer>() {
>         @Override
>         public Integer getKey(CustomType value) {
>             return value.myInt;
>         }
>     })
>     .equalTo(new KeySelector<CustomType, Integer>() {
>         @Override
>         public Integer getKey(CustomType value) throws Exception {
>             return value.myInt;
>         }
>     });
> {code}
> The problem is that the KeySelector is an anonymous inner class and, as such, 
> has a reference to the outer object. Normally, this would be rectified by the 
> closure cleaner, but the cleaner is not used in CoGroup.where().





[jira] [Updated] (FLINK-4083) Use ClosureCleaner for Join where and equalTo

2016-06-16 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-4083:
--
Description: 
When specifying a key selector in the where or equalTo clause of a Join, the 
closure cleaner is not used. Same problem as FLINK-4078.

{code}
.join(ds)
    .where(new KeySelector<CustomType, Integer>() {
        @Override
        public Integer getKey(CustomType value) {
            return value.myInt;
        }
    })
    .equalTo(new KeySelector<CustomType, Integer>() {
        @Override
        public Integer getKey(CustomType value) throws Exception {
            return value.myInt;
        }
    });
{code}

The problem is that the KeySelector is an anonymous inner class and, as such, 
has a reference to the outer object. Normally, this would be rectified by the 
closure cleaner, but the cleaner is not used in Join.where() and Join.equalTo(). 
A sketch of where the cleaner could be applied follows below.

  was:
When specifying a key selector in the where or equalTo clause of a Join, the 
closure cleaner is not used. Same problem as FLINK-4078.

{code}
.join(ds)
    .where(new KeySelector<CustomType, Integer>() {
        @Override
        public Integer getKey(CustomType value) {
            return value.myInt;
        }
    })
    .equalTo(new KeySelector<CustomType, Integer>() {
        @Override
        public Integer getKey(CustomType value) throws Exception {
            return value.myInt;
        }
    });
{code}

The problem is that the KeySelector is an anonymous inner class and, as such, 
has a reference to the outer object. Normally, this would be rectified by the 
closure cleaner, but the cleaner is not used in CoGroup.where().
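
For reference, a sketch of how the selector could be cleaned before use, 
assuming the {{ClosureCleaner.clean(Object, boolean)}} utility available in 
Flink at the time:

{code}
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;

public class CleanedSelector {

    // Sketch only: cleaning strips the synthetic reference to the enclosing
    // instance from the anonymous class, as map()/filter() already do for
    // their user functions.
    public static <IN, KEY> KeySelector<IN, KEY> clean(KeySelector<IN, KEY> selector) {
        ClosureCleaner.clean(selector, true); // true: also check serializability
        return selector;
    }
}
{code}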


> Use ClosureCleaner for Join where and equalTo
> -
>
> Key: FLINK-4083
> URL: https://issues.apache.org/jira/browse/FLINK-4083
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.0.3
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Minor
>
> When specifying a key selector in the where or equalTo clause of a Join, the 
> closure cleaner is not used. Same problem as FLINK-4078.
> {code}
> .join(ds)
>     .where(new KeySelector<CustomType, Integer>() {
>         @Override
>         public Integer getKey(CustomType value) {
>             return value.myInt;
>         }
>     })
>     .equalTo(new KeySelector<CustomType, Integer>() {
>         @Override
>         public Integer getKey(CustomType value) throws Exception {
>             return value.myInt;
>         }
>     });
> {code}
> The problem is that the KeySelector is an anonymous inner class and, as such, 
> has a reference to the outer object. Normally, this would be rectified by the 
> closure cleaner, but the cleaner is not used in Join.where() and 
> Join.equalTo().





[jira] [Created] (FLINK-4374) GroupReduce Broken for null Date

2016-08-11 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-4374:
-

 Summary: GroupReduce Broken for null Date
 Key: FLINK-4374
 URL: https://issues.apache.org/jira/browse/FLINK-4374
 Project: Flink
  Issue Type: Bug
  Components: DataSet API
Reporter: Stefan Richter


The GroupReduceITCase has an error that allows a problem with {{null}} Dates to 
go uncovered:
 If I set the parallelism to 1 in {{testDateNullException()}} and all keys 
actually end up on the same operator, then there is a problem in the 
de/serialization.

It seems that {{null}} values are somehow skipped by the serialization process 
(e.g. maybe no {{null}} indicator is written), which leads to wrong 
deserializations.
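
For illustration, the usual null-indicator pattern that seems to be missing 
here (a sketch against plain java.io, not the actual serializer code):

{code}
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Date;

class NullSafeDateWriter {

    // Sketch of the null-indicator pattern: write an explicit marker so the
    // deserializer knows whether a value follows. If such a marker is skipped
    // for null values, subsequent reads go out of sync, which would explain
    // the wrong deserializations described above.
    static void write(Date value, DataOutputStream out) throws IOException {
        if (value == null) {
            out.writeBoolean(false);         // null indicator only
        } else {
            out.writeBoolean(true);
            out.writeLong(value.getTime());  // epoch millis
        }
    }
}
{code}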





[jira] [Comment Edited] (FLINK-4374) GroupReduce Broken for null Date

2016-08-11 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15417162#comment-15417162
 ] 

Stefan Richter edited comment on FLINK-4374 at 8/11/16 12:52 PM:
-

Yes, it is the date field of the Tuple2 in this test.


was (Author: srichter):
Yes, it is the data field of the Tuple2 in this test.

> GroupReduce Broken for null Date
> 
>
> Key: FLINK-4374
> URL: https://issues.apache.org/jira/browse/FLINK-4374
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API
>Reporter: Stefan Richter
>Assignee: Timo Walther
>
> The GroupReduceITCase has an error that allows a problem with {{null}} Dates 
> to go uncovered:
>  If I set the parallelism to 1 in {{testDateNullException()}} and all keys 
> actually end up on the same operator, then there is a problem in the 
> de/serialization.
> It seems that {{null}} values are somehow skipped by the serialization 
> process (e.g. maybe no {{null}} indicator is written), which leads to wrong 
> deserializations.





[jira] [Commented] (FLINK-4374) GroupReduce Broken for null Date

2016-08-11 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15417162#comment-15417162
 ] 

Stefan Richter commented on FLINK-4374:
---

Yes, it is the data field of the Tuple2 in this test.

> GroupReduce Broken for null Date
> 
>
> Key: FLINK-4374
> URL: https://issues.apache.org/jira/browse/FLINK-4374
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API
>Reporter: Stefan Richter
>Assignee: Timo Walther
>
> The GroupReduceITCase has an error that allows a problem with {{null}} Dates 
> to go uncovered:
>  If I set the parallelism to 1 in {{testDateNullException()}} and all keys 
> actually end up on the same operator, then there is a problem in the 
> de/serialization.
> It seems that {{null}} values are somehow skipped by the serialization 
> process (e.g. maybe no {{null}} indicator is written), which leads to wrong 
> deserializations.





[jira] [Assigned] (FLINK-4374) GroupReduce Broken for null Date

2016-08-11 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter reassigned FLINK-4374:
-

Assignee: Timo Walther

> GroupReduce Broken for null Date
> 
>
> Key: FLINK-4374
> URL: https://issues.apache.org/jira/browse/FLINK-4374
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API
>Reporter: Stefan Richter
>Assignee: Timo Walther
>
> The GroupReduceITCase has an error that allows a problem with {{null}} Dates 
> to go uncovered:
>  If I set the parallelism to 1 in {{testDateNullException()}} and all keys 
> actually end up on the same operator, then there is a problem in the 
> de/serialization.
> It seems that {{null}} values are somehow skipped by the serialization 
> process (e.g. maybe no {{null}} indicator is written), which leads to wrong 
> deserializations.





[jira] [Updated] (FLINK-4374) GroupReduce Broken for null Date

2016-08-11 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-4374:
--
Description: 
The GroupReduceITCase has an error that allows a problem with {{null}} Dates to 
go uncovered:
 If I set the parallelism to 1 in {{testDateNullException()}} and all keys 
actually end up on the same operator, then there is a problem in the 
de/serialization.

It seems that {{null}} values are somehow skipped by the serialization process 
(e.g. maybe no {{null}} indicator is written), which leads to wrong 
deserializations.

  was:
The GroupReduceITCase has an error that allows a problem with {{nul}}l Dates to 
go uncovered:
 If I set the parallelism to 1 in {{testDateNullException()}} and all keys 
actually end up on the same operator, then there is a problem in the 
de/serialization.

It seems that {{null}} values are somehow skipped by the serialization process 
(e.g. maybe no {{null}} indicator is written), which leads to wrong 
deserializations.


> GroupReduce Broken for null Date
> 
>
> Key: FLINK-4374
> URL: https://issues.apache.org/jira/browse/FLINK-4374
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API
>Reporter: Stefan Richter
>Assignee: Timo Walther
>
> The GroupReduceITCase has an error that allows a problem with {{null}} Dates 
> to go uncovered:
>  If I set the parallelism to 1 in {{testDateNullException()}} and all keys 
> actually end up on the same operator, then there is a problem in the 
> de/serialization.
> It seems that {{null}} values are somehow skipped by the serialization 
> process (e.g. maybe no {{null}} indicator is written), which leads to wrong 
> deserializations.





[jira] [Created] (FLINK-4200) Kafka consumers should log the offset from which they restore

2016-07-12 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-4200:
-

 Summary: Kafka consumers should log the offset from which they 
restore
 Key: FLINK-4200
 URL: https://issues.apache.org/jira/browse/FLINK-4200
 Project: Flink
  Issue Type: Improvement
  Components: Kafka Connector
Reporter: Stefan Richter
Assignee: Stefan Richter
Priority: Trivial


Kafka consumers should log the offset from which they restore so that it is 
easier to investigate problems with recovery.
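
For illustration, a sketch of the kind of logging meant here (hypothetical, 
not the connector's actual code):

{code}
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class RestoreLogging {

    private static final Logger LOG = LoggerFactory.getLogger(RestoreLogging.class);

    // one line per restored partition, so recovery behavior is visible in the logs
    static void logRestoredOffsets(Map<Integer, Long> offsetsByPartition) {
        for (Map.Entry<Integer, Long> e : offsetsByPartition.entrySet()) {
            LOG.info("Restored Kafka consumer: partition {} resumes from offset {}",
                    e.getKey(), e.getValue());
        }
    }
}
{code}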





[jira] [Created] (FLINK-4230) Session Windowing IT Case

2016-07-18 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-4230:
-

 Summary: Session Windowing IT Case
 Key: FLINK-4230
 URL: https://issues.apache.org/jira/browse/FLINK-4230
 Project: Flink
  Issue Type: Test
  Components: DataStream API, Local Runtime
Reporter: Stefan Richter
Assignee: Stefan Richter


An ITCase for session windows is missing that tests correct behavior with 
several parallel sessions, with timely events, and with late events arriving 
both within and after the allowed lateness interval.





[jira] [Created] (FLINK-4201) Checkpoints for jobs in non-terminal state (e.g. suspended) get deleted

2016-07-12 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-4201:
-

 Summary: Checkpoints for jobs in non-terminal state (e.g. 
suspended) get deleted
 Key: FLINK-4201
 URL: https://issues.apache.org/jira/browse/FLINK-4201
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Reporter: Stefan Richter
Priority: Blocker


For example, when shutting down a Yarn session, the logs show that checkpoints 
for jobs that did not terminate are deleted. In the shutdown hook, 
{{removeAllCheckpoints}} is called and removes checkpoints that should still be 
kept (a sketch of the intended behavior follows below).
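
A sketch of the intended behavior (illustrative, not Flink's actual 
shutdown-hook code): only checkpoints of jobs in a globally terminal state 
should be discarded.

{code}
// Illustrative sketch, not Flink's actual code.
enum JobStatus { RUNNING, SUSPENDED, FINISHED, CANCELED, FAILED }

class CheckpointCleanup {

    static boolean isGloballyTerminal(JobStatus s) {
        return s == JobStatus.FINISHED || s == JobStatus.CANCELED || s == JobStatus.FAILED;
    }

    // called from the shutdown hook
    static void onShutdown(JobStatus status, Runnable removeAllCheckpoints) {
        if (isGloballyTerminal(status)) {
            removeAllCheckpoints.run(); // job is done; checkpoints can be discarded
        }
        // otherwise (e.g. SUSPENDED) keep the checkpoints for later recovery
    }
}
{code}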





[jira] [Updated] (FLINK-4150) Problem with Blobstore in Yarn HA setting on recovery after cluster shutdown

2016-07-12 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-4150:
--
Description: 
Submitting a job in Yarn with HA can lead to the following exception:

{code}
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user 
class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
ClassLoader info: URL ClassLoader:
file: 
'/tmp/blobStore-ccec0f4a-3e07-455f-945b-4fcd08f5bac1/cache/blob_7fafffe9595cd06aff213b81b5da7b1682e1d6b0'
 (invalid JAR: zip file is empty)
Class not resolvable through given classloader.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:207)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:222)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
{code}

Some job information, including the Blob ids, is stored in Zookeeper. The 
actual Blobs are stored in a dedicated BlobStore if the recovery mode is set 
to Zookeeper. This BlobStore is typically located in a FS like HDFS. When the 
cluster is shut down, the path for the BlobStore is deleted. When the cluster 
is then restarted, recovering jobs cannot restore because their Blob ids stored 
in Zookeeper now point to deleted files.

  was:
Submitting a job in Yarn with HA can lead to the following exception:

{code}
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user 
class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
ClassLoader info: URL ClassLoader:
file: 
'/tmp/blobStore-ccec0f4a-3e07-455f-945b-4fcd08f5bac1/cache/blob_7fafffe9595cd06aff213b81b5da7b1682e1d6b0'
 (invalid JAR: zip file is empty)
Class not resolvable through given classloader.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:207)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:222)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
{code}

Some job information, including the Blob ids, is stored in Zookeeper. The 
actual Blobs are stored in a dedicated BlobStore if the recovery mode is set 
to Zookeeper. This BlobStore is typically located in a FS like HDFS. When the 
cluster is shut down, the path for the BlobStore is deleted. When the cluster 
is then restarted, recovering jobs cannot restore because their Blob ids stored 
in Zookeeper now point to deleted files.

In particular, this problem frequently occurs for HA in combination with -m 
yarn-cluster. We should discuss to what extent this combination actually makes 
sense and what the expected behavior should be.


> Problem with Blobstore in Yarn HA setting on recovery after cluster shutdown
> 
>
> Key: FLINK-4150
> URL: https://issues.apache.org/jira/browse/FLINK-4150
> Project: Flink
>  Issue Type: Bug
>  Components: Job-Submission
>Reporter: Stefan Richter
>Priority: Blocker
> Fix For: 1.1.0
>
>
> Submitting a job in Yarn with HA can lead to the following exception:
> {code}
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load 
> user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
> ClassLoader info: URL ClassLoader:
> file: 
> '/tmp/blobStore-ccec0f4a-3e07-455f-945b-4fcd08f5bac1/cache/blob_7fafffe9595cd06aff213b81b5da7b1682e1d6b0'
>  (invalid JAR: zip file is empty)
> Class not resolvable through given classloader.
>   at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:207)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:222)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> Some job information, including the Blob ids, is stored in Zookeeper. The 
> actual Blobs are stored in a dedicated BlobStore if the recovery mode is set 
> to Zookeeper. This BlobStore is typically located in a FS like HDFS. When the 
> cluster is shut down, the path for the BlobStore is deleted. When the cluster 
> is then restarted, recovering jobs cannot restore because their Blob ids 
> stored in Zookeeper now point to deleted files.





[jira] [Created] (FLINK-4211) Dynamic Properties not working for jobs submitted to Yarn session

2016-07-13 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-4211:
-

 Summary: Dynamic Properties not working for jobs submitted to Yarn 
session
 Key: FLINK-4211
 URL: https://issues.apache.org/jira/browse/FLINK-4211
 Project: Flink
  Issue Type: Bug
  Components: YARN Client
Reporter: Stefan Richter


The command-line argument for dynamic properties (-D) is not working when 
submitting jobs to a Flink Yarn session.


Example:
{code}
bin/flink run -p 4 myJob.jar -D recovery.zookeeper.path.root=/flink/xyz
{code}





[jira] [Commented] (FLINK-4211) Dynamic Properties not working for jobs submitted to Yarn session

2016-07-15 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15378999#comment-15378999
 ] 

Stefan Richter commented on FLINK-4211:
---

I see. I think this is unfortunately not really clear from the documentation; 
one might be tempted to try to override properties, and the CLI accepts those 
attempts without any feedback. Otherwise, you can close the issue as invalid.

> Dynamic Properties not working for jobs submitted to Yarn session
> -
>
> Key: FLINK-4211
> URL: https://issues.apache.org/jira/browse/FLINK-4211
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Reporter: Stefan Richter
>
> The command-line argument for dynamic properties (-D) is not working when 
> submitting jobs to a Flink Yarn session.
> Example:
> {code}
> bin/flink run -p 4 myJob.jar -D recovery.zookeeper.path.root=/flink/xyz
> {code}





[jira] [Created] (FLINK-4182) HA recovery not working properly under ApplicationMaster failures.

2016-07-08 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-4182:
-

 Summary: HA recovery not working properly under ApplicationMaster 
failures.
 Key: FLINK-4182
 URL: https://issues.apache.org/jira/browse/FLINK-4182
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination, State Backends, Checkpointing
Affects Versions: 1.0.3
Reporter: Stefan Richter


When randomly killing TaskManagers and the ApplicationMaster, a job sometimes 
does not recover properly in HA mode.

There can be different symptoms for this. For example, in one case the job 
dies with the following exception:

{code}
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: Cannot set up the user code libraries: Cannot get library 
with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413)
at 
org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:208)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1381)
at 
da.testing.StreamingStateMachineJob.main(StreamingStateMachineJob.java:61)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
at 
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:738)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:251)
at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:966)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1009)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Cannot set 
up the user code libraries: Cannot get library with hash 
7fafffe9595cd06aff213b81b5da7b1682e1d6b0
at 
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1089)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:506)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at 
org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:105)
at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
at 
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at 
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at 
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:118)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
{code}

[jira] [Created] (FLINK-4166) Generate automatic different namespaces in Zookeeper for Flink applications

2016-07-07 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-4166:
-

 Summary: Generate automatic different namespaces in Zookeeper for 
Flink applications
 Key: FLINK-4166
 URL: https://issues.apache.org/jira/browse/FLINK-4166
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination
Affects Versions: 1.0.3
Reporter: Stefan Richter


We should automatically generate different namespaces per Flink application in 
Zookeeper to avoid interference between different applications that refer to 
the same Zookeeper entries.





[jira] [Created] (FLINK-4150) Problem with Blobstore in Yarn HA setting on recovery after cluster shutdown

2016-07-04 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-4150:
-

 Summary: Problem with Blobstore in Yarn HA setting on recovery 
after cluster shutdown
 Key: FLINK-4150
 URL: https://issues.apache.org/jira/browse/FLINK-4150
 Project: Flink
  Issue Type: Bug
  Components: Job-Submission
Reporter: Stefan Richter


Submitting a job in Yarn with HA can lead to the following exception:

{code}
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user 
class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
ClassLoader info: URL ClassLoader:
file: 
'/tmp/blobStore-ccec0f4a-3e07-455f-945b-4fcd08f5bac1/cache/blob_7fafffe9595cd06aff213b81b5da7b1682e1d6b0'
 (invalid JAR: zip file is empty)
Class not resolvable through given classloader.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:207)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:222)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
{code}

Some job information, including the Blob ids, is stored in Zookeeper. The 
actual Blobs are stored in a dedicated BlobStore if the recovery mode is set 
to Zookeeper. This BlobStore is typically located in a FS like HDFS. When the 
cluster is shut down, the path for the BlobStore is deleted. When the cluster 
is then restarted, recovering jobs cannot restore because their Blob ids stored 
in Zookeeper now point to deleted files.

In particular, this problem frequently occurs for HA in combination with -m 
yarn-cluster. We should discuss to what extent this combination actually makes 
sense and what the expected behavior should be.





[jira] [Created] (FLINK-4156) Job with -m yarn-cluster registers TaskManagers to another running Yarn session

2016-07-06 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-4156:
-

 Summary: Job with -m yarn-cluster registers TaskManagers to 
another running Yarn session
 Key: FLINK-4156
 URL: https://issues.apache.org/jira/browse/FLINK-4156
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination
Reporter: Stefan Richter


When a job is started using cluster mode (-m yarn-cluster) and a Yarn session 
is running on the same cluster, the job accidentally registers its worker 
tasks with the ongoing Yarn session. This happens because the same Zookeeper 
namespace is used.

We should consider isolating Flink applications from one another by using 
UUIDs, e.g. based on their application ids, in their Zookeeper paths (sketched 
below).
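
A minimal sketch of the idea (the path layout is assumed, not Flink's actual 
scheme):

{code}
// Illustrative sketch: derive a per-application Zookeeper root from the Yarn
// application id, so that two applications never share coordination paths.
class ZookeeperNamespaces {

    static String rootFor(String yarnApplicationId) {
        // e.g. "/flink/application_1467718234567_0042" (hypothetical id)
        return "/flink/" + yarnApplicationId;
    }
}
{code}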





[jira] [Commented] (FLINK-3623) Adjust MurmurHash algorithm

2016-08-09 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15414110#comment-15414110
 ] 

Stefan Richter commented on FLINK-3623:
---

In my opinion, our implementation of Murmur might be too complicated and 
computationally expensive for simply hashing an int.  Most parts of the used 
algorithm are only required for arbitrary byte[] keys. What matters for mixing 
the bits of an int is the finalizer part of Murmur3, which is:
  
{code}
h ^= h >>> 16;
h *= 0x85ebca6b;
h ^= h >>> 13;
h *= 0xc2b2ae35;
h ^= h >>> 16;
{code}

If I remember correctly, the purpose of the remaining code is to generate the 
intermediate hash from blocks over variable size byte[], whereas the purpose of 
the finalizer is to actually increase entropy and reduce collisions of the 
intermediate hash. Restricting the code to these lines could speed up hashing 
significantly, which might be even more relevant in case hash calculations are 
also used to determine keygroups.

> Adjust MurmurHash algorithm
> ---
>
> Key: FLINK-3623
> URL: https://issues.apache.org/jira/browse/FLINK-3623
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Trivial
> Fix For: 1.1.0
>
>
> Flink's MurmurHash implementation differs from the published algorithm.
> From Flink's MathUtils.java:
> {code}
> code *= 0xe6546b64;
> {code}
> The Murmur3_32 algorithm as described by 
> [Wikipedia|https://en.wikipedia.org/wiki/MurmurHash]:
> {code}
> m ← 5
> n ← 0xe6546b64
> hash ← hash × m + n
> {code}
> and in Guava's Murmur3_32HashFunction.java:
> {code}
> h1 = h1 * 5 + 0xe6546b64;
> {code}





[jira] [Created] (FLINK-4134) EventTimeSessionWindows trigger for empty windows when dropping late events

2016-06-30 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-4134:
-

 Summary: EventTimeSessionWindows trigger for empty windows when 
dropping late events
 Key: FLINK-4134
 URL: https://issues.apache.org/jira/browse/FLINK-4134
 Project: Flink
  Issue Type: Bug
  Components: DataStream API
Affects Versions: 1.0.3
Reporter: Stefan Richter


It seems like EventTimeSessionWindows sometimes trigger for empty windows. The 
behavior is observed in connection with dropping late events:

{code}
stream
 .keyBy("sessionKey")
 .window(EventTimeSessionWindows.withGap(Time.milliseconds(100)))
 .allowedLateness(Time.milliseconds(0))
 .apply(new ValidatingWindowFunction())
 .print();
{code}


I wrote a generator that produces events for several parallel sessions and 
makes it possible to reproduce the error. For now, I can share this generator 
privately for debugging purposes, but my plan is to use it as the basis for an 
integration test.





[jira] [Assigned] (FLINK-4134) EventTimeSessionWindows trigger for empty windows when dropping late events

2016-06-30 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter reassigned FLINK-4134:
-

Assignee: Stefan Richter

> EventTimeSessionWindows trigger for empty windows when dropping late events
> ---
>
> Key: FLINK-4134
> URL: https://issues.apache.org/jira/browse/FLINK-4134
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.0.3
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> It seems like EventTimeSessionWindows sometimes trigger for empty windows. 
> The behavior is observed in connection with dropping late events:
> {code}
> stream
>  .keyBy("sessionKey")
>  .window(EventTimeSessionWindows.withGap(Time.milliseconds(100)))
>  .allowedLateness(Time.milliseconds(0))
>  .apply(new ValidatingWindowFunction())
>  .print();
> {code}
> I wrote a generator that produces events for several parallel sessions and 
> makes it possible to reproduce the error. For now, I can share this generator 
> privately for debugging purposes, but my plan is to use it as the basis for 
> an integration test.





[jira] [Assigned] (FLINK-4140) CheckpointCoordinator fails to discard completed checkpoint

2016-07-01 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter reassigned FLINK-4140:
-

Assignee: Stefan Richter

> CheckpointCoordinator fails to discard completed checkpoint
> ---
>
> Key: FLINK-4140
> URL: https://issues.apache.org/jira/browse/FLINK-4140
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.0.3
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Running a job in HA mode, I saw the following warning in the job manager 
> logs. The warning appeared after the job was restarted due to a master 
> failure. I've skimmed the code, and it looks like the user code classloader 
> is used everywhere when discarding the checkpoint, but something seems not to 
> work as expected (otherwise the warning should not appear).
> {code}
> 2016-07-01 13:08:33,218 WARN  
> org.apache.flink.runtime.checkpoint.SubtaskState  - Failed to 
> discard checkpoint state: StateForTask(Size: 4, Duration: 2012, State: 
> SerializedValue)
> java.lang.ClassNotFoundException: da.testing.State
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:278)
> at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
> at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
> at 
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
> at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1486)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1336)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016)
> at 
> java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:503)
> at 
> org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:268)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1907)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
> at java.util.HashMap.readObject(HashMap.java:1180)
> at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1907)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)
> at 

[jira] [Updated] (FLINK-4140) CheckpointCoordinator fails to discard completed checkpoint

2016-07-01 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-4140:
--
Assignee: Ufuk Celebi  (was: Stefan Richter)

> CheckpointCoordinator fails to discard completed checkpoint
> ---
>
> Key: FLINK-4140
> URL: https://issues.apache.org/jira/browse/FLINK-4140
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.0.3
>Reporter: Stefan Richter
>Assignee: Ufuk Celebi
>
> Running a job in HA mode, I saw the following warning in the job manager 
> logs. The warning appeared after the job was restarted due to a master 
> failure. I've skimmed the code, and it looks like the user code class loader 
> is used everywhere when discarding the checkpoint, but something does not 
> seem to work as expected (otherwise the warning should not appear).
> {code}
> 2016-07-01 13:08:33,218 WARN  
> org.apache.flink.runtime.checkpoint.SubtaskState  - Failed to 
> discard checkpoint state: StateForTask(Size: 4, Duration: 2012, State: 
> SerializedValue)
> java.lang.ClassNotFoundException: da.testing.State
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:278)
> at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
> at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
> at 
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
> at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1486)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1336)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016)
> at 
> java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:503)
> at 
> org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:268)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1907)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
> at java.util.HashMap.readObject(HashMap.java:1180)
> at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1907)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016)
> at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)
> at 

[jira] [Created] (FLINK-4142) Recovery problem in HA on Hadoop Yarn 2.5.0

2016-07-01 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-4142:
-

 Summary: Recovery problem in HA on Hadoop Yarn 2.5.0
 Key: FLINK-4142
 URL: https://issues.apache.org/jira/browse/FLINK-4142
 Project: Flink
  Issue Type: Bug
  Components: YARN Client
Affects Versions: 1.0.3
Reporter: Stefan Richter


On Hadoop Yarn 2.5.0, recovery in HA fails in the following scenario:

1) Kill application master, let it recover normally.
2) After that, kill a task manager.

Now, Yarn tries to restart the killed task manager in an endless loop. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4140) CheckpointCoordinator fails to discard completed checkpoint

2016-07-01 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-4140:
-

 Summary: CheckpointCoordinator fails to discard completed 
checkpoint
 Key: FLINK-4140
 URL: https://issues.apache.org/jira/browse/FLINK-4140
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.0.3
Reporter: Stefan Richter


Running a job in HA mode, I saw the following warning in the job manager logs. 
The warning appeared after the job was restarted due to a master failure. I've 
skimmed the code, and it looks like the user code class loader is used 
everywhere when discarding the checkpoint, but something does not seem to work 
as expected (otherwise the warning should not appear).
{code}
2016-07-01 13:08:33,218 WARN  org.apache.flink.runtime.checkpoint.SubtaskState  
- Failed to discard checkpoint state: StateForTask(Size: 4, 
Duration: 2012, State: SerializedValue)
java.lang.ClassNotFoundException: da.testing.State
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:278)
at 
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1486)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1336)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016)
at 
java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:503)
at 
org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:268)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1907)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
at java.util.HashMap.readObject(HashMap.java:1180)
at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1907)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1714)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)

[jira] [Created] (FLINK-4141) TaskManager failures not always recover when killed during an ApplicationMaster failure in HA mode on Yarn

2016-07-01 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-4141:
-

 Summary: TaskManager failures not always recover when killed 
during an ApplicationMaster failure in HA mode on Yarn
 Key: FLINK-4141
 URL: https://issues.apache.org/jira/browse/FLINK-4141
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.0.3
Reporter: Stefan Richter


High availability on Yarn often fails to recover in the following test scenario:

1. Kill application master process.
2. Then, while application master is recovering, randomly kill several task 
managers (with some delay).

After the application master has recovered, not all of the killed task 
managers are brought back, and no further attempts are made to restart them.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4142) Recovery problem in HA on Hadoop Yarn 2.4.1

2016-07-01 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-4142:
--
Summary: Recovery problem in HA on Hadoop Yarn 2.4.1  (was: Recovery 
problem in HA on Hadoop Yarn 2.5.0)

> Recovery problem in HA on Hadoop Yarn 2.4.1
> ---
>
> Key: FLINK-4142
> URL: https://issues.apache.org/jira/browse/FLINK-4142
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Affects Versions: 1.0.3
>Reporter: Stefan Richter
>
> On Hadoop Yarn 2.4.1, recovery in HA fails in the following scenario:
> 1) Kill application master, let it recover normally.
> 2) After that, kill a task manager.
> Now, Yarn tries to restart the killed task manager in an endless loop. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4142) Recovery problem in HA on Hadoop Yarn 2.5.0

2016-07-01 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-4142:
--
Description: 
On Hadoop Yarn 2.4.1, recovery in HA fails in the following scenario:

1) Kill application master, let it recover normally.
2) After that, kill a task manager.

Now, Yarn tries to restart the killed task manager in an endless loop. 

  was:
On Hadoop Yarn 2.5.0, recovery in HA fails in the following scenario:

1) Kill application master, let it recover normally.
2) After that, kill a task manager.

Now, Yarn tries to restart the killed task manager in an endless loop. 


> Recovery problem in HA on Hadoop Yarn 2.5.0
> ---
>
> Key: FLINK-4142
> URL: https://issues.apache.org/jira/browse/FLINK-4142
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Affects Versions: 1.0.3
>Reporter: Stefan Richter
>
> On Hadoop Yarn 2.4.1, recovery in HA fails in the following scenario:
> 1) Kill application master, let it recover normally.
> 2) After that, kill a task manager.
> Now, Yarn tries to restart the killed task manager in an endless loop. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4142) Recovery problem in HA on Hadoop Yarn 2.4.1

2016-07-01 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359073#comment-15359073
 ] 

Stefan Richter commented on FLINK-4142:
---

I have a log for the problem here: 
https://storage.googleapis.com/srichter/task_mgr_restart_endless.log

> Recovery problem in HA on Hadoop Yarn 2.4.1
> ---
>
> Key: FLINK-4142
> URL: https://issues.apache.org/jira/browse/FLINK-4142
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Affects Versions: 1.0.3
>Reporter: Stefan Richter
>
> On Hadoop Yarn 2.4.1, recovery in HA fails in the following scenario:
> 1) Kill application master, let it recover normally.
> 2) After that, kill a task manager.
> Now, Yarn tries to restart the killed task manager in an endless loop. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5730) User can concurrently modify state metadata of RocksDB asynchronous snapshots

2017-02-07 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-5730:
-

 Summary: User can concurrently modify state metadata of RocksDB 
asynchronous snapshots
 Key: FLINK-5730
 URL: https://issues.apache.org/jira/browse/FLINK-5730
 Project: Flink
  Issue Type: Bug
Reporter: Stefan Richter


The current implementation of asynchronous snapshots in RocksDB iterates the 
original state metadata structures as part of the asynchronous snapshot. Users 
could potentially modify the state metadata concurrently (e.g. by registering a 
new state while the snapshot is running), thereby corrupting the metadata.

For the way most users register their states (at the start of the task), this 
is not a problem. However, if state is conditionally registered at some later 
point in time, this can be problematic.
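
To make the problematic pattern concrete, here is a sketch (a hypothetical 
{{Event}} type is assumed) of state that is only registered once a certain 
element is seen, i.e. potentially while an asynchronous snapshot is iterating 
the state metadata:

{code}
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.util.Collector;

public class LazyStateFunction extends RichFlatMapFunction<Event, Event> {

    // Deliberately NOT registered in open(), but lazily on the first matching element.
    private transient ValueState<Long> rareState;

    @Override
    public void flatMap(Event value, Collector<Event> out) throws Exception {
        if (value.isRare() && rareState == null) {
            // This modifies the backend's state metadata at an arbitrary point in
            // time; an asynchronous RocksDB snapshot that is concurrently iterating
            // the metadata could observe this modification and become corrupted.
            rareState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("rareCount", Long.class, 0L));
        }
        out.collect(value);
    }
}
{code}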



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5730) Users can concurrently modify state metadata of RocksDB asynchronous snapshots

2017-02-07 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-5730:
--
Summary: Users can concurrently modify state metadata of RocksDB 
asynchronous snapshots  (was: User can concurrently modify state metadata of 
RocksDB asynchronous snapshots)

> Users can concurrently modify state metadata of RocksDB asynchronous snapshots
> --
>
> Key: FLINK-5730
> URL: https://issues.apache.org/jira/browse/FLINK-5730
> Project: Flink
>  Issue Type: Bug
>Reporter: Stefan Richter
>
> The current implementation of asynchronous snapshots in RocksDB iterates the 
> original state metadata structures as part of the asynchronous snapshot. 
> Users could potentially modify the state metadata concurrently (e.g. by 
> registering a new state while the snapshot is running), thereby corrupting 
> the metadata.
> For the way most users register their states (at the start of the task), 
> this is not a problem. However, if state is conditionally registered at some 
> later point in time, this can be problematic.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5715) Asynchronous snapshotting for HeapKeyedStateBackend

2017-02-06 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-5715:
-

 Summary: Asynchronous snapshotting for HeapKeyedStateBackend
 Key: FLINK-5715
 URL: https://issues.apache.org/jira/browse/FLINK-5715
 Project: Flink
  Issue Type: New Feature
  Components: State Backends, Checkpointing
Affects Versions: 1.3.0
Reporter: Stefan Richter
Assignee: Stefan Richter


Blocking snapshots render the HeapKeyedStateBackend practically unusable for 
many users in production. Their jobs cannot tolerate stopped processing for 
the time it takes to write gigabytes of data from memory to disk. Asynchronous 
snapshots would be a solution to this problem. The challenge for the 
implementation is coming up with a copy-on-write scheme for the in-memory hash 
maps that form the foundation of this backend. After taking a closer look, 
this problem is twofold: first, providing CoW semantics for the hash map 
itself, as a mutable structure, thereby avoiding costly locking or blocking 
where possible; second, CoW for the mutable value objects, e.g. through 
cloning via serializers.
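
The structural half of the idea, as a drastically simplified sketch that 
assumes a single writer (the task thread) and one snapshot at a time: the 
snapshot thread keeps reading the old map version while the next write pays 
for a copy. A real implementation would avoid copying the full map and would 
additionally need to deep-copy mutable values, e.g. via their serializers:

{code}
import java.util.HashMap;
import java.util.Map;

public class CowMap<K, V> {

    private volatile Map<K, V> current = new HashMap<>();
    private volatile boolean snapshotRunning;

    /** Called by the snapshot thread: obtains a read-only view of the current version. */
    public Map<K, V> snapshot() {
        snapshotRunning = true;
        return current;
    }

    /** Called by the snapshot thread once the state has been written out. */
    public void releaseSnapshot() {
        snapshotRunning = false;
    }

    /** Called only by the single writer thread. */
    public void put(K key, V value) {
        if (snapshotRunning) {
            // Copy-on-write: leave the snapshotted version untouched.
            current = new HashMap<>(current);
            snapshotRunning = false; // further writes go to the fresh copy
        }
        current.put(key, value);
    }
}
{code}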



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5707) Find better keys for backend configuration parameters

2017-02-03 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-5707:
-

 Summary: Find better keys for backend configuration parameters
 Key: FLINK-5707
 URL: https://issues.apache.org/jira/browse/FLINK-5707
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.3.0
Reporter: Stefan Richter
Priority: Minor


Currently, some config keys for the backends are confusing or even misleading 
and could be renamed. For example:

{{state.backend.fs.checkpointdir}} -> {{state.backend.checkpoints.dir}}
{{state.backend.rocksdb.checkpointdir}} -> {{state.backend.rocksdb.workdir}}
{{state.checkpoints.dir}}

This would reflect their purposes much better.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5436) UDF state without CheckpointedRestoring can result in restarting loop

2017-01-23 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834292#comment-15834292
 ] 

Stefan Richter commented on FLINK-5436:
---

I think that would be good to have in.

> UDF state without CheckpointedRestoring can result in restarting loop
> -
>
> Key: FLINK-5436
> URL: https://issues.apache.org/jira/browse/FLINK-5436
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Priority: Minor
>
> When restoring a job with Checkpointed state and not implementing the new 
> CheckpointedRestoring interface, the job will be restarted over and over 
> again (given the respective restarting strategy).
> Since this is not recoverable, we should immediately fail the job.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5663) Checkpoint fails because of closed registry

2017-01-26 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15840070#comment-15840070
 ] 

Stefan Richter commented on FLINK-5663:
---

Then again, the question is whether this task had already ended and, for some 
reason, a checkpoint was still in the process of opening a stream. The 
exception itself is not problematic as long as the task had already finished 
beforehand.

> Checkpoint fails because of closed registry
> ---
>
> Key: FLINK-5663
> URL: https://issues.apache.org/jira/browse/FLINK-5663
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>
> While testing the 1.2.0 release I got the following Exception:
> {code}
> 2017-01-26 17:29:20,602 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Source: Custom Source (3/8) 
> (2dbce778c4e53a39dec3558e868ceef4) switched from RUNNING to FAILED.
> java.lang.Exception: Error while triggering checkpoint 2 for Source: Custom 
> Source (3/8)
>   at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1117)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not perform checkpoint 2 for operator 
> Source: Custom Source (3/8).
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:533)
>   at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1108)
>   ... 5 more
> Caused by: java.lang.Exception: Could not complete snapshot 2 for operator 
> Source: Custom Source (3/8).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:372)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1116)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1052)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:640)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:585)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:528)
>   ... 6 more
> Caused by: java.io.IOException: Could not flush and close the file system 
> output stream to 
> file:/Users/uce/Desktop/1-2-testing/fs/83889867a493a1dc80f6c588c071b679/chk-2/e4415d0d-719c-48df-91a9-3171ba468152
>  in order to obtain the stream state handle
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:333)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:200)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
>   ... 11 more
> Caused by: java.io.IOException: Could not open output stream for state backend
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:368)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:225)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:305)
>   ... 13 more
> Caused by: java.io.IOException: Cannot register Closeable, registry is 
> already closed. Closing argument.
>   at 
> org.apache.flink.util.AbstractCloseableRegistry.registerClosable(AbstractCloseableRegistry.java:63)
>   at 
> org.apache.flink.core.fs.ClosingFSDataOutputStream.wrapSafe(ClosingFSDataOutputStream.java:99)
>   at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:123)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:359)
>   ... 15 more
> {code}
> The job recovered and kept running.
> [~stefanrichte...@gmail.com] Can this be a race with the closable registry?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5663) Checkpoint fails because of closed registry

2017-01-26 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15840068#comment-15840068
 ] 

Stefan Richter commented on FLINK-5663:
---

I don't think this is a race condition. This exception only happens when the 
registry was already closed, which in turn should only happen after the 
enclosing task has ended.

> Checkpoint fails because of closed registry
> ---
>
> Key: FLINK-5663
> URL: https://issues.apache.org/jira/browse/FLINK-5663
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>
> While testing the 1.2.0 release I got the following Exception:
> {code}
> 2017-01-26 17:29:20,602 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Source: Custom Source (3/8) 
> (2dbce778c4e53a39dec3558e868ceef4) switched from RUNNING to FAILED.
> java.lang.Exception: Error while triggering checkpoint 2 for Source: Custom 
> Source (3/8)
>   at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1117)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not perform checkpoint 2 for operator 
> Source: Custom Source (3/8).
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:533)
>   at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1108)
>   ... 5 more
> Caused by: java.lang.Exception: Could not complete snapshot 2 for operator 
> Source: Custom Source (3/8).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:372)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1116)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1052)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:640)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:585)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:528)
>   ... 6 more
> Caused by: java.io.IOException: Could not flush and close the file system 
> output stream to 
> file:/Users/uce/Desktop/1-2-testing/fs/83889867a493a1dc80f6c588c071b679/chk-2/e4415d0d-719c-48df-91a9-3171ba468152
>  in order to obtain the stream state handle
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:333)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:200)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
>   ... 11 more
> Caused by: java.io.IOException: Could not open output stream for state backend
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:368)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:225)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:305)
>   ... 13 more
> Caused by: java.io.IOException: Cannot register Closeable, registry is 
> already closed. Closing argument.
>   at 
> org.apache.flink.util.AbstractCloseableRegistry.registerClosable(AbstractCloseableRegistry.java:63)
>   at 
> org.apache.flink.core.fs.ClosingFSDataOutputStream.wrapSafe(ClosingFSDataOutputStream.java:99)
>   at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:123)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:359)
>   ... 15 more
> {code}
> The job recovered and kept running.
> [~stefanrichte...@gmail.com] Can this be a race with the closable registry?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5663) Checkpoint fails because of closed registry

2017-01-26 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15840101#comment-15840101
 ] 

Stefan Richter commented on FLINK-5663:
---

The only place this registry ever gets closed is through one method, which is 
only called at the end of a task. So the first thing that is a little less 
transparent from my point of view is the ThreadLocal behaviour. Do you have 
logs for this run? They could contain some useful info output.

> Checkpoint fails because of closed registry
> ---
>
> Key: FLINK-5663
> URL: https://issues.apache.org/jira/browse/FLINK-5663
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>
> While testing the 1.2.0 release I got the following Exception:
> {code}
> 2017-01-26 17:29:20,602 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Source: Custom Source (3/8) 
> (2dbce778c4e53a39dec3558e868ceef4) switched from RUNNING to FAILED.
> java.lang.Exception: Error while triggering checkpoint 2 for Source: Custom 
> Source (3/8)
>   at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1117)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not perform checkpoint 2 for operator 
> Source: Custom Source (3/8).
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:533)
>   at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1108)
>   ... 5 more
> Caused by: java.lang.Exception: Could not complete snapshot 2 for operator 
> Source: Custom Source (3/8).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:372)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1116)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1052)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:640)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:585)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:528)
>   ... 6 more
> Caused by: java.io.IOException: Could not flush and close the file system 
> output stream to 
> file:/Users/uce/Desktop/1-2-testing/fs/83889867a493a1dc80f6c588c071b679/chk-2/e4415d0d-719c-48df-91a9-3171ba468152
>  in order to obtain the stream state handle
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:333)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:200)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
>   ... 11 more
> Caused by: java.io.IOException: Could not open output stream for state backend
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:368)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:225)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:305)
>   ... 13 more
> Caused by: java.io.IOException: Cannot register Closeable, registry is 
> already closed. Closing argument.
>   at 
> org.apache.flink.util.AbstractCloseableRegistry.registerClosable(AbstractCloseableRegistry.java:63)
>   at 
> org.apache.flink.core.fs.ClosingFSDataOutputStream.wrapSafe(ClosingFSDataOutputStream.java:99)
>   at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:123)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:359)
>   ... 15 more
> {code}
> The job recovered and kept running.
> [~stefanrichte...@gmail.com] Can this be a race with the closable registry?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-5663) Checkpoint fails because of closed registry

2017-01-26 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter reassigned FLINK-5663:
-

Assignee: Stefan Richter

> Checkpoint fails because of closed registry
> ---
>
> Key: FLINK-5663
> URL: https://issues.apache.org/jira/browse/FLINK-5663
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Stefan Richter
>
> While testing the 1.2.0 release I got the following Exception:
> {code}
> 2017-01-26 17:29:20,602 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Source: Custom Source (3/8) 
> (2dbce778c4e53a39dec3558e868ceef4) switched from RUNNING to FAILED.
> java.lang.Exception: Error while triggering checkpoint 2 for Source: Custom 
> Source (3/8)
>   at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1117)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not perform checkpoint 2 for operator 
> Source: Custom Source (3/8).
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:533)
>   at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1108)
>   ... 5 more
> Caused by: java.lang.Exception: Could not complete snapshot 2 for operator 
> Source: Custom Source (3/8).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:372)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1116)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1052)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:640)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:585)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:528)
>   ... 6 more
> Caused by: java.io.IOException: Could not flush and close the file system 
> output stream to 
> file:/Users/uce/Desktop/1-2-testing/fs/83889867a493a1dc80f6c588c071b679/chk-2/e4415d0d-719c-48df-91a9-3171ba468152
>  in order to obtain the stream state handle
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:333)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:200)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
>   ... 11 more
> Caused by: java.io.IOException: Could not open output stream for state backend
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:368)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:225)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:305)
>   ... 13 more
> Caused by: java.io.IOException: Cannot register Closeable, registry is 
> already closed. Closing argument.
>   at 
> org.apache.flink.util.AbstractCloseableRegistry.registerClosable(AbstractCloseableRegistry.java:63)
>   at 
> org.apache.flink.core.fs.ClosingFSDataOutputStream.wrapSafe(ClosingFSDataOutputStream.java:99)
>   at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:123)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:359)
>   ... 15 more
> {code}
> The job recovered and kept running.
> [~stefanrichte...@gmail.com] Can this be a race with the closable registry?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5681) Make ReaperThread for SafetyNetCloseableRegistry a singleton

2017-01-27 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-5681:
-

 Summary: Make ReaperThread for SafetyNetCloseableRegistry a 
singleton
 Key: FLINK-5681
 URL: https://issues.apache.org/jira/browse/FLINK-5681
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Stefan Richter
Assignee: Stefan Richter


Currently, each {{SafetyNetCloseableRegistry}} spawns its own ReaperThread. 
However, this duty could also be fulfilled by a single ReaperThread that is 
shared by all registries, avoiding unnecessary threads.
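
A minimal sketch of the intended pattern (not the actual Flink code; names are 
illustrative): the shared thread is lazily started for the first registry and 
stopped when the last registry closes:

{code}
public final class SharedReaper {

    private static Thread reaperThread;
    private static int registryCount;

    /** Called when a SafetyNetCloseableRegistry is created. */
    public static synchronized void retain(Runnable reapLoop) {
        if (registryCount++ == 0) {
            reaperThread = new Thread(reapLoop, "CloseableReaperThread");
            reaperThread.setDaemon(true);
            reaperThread.start();
        }
    }

    /** Called when a SafetyNetCloseableRegistry is closed. */
    public static synchronized void release() {
        if (--registryCount == 0) {
            reaperThread.interrupt(); // the reap loop is expected to exit on interrupt
            reaperThread = null;
        }
    }
}
{code}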



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] (FLINK-5044) Converting operator and function state from Flink 1.1 for all changed operators in 1.2

2017-01-30 Thread Stefan Richter (JIRA)
Stefan Richter commented on FLINK-5044:
---

No, this is solved through our migration story.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346-sha1:dbc023d)


[jira] (FLINK-5480) User-provided hashes for operators

2017-01-30 Thread Stefan Richter (JIRA)
Stefan Richter commented on FLINK-5480:
---

From my side yes.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346-sha1:dbc023d)


[jira] [Commented] (FLINK-5740) Make WrappingFunction an interface and move to flink-core

2017-02-09 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15859254#comment-15859254
 ] 

Stefan Richter commented on FLINK-5740:
---

+1 very good idea.

> Make WrappingFunction an interface and move to flink-core
> -
>
> Key: FLINK-5740
> URL: https://issues.apache.org/jira/browse/FLINK-5740
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> Making it an interface and having an {{AbstractWrappingFunction}} will allow 
> implementations to have different classes as base classes. Also, we should 
> change {{FunctionUtils}} to work like {{StreamingFunctionUtils}} so that 
> wrapping functions don't have to implement the methods of {{RichFunction}}.
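
A sketch of the proposed shape (the names follow the issue text; the details 
are assumptions, and each type would live in its own file):

{code}
import org.apache.flink.api.common.functions.Function;

public interface WrappingFunction<T extends Function> extends Function {
    T getWrappedFunction();
}

public abstract class AbstractWrappingFunction<T extends Function>
        implements WrappingFunction<T> {

    protected final T wrappedFunction;

    protected AbstractWrappingFunction(T wrappedFunction) {
        this.wrappedFunction = wrappedFunction;
    }

    @Override
    public T getWrappedFunction() {
        return wrappedFunction;
    }
}
{code}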



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-1725) New Partitioner for better load balancing for skewed data

2017-02-17 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15871572#comment-15871572
 ] 

Stefan Richter commented on FLINK-1725:
---

I agree with you [~aljoscha]. While this would be nice to have, I think it is 
way more involved than it seems, because this has performance implications and 
several interactions with other Flink features, such as rescaling or queryable 
state.

> New Partitioner for better load balancing for skewed data
> -
>
> Key: FLINK-1725
> URL: https://issues.apache.org/jira/browse/FLINK-1725
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 0.8.1
>Reporter: Anis Nasir
>Assignee: Anis Nasir
>  Labels: LoadBalancing, Partitioner
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> Hi,
> We have recently studied the problem of load balancing in Storm [1].
> In particular, we focused on key distribution of the stream for skewed data.
> We developed a new stream partitioning scheme (which we call Partial Key 
> Grouping). It achieves better load balancing than key grouping while being 
> more scalable than shuffle grouping in terms of memory.
> In the paper we show a number of mining algorithms that are easy to implement 
> with partial key grouping, and whose performance can benefit from it. We 
> think that it might also be useful for a larger class of algorithms.
> Partial key grouping is very easy to implement: it requires just a few lines 
> of code in Java when implemented as a custom grouping in Storm [2].
> For all these reasons, we believe it will be a nice addition to the standard 
> Partitioners available in Flink. If the community thinks it's a good idea, we 
> will be happy to offer support in the porting.
> References:
> [1]. 
> https://melmeric.files.wordpress.com/2014/11/the-power-of-both-choices-practical-load-balancing-for-distributed-stream-processing-engines.pdf
> [2]. https://github.com/gdfm/partial-key-grouping
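
For illustration, the scheme from [1] could be approximated in Flink with a 
custom {{Partitioner}} that picks the locally less-loaded of two candidate 
channels per key ("power of two choices"). This is only a sketch with 
simplistic hash functions; note that downstream operators must then merge the 
two partial aggregates per key, since each key is spread over two channels:

{code}
import org.apache.flink.api.common.functions.Partitioner;

public class PartialKeyGroupingPartitioner implements Partitioner<String> {

    private long[] load; // purely local load estimates per downstream channel

    @Override
    public int partition(String key, int numPartitions) {
        if (load == null || load.length != numPartitions) {
            load = new long[numPartitions];
        }
        // Two candidate channels derived from two (simplistic) hash functions.
        int first = Math.floorMod(key.hashCode(), numPartitions);
        int second = Math.floorMod(key.hashCode() * 31 + 17, numPartitions);
        int choice = load[first] <= load[second] ? first : second;
        load[choice]++;
        return choice;
    }
}
{code}

It could be hooked in via {{stream.partitionCustom(new 
PartialKeyGroupingPartitioner(), "someKeyField")}}.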



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-3841) RocksDB statebackend creates empty dbs for stateless operators

2017-02-17 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-3841.
-
   Resolution: Not A Problem
Fix Version/s: 1.2.0

Not a problem any more. Will close this because it seems outdated.

> RocksDB statebackend creates empty dbs for stateless operators
> --
>
> Key: FLINK-3841
> URL: https://issues.apache.org/jira/browse/FLINK-3841
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.1.0
>Reporter: Gyula Fora
>Assignee: MaGuowei
>Priority: Minor
> Fix For: 1.2.0
>
>
> Even though they are not checkpointed there is always an open RocksDB 
> database for all operators if the Rocks backend is set. I wonder if it would 
> make sense to lazy initialize the dbs instead of doing it in the 
> initializeForJob method.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-2491) Operators are not participating in state checkpointing in some cases

2017-02-17 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15871538#comment-15871538
 ] 

Stefan Richter commented on FLINK-2491:
---

This seems still valid, but I assume it is not in progress anymore. I think 
the reason for the problem is that some operator instances might shut down at 
some point, while the checkpoint coordinator still expects all instances that 
were initially started to confirm each checkpoint. What we would need is some 
way to unregister operator instances from checkpointing after their shutdown. 
This registration should also be reestablished in case of restarts. Is this 
summary correct [~aljoscha] [~StephanEwen]?

> Operators are not participating in state checkpointing in some cases
> 
>
> Key: FLINK-2491
> URL: https://issues.apache.org/jira/browse/FLINK-2491
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 0.10.0
>Reporter: Robert Metzger
>Assignee: Mรกrton Balassi
>Priority: Critical
> Fix For: 1.0.0
>
>
> While implementing a test case for the Kafka Consumer, I came across the 
> following bug:
> Consider the following topology, with the operator parallelism in parentheses:
> Source (2) --> Sink (1).
> In this setup, the {{snapshotState()}} method is called on the source, but 
> not on the Sink.
> The sink receives the generated data.
> Only one of the two sources is generating data.
> I've implemented a test case for this, you can find it here: 
> https://github.com/rmetzger/flink/blob/para_checkpoint_bug/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ParallelismChangeCheckpoinedITCase.java



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5932) Order of legacy vs new state initialization in the AbstractStreamOperator.

2017-03-02 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15891930#comment-15891930
 ] 

Stefan Richter commented on FLINK-5932:
---

I think it makes a lot of sense if it is easily possible.

> Order of legacy vs new state initialization in the AbstractStreamOperator.
> --
>
> Key: FLINK-5932
> URL: https://issues.apache.org/jira/browse/FLINK-5932
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.3.0
>
>
> Currently in the 
> {{AbstractStreamOperator::initializeState(OperatorStateHandles 
> stateHandles)}}, the {{restoreStreamCheckpointed}} which is responsible for 
> restoring state from previous Flink versions, (backwards compatibility) is 
> called before the {{initializeState(StateInitializationContext context)}} 
> which is responsible for initializing the state in Flink 1.2.
> This has the negative side effect that when implementing the backwards 
> compatibility strategy for a given operator, we have to restore the old 
> state, store it in local variables, and register it with the new state 
> abstractions in the {{initializeState()}} or the {{open()}}. This creates a 
> lot of unnecessary code in the operators, and potential memory leaks if the 
> local variables are not "null-ified".
> This issue proposes to call the {{restoreStreamCheckpointed}} after the 
> {{initializeState(StateInitializationContext context)}}. This way, the new 
> operator state will have been initialized (e.g. keyed state), and the 
> {{restoreStreamCheckpointed}} will be able to register its state directly 
> with the new abstractions, instead of putting it in local variables and wait 
> for the {{initializeState}} or the {{open()}} to re-register it.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5917) Remove MapState.size()

2017-03-02 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15891945#comment-15891945
 ] 

Stefan Richter commented on FLINK-5917:
---

I agree with [~aljoscha]. Unlike Java's HashMap, RocksDB does not give any 
feedback on whether a key already existed, and this makes a lot of sense if 
you consider the LSM implementation of RocksDB. RocksDB offers an estimate of 
the key count (probably implemented as hyper-log-log), but I think they would 
expose the exact count if this could be done easily. The only workaround that 
I see for maintaining a cached count is to perform a lookup before every 
insert, which would be prohibitively expensive.
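
For illustration, the iteration-based alternative could look like this sketch, 
which makes the O(n) cost explicit at the call site:

{code}
import org.apache.flink.api.common.state.MapState;

public final class MapStateUtil {

    /** Counts the mappings by iterating the keys; O(n) in the number of mappings. */
    public static <K, V> long sizeOf(MapState<K, V> state) throws Exception {
        long count = 0;
        for (K ignored : state.keys()) {
            count++;
        }
        return count;
    }
}
{code}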

> Remove MapState.size()
> --
>
> Key: FLINK-5917
> URL: https://issues.apache.org/jira/browse/FLINK-5917
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.3.0
>Reporter: Aljoscha Krettek
>
> I'm proposing to remove {{size()}} because it is a prohibitively expensive 
> operation and users might not be aware of it. Instead of {{size()}} users can 
> use an iterator over all mappings to determine the size, when doing this they 
> will be aware of the fact that it is a costly operation.
> Right now, {{size()}} is only costly on the RocksDB state backend but I think 
> with future developments on the in-memory state backend it might also become 
> an expensive operation there.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-4493) Unify the snapshot output format for keyed-state backends

2016-08-25 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-4493:
-

 Summary: Unify the snapshot output format for keyed-state backends
 Key: FLINK-4493
 URL: https://issues.apache.org/jira/browse/FLINK-4493
 Project: Flink
  Issue Type: Improvement
Reporter: Stefan Richter
Priority: Minor


We could unify the snapshot output format across keyed-state backend 
implementations, e.g. those based on RocksDB and Heap, so that all of them 
write a single, common format.

For example, this would allow us to restore state that was previously kept in 
RocksDB on a heap-based backend and vice versa.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4492) Cleanup files from canceled snapshots

2016-08-25 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-4492:
-

 Summary: Cleanup files from canceled snapshots
 Key: FLINK-4492
 URL: https://issues.apache.org/jira/browse/FLINK-4492
 Project: Flink
  Issue Type: Bug
Reporter: Stefan Richter
Priority: Minor


Currently, checkpointing only closes CheckpointStateOutputStreams on cancel, 
but incomplete files are not properly deleted from the filesystem.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4744) Introduce usercode class loader to deserialize partitionable operator state

2016-10-05 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-4744:
-

 Summary: Introduce usercode class loader to deserialize 
partitionable operator state
 Key: FLINK-4744
 URL: https://issues.apache.org/jira/browse/FLINK-4744
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Reporter: Stefan Richter
Assignee: Stefan Richter


Deserialization of partitionable operator state does not work with user code. 
We need to introduce the user code ClassLoader to the OperatorStateBackend.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4731) HeapKeyedStateBackend restoring broken for scale-in

2016-10-04 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-4731:
--
Summary: HeapKeyedStateBackend restoring broken for scale-in  (was: 
HeapKeyedStateBackend restorig broken for scale-in)

> HeapKeyedStateBackend restoring broken for scale-in
> ---
>
> Key: FLINK-4731
> URL: https://issues.apache.org/jira/browse/FLINK-4731
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Restoring the HeapKeyedStateBackend is broken in case that parallelism is 
> reduced. The restore method is overwriting previously restored state.
> We should also add scale-in testing to the RescalingITCase.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4730) Introduce CheckpointMetaData

2016-10-04 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-4730:
-

 Summary: Introduce CheckpointMetaData
 Key: FLINK-4730
 URL: https://issues.apache.org/jira/browse/FLINK-4730
 Project: Flink
  Issue Type: Bug
Reporter: Stefan Richter


Currently, the meta data for each checkpoint consists of up to 5 long values 
which are passed through several functions. When adding/removing meta data, we 
would have to change function signatures in many places. Furthermore, this is 
prone to errors when the order of arguments is accidentally changed. We should 
introduce a CheckpointMetaData class which encapsulates this checkpoint meta 
data.
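
A minimal sketch of such a class (the two fields shown are illustrative; the 
remaining meta data values would follow the same pattern):

{code}
import java.io.Serializable;

public class CheckpointMetaData implements Serializable {

    private static final long serialVersionUID = 1L;

    private final long checkpointId;
    private final long timestamp;

    public CheckpointMetaData(long checkpointId, long timestamp) {
        this.checkpointId = checkpointId;
        this.timestamp = timestamp;
    }

    public long getCheckpointId() {
        return checkpointId;
    }

    public long getTimestamp() {
        return timestamp;
    }
}
{code}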



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4731) HeapKeyedStateBackend restorig broken for scale-in

2016-10-04 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-4731:
-

 Summary: HeapKeyedStateBackend restorig broken for scale-in
 Key: FLINK-4731
 URL: https://issues.apache.org/jira/browse/FLINK-4731
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Reporter: Stefan Richter
Assignee: Stefan Richter


Restoring the HeapKeyedStateBackend is broken in the case that the parallelism 
is reduced. The restore method overwrites previously restored state.

We should also add scale-in testing to the RescalingITCase.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4730) Introduce CheckpointMetaData

2016-10-04 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-4730:
--
Component/s: State Backends, Checkpointing

> Introduce CheckpointMetaData
> 
>
> Key: FLINK-4730
> URL: https://issues.apache.org/jira/browse/FLINK-4730
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>
> Currently, the meta data for each checkpoint consists of up to 5 long values 
> which are passed through several functions. When adding or removing meta 
> data, we would have to change function signatures in many places. Furthermore, 
> this is error-prone when the order of arguments is accidentally changed. We 
> should introduce a CheckpointMetaData class that encapsulates this checkpoint 
> meta data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-4730) Introduce CheckpointMetaData

2016-10-04 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter reassigned FLINK-4730:
-

Assignee: Stefan Richter

> Introduce CheckpointMetaData
> 
>
> Key: FLINK-4730
> URL: https://issues.apache.org/jira/browse/FLINK-4730
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Currently, the meta data for each checkpoint consists of up to 5 long values 
> which are passed through several functions. When adding or removing meta 
> data, we would have to change function signatures in many places. Furthermore, 
> this is error-prone when the order of arguments is accidentally changed. We 
> should introduce a CheckpointMetaData class that encapsulates this checkpoint 
> meta data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4603) KeyedStateBackend cannot restore user code classes

2016-09-22 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15512553#comment-15512553
 ] 

Stefan Richter commented on FLINK-4603:
---

Currently, we should still keep the UserCodeClassLoader around in the RocksDB 
backend, because we still need to serialize the StateDescriptor, which contains 
the TypeSerializer, so that users cannot accidentally create StateDescriptors 
with a wrong TypeSerializer. However, we should consider that TypeSerializers 
can be exchanged (ensuring their compatibility), e.g. to allow different 
serialization versions.
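
As a rough sketch of the restore side (assuming helpers in the style of 
{{InstantiationUtil}}; not the final code), the important part is that the 
descriptor bytes are read back with the user code class loader:

{code}
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.util.InstantiationUtil;

class DescriptorSnapshotSketch {

    byte[] write(StateDescriptor<?, ?> descriptor) throws java.io.IOException {
        // the descriptor (and thus its TypeSerializer) goes into the snapshot
        return InstantiationUtil.serializeObject(descriptor);
    }

    StateDescriptor<?, ?> read(byte[] bytes, ClassLoader userCodeClassLoader)
            throws java.io.IOException, ClassNotFoundException {
        // user classes inside the descriptor only resolve against the user class loader
        return InstantiationUtil.deserializeObject(bytes, userCodeClassLoader);
    }
}
{code}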

> KeyedStateBackend cannot restore user code classes
> --
>
> Key: FLINK-4603
> URL: https://issues.apache.org/jira/browse/FLINK-4603
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Blocker
> Fix For: 1.2.0
>
>
> A user reported that he cannot restore keyed state which contains user code 
> classes. I suspect that we don't use the user code class loader to 
> deserialize the state.
> The solution seems to be to forward the user code class loader to the 
> {{KeyedStateBackends}} when restoring state.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-4556) Make Queryable State Key-Group Aware

2016-09-20 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter reassigned FLINK-4556:
-

Assignee: Stefan Richter

> Make Queryable State Key-Group Aware
> 
>
> Key: FLINK-4556
> URL: https://issues.apache.org/jira/browse/FLINK-4556
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.2.0
>Reporter: Aljoscha Krettek
>Assignee: Stefan Richter
>Priority: Blocker
>
> The recent introduction of key-grouped state breaks queryable state because 
> the JobManager does not yet forward the client to the correct TaskManager 
> based on key-group ranges.
> This will either have to be implemented on the JobManager side, i.e. in 
> {{AkkaKvStateLocationLookupService}} or on the {{TaskManager}} when state is 
> registered. The JobManager can know the mapping because it should know the 
> {{parallelism}}/{{maxParallelism}} which it can use to determine where the 
> state for a key-group is stored. The {{TaskManager}}s send a 
> {{NotifyKvStateRegistered}} message that already contains a {{keyGroupIndex}} 
> field, which is not useful/correct at the moment, though.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4508) Consolidate DummyEnvironment and MockEnvironment for Tests

2016-08-26 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-4508:
-

 Summary: Consolidate DummyEnvironment and MockEnvironment for Tests
 Key: FLINK-4508
 URL: https://issues.apache.org/jira/browse/FLINK-4508
 Project: Flink
  Issue Type: Test
Reporter: Stefan Richter
Priority: Minor


Currently, we have {{DummyEnvironment}} and {{MockEnvironment}} as implementations of 
{{Environment}} for our tests. Both serve a similar purpose, but offer slightly 
different features. We should consolidate them by merging them into one class 
that offers the best of both previous implementations.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4942) Improve processing performance of HeapInternalTimerService with key groups

2016-10-27 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-4942:
-

 Summary: Improve processing performance of 
HeapInternalTimerService with key groups
 Key: FLINK-4942
 URL: https://issues.apache.org/jira/browse/FLINK-4942
 Project: Flink
  Issue Type: Improvement
Reporter: Stefan Richter


Currently, key-group awareness in {{HeapInternalTimerService}} is basically 
implemented as a (hash) map of (hash) sets. The purpose of this is grouping 
timers by key group in a way that allows easy serialization per key group.

However, this data layout comes with a significant performance decrease, in 
particular when the number of key groups is high.

I suggest to keep all timers in one set again at runtime, thus being as fast as 
in previous versions without key groups.

Instead, we can perform a very fast online partitioning into key groups before 
a snapshot.
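
A minimal sketch of the idea (illustrative only; the timer type is generic 
here, and the key-group hashing is a simplified assumption, not Flink's actual 
hash function):

{code}
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

final class TimerSnapshotPartitioner {

    /** Simplified stand-in for Flink's key-group assignment. */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.abs(key.hashCode() % maxParallelism);
    }

    /** Partitions the flat runtime timer set by key group, only for the snapshot. */
    static <T, K> List<List<T>> partitionForSnapshot(
            Set<T> timers, Function<T, K> keyOf, int maxParallelism) {

        List<List<T>> byKeyGroup = new ArrayList<>(maxParallelism);
        for (int i = 0; i < maxParallelism; i++) {
            byKeyGroup.add(new ArrayList<>());
        }
        for (T timer : timers) {
            int keyGroup = keyGroupFor(keyOf.apply(timer), maxParallelism);
            byKeyGroup.get(keyGroup).add(timer);
        }
        return byKeyGroup;
    }
}
{code}

This way, regular processing pays no per-element key-group overhead, and the 
grouping cost is only paid once per snapshot.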



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-4942) Improve processing performance of HeapInternalTimerService with key groups

2016-10-27 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter reassigned FLINK-4942:
-

Assignee: Stefan Richter

> Improve processing performance of HeapInternalTimerService with key groups
> --
>
> Key: FLINK-4942
> URL: https://issues.apache.org/jira/browse/FLINK-4942
> Project: Flink
>  Issue Type: Improvement
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Currently, key-group awareness in {{HeapInternalTimerService}} is basically 
> implemented as a (hash) map of (hash) sets. The purpose of this is grouping 
> timers by key group in a way that allows easy serialization per key group.
> However, this data layout comes with a significant performance decrease, in 
> particular when the number of key groups is high.
> I suggest to keep all timers in one set again at runtime, thus being as fast 
> as in previous versions without key groups.
> Instead, we can perform a very fast online partitioning into key groups 
> before a snapshot.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-5019) Proper isRestored result for tasks that did not write state

2016-11-09 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-5019.
-
   Resolution: Fixed
Fix Version/s: 1.2.0

> Proper isRestored result for tasks that did not write state
> ---
>
> Key: FLINK-5019
> URL: https://issues.apache.org/jira/browse/FLINK-5019
> Project: Flink
>  Issue Type: Bug
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.2.0
>
>
> When a subtask is restored from a checkpoint that does not contain any state 
> (e.g. because the subtask did not write state in the previous run), the 
> result of {{StateInitializationContext::isRestored}} will incorrectly return 
> false.
> We should ensure that empty state is somehow reflected in a checkpoint and 
> return true on restore, independent from the presence of state.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5036) Perform the grouping of keys in restoring instead of checkpointing

2016-11-09 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15650460#comment-15650460
 ] 

Stefan Richter commented on FLINK-5036:
---

From the description it is not entirely clear if you are discussing a 
theoretical or a concrete problem that you observed. If there is an observable 
performance problem, could you please provide some backing measurements (log 
outputs), the Flink version, and information about your keys and values to 
help us figure out the cause and extent of the problem?

If we are talking about a theoretical problem, the way you describe the effect 
of key-groups on snapshotting in the {{RocksDBKeyedStateBackend}} is not 
accurate. What was actually changed for key-groups in the current master is 
that each key in RocksDB is now prefixed by its corresponding key-group ID 
(1-2 bytes, depending on maxParallelism). This allows us to iterate all 
key-value pairs in key-grouped order at snapshot time, similar to how we 
previously iterated them in key order. Furthermore, all key-groups go to the 
same file and are written consecutively without random IO, and a small index 
of key-group-id -> offset is created. From this point of view, the performance 
impact of key-groups on snapshot time should (hopefully) be marginal.
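
To illustrate the described layout, a rough sketch (not the actual backend 
code; the iterator interface is a made-up stand-in for iterating the 
key-group-prefixed RocksDB keys):

{code}
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

class KeyGroupedSnapshotWriter {

    /** Hypothetical iterator that returns key/value pairs in key-grouped order. */
    interface KeyGroupIterator {
        boolean hasNextInKeyGroup(int keyGroup);
        byte[] next();
    }

    /** Writes all key-groups consecutively; returns the key-group-id -> offset index. */
    Map<Integer, Long> writeKeyGroups(
            DataOutputStream out,
            int firstKeyGroup,
            int lastKeyGroup,
            KeyGroupIterator iterator) throws IOException {

        Map<Integer, Long> offsets = new HashMap<>();
        for (int kg = firstKeyGroup; kg <= lastKeyGroup; kg++) {
            offsets.put(kg, (long) out.size()); // start offset of this key-group
            while (iterator.hasNextInKeyGroup(kg)) {
                byte[] kv = iterator.next();
                out.writeInt(kv.length);
                out.write(kv);
            }
        }
        return offsets;
    }
}
{code}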

As another remark, non-partitioned state is snapshotted/restored in a similar 
way. Avoiding key-groups at snapshot time also has more problematic 
implications for restores than you consider in the description. For example, if 
the new parallelism is not a multiple of the old parallelism, we effectively 
have to read and filter the complete keyed state for each task that works on 
it.



> Perform the grouping of keys in restoring instead of checkpointing
> --
>
> Key: FLINK-5036
> URL: https://issues.apache.org/jira/browse/FLINK-5036
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>
> Whenever taking snapshots of {{RocksDBKeyedStateBackend}}, the values in the 
> states will be written to different files according to their key groups. 
> This procedure is very costly when the states are very big. 
> Given that the snapshot operations will be performed much more frequently 
> than restoring, we can leave the key groups as they are to improve the 
> overall performance. In other words, we can perform the grouping of keys in 
> restoring instead of in checkpointing.
> I think, the implementation will be very similar to the restoring of 
> non-partitioned states. Each task will receive a collection of snapshots each 
> of which contains a set of key groups. Each task will restore its states from 
> the given snapshots by picking values in assigned key groups.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-5037) Instability in AbstractUdfStreamOperatorLifecycleTest

2016-11-09 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-5037.
-
   Resolution: Fixed
Fix Version/s: 1.2.0

> Instability in AbstractUdfStreamOperatorLifecycleTest
> -
>
> Key: FLINK-5037
> URL: https://issues.apache.org/jira/browse/FLINK-5037
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Stefan Richter
> Fix For: 1.2.0
>
>
> I saw this failure: 
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/174340963/log.txt
> I suspect it has to do with the {{Thread.sleep()}} in Line 237. I think it 
> can be replaced by {{runStarted.await()}}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5041) Implement savepoint backwards compatibility 1.1 -> 1.2

2016-11-09 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-5041:
-

 Summary: Implement savepoint backwards compatibility 1.1 -> 1.2
 Key: FLINK-5041
 URL: https://issues.apache.org/jira/browse/FLINK-5041
 Project: Flink
  Issue Type: New Feature
  Components: State Backends, Checkpointing
Affects Versions: 1.2.0
Reporter: Stefan Richter
Assignee: Stefan Richter


This issue tracks the implementation of backwards compatibility between Flink 
1.1 and 1.2 releases.

This task subsumes:

- Converting old savepoints to new savepoints, including a conversion of state 
handles to their new replacements.

- Converting keyed state from old backend implementations to their new 
counterparts.

- Converting operator and function state for all changed operators.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5043) Converting keyed state from Flink 1.1 backend implementations to their new counterparts in 1.2

2016-11-09 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-5043:
-

 Summary: Converting keyed state from Flink 1.1 backend 
implementations to their new counterparts in 1.2
 Key: FLINK-5043
 URL: https://issues.apache.org/jira/browse/FLINK-5043
 Project: Flink
  Issue Type: Sub-task
  Components: State Backends, Checkpointing
Affects Versions: 1.2.0
Reporter: Stefan Richter
Assignee: Stefan Richter


Keyed state backends became key-group aware in Flink 1.2, and their hierarchy as 
a whole changed significantly. We need to implement a conversion so that old 
snapshots can be restored into new backends.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5042) Convert old savepoints to new savepoints

2016-11-09 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-5042:
-

 Summary: Convert old savepoints to new savepoints
 Key: FLINK-5042
 URL: https://issues.apache.org/jira/browse/FLINK-5042
 Project: Flink
  Issue Type: Sub-task
  Components: State Backends, Checkpointing
Affects Versions: 1.2.0
Reporter: Stefan Richter
Assignee: Stefan Richter


The format of savepoints and the hierarchy of state handles changed a lot 
between Flink 1.1 and 1.2. For backwards compatibility, we need to convert old 
to new savepoints.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5044) Converting operator and function state from Flink 1.1 for all changed operators in 1.2

2016-11-09 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-5044:
-

 Summary: Converting operator and function state from Flink 1.1 for 
all changed operators in 1.2
 Key: FLINK-5044
 URL: https://issues.apache.org/jira/browse/FLINK-5044
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming, Windowing Operators
Affects Versions: 1.2.0
Reporter: Stefan Richter


Snapshot/restore mechanics for operators changed significantly between Flink 
1.1 and Flink 1.2. Furthermore, operators and their hierarchy also changed. We 
need to ensure that operators can still restore state written by their old 
versions.

In particular, WindowOperator and KafkaConsumer are currently affected by this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5051) Backwards compatibility for serializers in backend state

2016-11-11 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-5051:
-

 Summary: Backwards compatibility for serializers in backend state
 Key: FLINK-5051
 URL: https://issues.apache.org/jira/browse/FLINK-5051
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Reporter: Stefan Richter


When a new state is registered, e.g. in a keyed backend via 
{{getPartitionedState}}, the caller has to provide all type serializers required 
for the persistence of the state components. Explicitly passing the serializers 
on state creation already allows for potential version upgrades of serializers.

However, those serializers are currently not part of any snapshot and are only 
provided at runtime, when the state is newly registered or restored. For 
backwards compatibility, this has strong implications: checkpoints are not 
self-contained, in that state is currently a blackbox without knowledge about 
its corresponding serializers. Most cases where we would need to restructure 
the state are basically lost. We could only convert state lazily at runtime, 
and only once the user registers the concrete state, which might happen at 
unpredictable points.

I suggest to adapt our solution as follows:

- As now, all states are registered with their set of serializers.
- Unlike now, all serializers are written to the snapshot. This makes 
savepoints self-contained and also allows building inspection tools for 
savepoints at some point in the future.
- Introduce an interface {{Versioned}} with {{long getVersion()}} and {{boolean 
isCompatible(Versioned v)}}, which is then implemented by serializers. 
Compatible serializers must ensure that they can deserialize older versions 
and can then serialize the data in their new format. This is how we upgrade 
(see the sketch below).
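
A minimal sketch of the proposed interface (names follow the description above; 
this is not a final API):

{code}
public interface Versioned {

    /** Version of the (serialization) format this instance produces. */
    long getVersion();

    /**
     * True if this instance can read data written by the given instance,
     * e.g. a serializer of an older version whose format it can still
     * deserialize and then re-serialize in its own, newer format.
     */
    boolean isCompatible(Versioned other);
}
{code}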



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5052) Changing the maximum parallelism (number of key groups) of a job

2016-11-11 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-5052:
-

 Summary: Changing the maximum parallelism (number of key groups) 
of a job
 Key: FLINK-5052
 URL: https://issues.apache.org/jira/browse/FLINK-5052
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Reporter: Stefan Richter


Through dynamic rescaling, Flink jobs can already adjust their parallelism, and 
each operator only has to read its assigned key-groups. 

However, the maximum parallelism is determined by the number of key-groups 
(aka maxParallelism), which is currently fixed forever once the job is first 
started. We could consider relaxing this limitation, so that users can modify 
the number of key-groups after the fact. This is useful in particular for 
upscaling jobs from older Flink versions (<1.2), which must be converted with 
maxParallelism == parallelism.

In the general case, changing the maxParallelism shuffles keys between 
key-groups, which means we would have to read the complete state on each 
operator instance, filtering for those keys that actually fall into the 
key-groups assigned to the instance. While it is certainly possible to support 
this, it is obviously a very expensive operation.

Fortunately, the assignment of keys to operators is currently determined as 
follows:

{{operatorInstance = computeKeyGroup(key) * parallelism / maxParallelism}}

This means that we can provide more efficient support for upscaling of 
maxParallelism if {{newMaxParallelism == n * oldMaxParallelism}}. In this 
case, keys are not reshuffled between key-groups; instead, key-groups are split 
by a factor of n. Restoring an operator instance for the new maxParallelism 
then only has to look at a subset of the old key-groups, which significantly 
reduces the amount of unnecessary data transfer, e.g. ~1/2 when increasing 
maxParallelism by a factor of 2, ~2/3 when increasing by a factor of 3, etc. 
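
A small sketch of this property (the modulo hashing here is a simplified 
assumption, not Flink's actual hash function): with {{newMaxParallelism == n * 
oldMaxParallelism}}, we get {{newKeyGroup % oldMaxParallelism == oldKeyGroup}}, 
i.e. old key-groups are split, never reshuffled.

{code}
final class KeyGroupMath {

    /** Simplified key-group assignment. */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.abs(key.hashCode() % maxParallelism);
    }

    /** The assignment formula from the description above. */
    static int operatorIndexFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }
}
{code}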

Implementing this feature would require the following steps:
- Introduce/modify state handles with the capability to summarize 
multiple logical key-groups into one mixed physical entity.
- Enhance StateAssignmentOperation so that it can deal with and 
correctly assign the new/modified keyed state handles to subtasks on restoring 
a checkpoint. We also need to implement how to compute the correct 
super-key-group, but this is rather simple.
- Filter out keys that do not belong to the assigned key-groups on restore in 
the backends.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5053) Incremental / lightweight snapshots for checkpoints

2016-11-11 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-5053:
-

 Summary: Incremental / lightweight snapshots for checkpoints
 Key: FLINK-5053
 URL: https://issues.apache.org/jira/browse/FLINK-5053
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Reporter: Stefan Richter


There is currently basically no difference between savepoints and checkpoints 
in Flink and both are created through exactly the same process.

However, savepoints and checkpoints have a slightly different meaning which we 
should take into account to keep Flink efficient:

- Savepoints are (typically infrequently) triggered by the user to create a 
state from which the application can be restarted, e.g. because Flink, some 
code, or the parallelism needs to be changed.

- Checkpoints are (typically frequently) triggered by the system to allow for 
fast recovery in case of failure, while keeping the job/system unchanged.

This means that savepoints and checkpoints can have different properties in 
that:

- A savepoint should represent a state of the application, where characteristics 
of the job (e.g. parallelism) can be adjusted for the next restart. One example 
of things that savepoints need to be aware of are key-groups. Savepoints can 
potentially be a little more expensive than checkpoints, because they are 
usually created a lot less frequently by the user.

- Checkpoints are frequently triggered by the system to allow for fast failure 
recovery. However, failure recovery leaves all characteristics of the job 
unchanged, so checkpoints do not have to be aware of those; e.g. think again 
of key groups. Checkpoints should run faster than creating savepoints; in 
particular, it would be nice to have incremental checkpoints.

For a first approach, I would suggest the following steps/changes:

- In checkpoint coordination: differentiate between triggering checkpoints 
and savepoints. Introduce properties for checkpoints that describe their set of 
abilities, e.g. "is-key-group-aware", "is-incremental".

- In state handle infrastructure: introduce state handles that reflect 
incremental checkpoints and drop full key-group awareness, i.e. covering 
folders instead of files and not having keygroup_id -> file/offset mapping, but 
keygroup_range -> folder?

- Backend side: we should start with RocksDB by reintroducing something similar 
to semi-async snapshots, but using 
BackupableDBOptions::setShareTableFiles(true) and transferring only new 
incremental outputs to HDFS. Notice that using RocksDB's internal backup 
mechanism means giving up the information about individual key-groups. But as 
explained above, this should be totally acceptable for checkpoints, while 
savepoints should use the key-group-aware, fully async mode. Of course, we also 
need to implement the ability to restore from both types of snapshots (see the 
sketch below).
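
For the backend side, a rough sketch of the RocksDB part could look as follows 
(assuming the RocksDB JNI backup API around BackupEngine/BackupableDBOptions; 
exact usage still to be verified, this is not final backend code):

{code}
import org.rocksdb.BackupEngine;
import org.rocksdb.BackupableDBOptions;
import org.rocksdb.Env;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

class IncrementalBackupSketch {

    void backup(RocksDB db, String backupPath) throws RocksDBException {
        try (BackupableDBOptions options =
                     new BackupableDBOptions(backupPath).setShareTableFiles(true);
             BackupEngine engine = BackupEngine.open(Env.getDefault(), options)) {

            // with shared table files, SST files that already exist in the
            // backup directory are not copied again, so repeated backups only
            // transfer the incremental delta
            engine.createNewBackup(db, true);
        }
    }
}
{code}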






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4858) Remove Legacy Checkpointing Interfaces

2016-10-19 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-4858:
--
Description: 
This issue tracks the removal of the deprecated/legacy interfaces that are 
still in the code and can only be removed once the move to the new Flink 1.2 
checkpointing is completely in place.

So far, these are {{StreamCheckpointedOperator}} and the legacy snapshot 
methods on the testing harnesses. Furthermore, codepaths and classes touching 
legacy state (e.g. {{StateAssignmentOperation}} or {{TaskStateHandles}}) can be 
cleaned up. 

  was:
This issue tracks the removal of the deprecated/legacy interfaces that are 
still in the code and can only be removed once the move to the new Flink 1.2 
checkpointing is completely in place.

So far, these are {{StreamCheckpointedOperator}} and the legacy snapshot 
methods on the testing harnesses.


> Remove Legacy Checkpointing Interfaces
> --
>
> Key: FLINK-4858
> URL: https://issues.apache.org/jira/browse/FLINK-4858
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.2.0
>Reporter: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.2.0
>
>
> This issue tracks the removal of the deprecated/legacy interfaces that are 
> still in the code and can only be removed once the move to the new Flink 1.2 
> checkpointing is completely in place.
> So far, these are {{StreamCheckpointedOperator}} and the legacy snapshot 
> methods on the testing harnesses. Furthermore, codepaths and classes touching 
> legacy state (e.g. {{StateAssignmentOperation}} or {{TaskStateHandles}}) can 
> be cleaned up. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4492) Cleanup files from canceled snapshots

2016-10-14 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15575225#comment-15575225
 ] 

Stefan Richter commented on FLINK-4492:
---

I did not add any cleanup on cancel in the changes I did, but somebody might 
have.

> Cleanup files from canceled snapshots
> -
>
> Key: FLINK-4492
> URL: https://issues.apache.org/jira/browse/FLINK-4492
> Project: Flink
>  Issue Type: Bug
>Reporter: Stefan Richter
>Assignee: Nikolay Vasilishin
>Priority: Minor
>
> Currently, checkpointing only closes CheckpointStateOutputStreams on cancel, 
> but incomplete files are not properly deleted from the filesystem.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4867) Investigate code generation for improving sort performance

2016-10-20 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592292#comment-15592292
 ] 

Stefan Richter commented on FLINK-4867:
---

If sort performance is crucial to you: I wrote an in-place radix sort 
algorithm that was extremely fast for me in a previous project. On primitive 
types and serialized byte strings, I found it typically a factor of 2-3x faster 
than the JDK's Arrays.sort() for primitives (or multikey quicksort for Strings). 
I was considering porting it to Flink, but did not find the time yet. As it is 
radix-based and not comparison-based, it would require some way to expose 
partial sort keys instead of a compareTo method. If that is interesting to you, 
let me know and I can share the original code.
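
To illustrate what I mean by partial sort keys: a compact MSD radix sort sketch 
(this is not the original code; for brevity, this variant uses a scratch array 
instead of sorting fully in place):

{code}
interface PartialKey<T> {
    /** Byte (0..255) of the sort key of the value at depth d, or -1 if exhausted. */
    int byteAt(T value, int d);
}

final class MsdRadixSort {

    static <T> void sort(T[] a, PartialKey<T> key) {
        sort(a, 0, a.length - 1, 0, key);
    }

    @SuppressWarnings("unchecked")
    private static <T> void sort(T[] a, int lo, int hi, int d, PartialKey<T> key) {
        if (hi <= lo) {
            return;
        }
        // counting pass: bucket -1 holds elements whose key is exhausted at depth d
        int[] count = new int[258];
        for (int i = lo; i <= hi; i++) {
            count[key.byteAt(a[i], d) + 2]++;
        }
        for (int r = 0; r < 257; r++) {
            count[r + 1] += count[r];
        }
        // distribution pass into a scratch array, then copy back
        Object[] aux = new Object[hi - lo + 1];
        for (int i = lo; i <= hi; i++) {
            aux[count[key.byteAt(a[i], d) + 1]++] = a[i];
        }
        for (int i = lo; i <= hi; i++) {
            a[i] = (T) aux[i - lo];
        }
        // recurse into each byte bucket (exhausted keys are already in place)
        for (int r = 0; r < 256; r++) {
            sort(a, lo + count[r], lo + count[r + 1] - 1, d + 1, key);
        }
    }
}
{code}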

> Investigate code generation for improving sort performance
> --
>
> Key: FLINK-4867
> URL: https://issues.apache.org/jira/browse/FLINK-4867
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: performance
>
> This issue is for investigating whether code generation could speed up 
> sorting. We should make some performance measurements on hand-written code 
> that is similar to what we could generate, to see whether investing more time 
> into this is worth it. If we find that it is worth it, we can open a second 
> Jira for the actual implementation of the code generation.
> I think we could generate one class at places where we currently instantiate 
> {{QuickSort}}. This generated class would include the functionality of 
> {{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, 
> {{MemorySegment.compare}}, and {{MemorySegment.swap}}.
> Btw. I'm planning to give this as a student project at a TU Berlin course in 
> the next few months.
> Some concrete ideas about how a generated sorter could be faster than the 
> current sorting code:
> * {{MemorySegment.compare}} could be specialized for
> ** Length: for small records, the loop could be unrolled
> ** Endianness (currently it is optimized for big endian; and in the little 
> endian case (e.g. x86) it does a Long.reverseBytes for each long read)
> * {{MemorySegment.swapBytes}}
> ** In case of small records, using three {{UNSAFE.copyMemory}} is probably 
> not as efficient as a specialized swap, because
> *** We could use total loop unrolling in generated code (because we know the 
> exact record size)
> *** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\]
> *** We will only need 2/3 the memory bandwidth, because the temporary storage 
> could be a register if we swap one byte (or one {{long}}) at a time
> ** several checks might be eliminated
> * Better inlining behaviour could be achieved 
> ** Virtual function calls to the methods of {{InMemorySorter}} could be 
> eliminated. (Note, that these are problematic to devirtualize by the JVM if 
> there are different derived classes used in a single Flink job (see \[8,7\]).)
> ** {{MemorySegment.swapBytes}} is probably not inlined currently, because the 
> excessive checks make it too large
> ** {{MemorySegment.compare}} is probably also not inlined currently, because 
> those two while loops are too large
> \[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, 
> long, Object, long, long)
> \[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/
> \[8\] 
> http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/
> \[9\] 
> http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4867) Investigate code generation for improving sort performance

2016-10-20 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592292#comment-15592292
 ] 

Stefan Richter edited comment on FLINK-4867 at 10/20/16 4:32 PM:
-

If sort performance is crucial to you: I wrote an in-place radix sort 
algorithm that was extremely fast for me in a previous project. On primitive 
types and serialized byte strings, I found it typically a factor of 2-3x faster 
than the JDK's Arrays.sort() for primitives (or multikey quicksort for Strings). 
I was considering porting it to Flink, but did not find the time yet. As it is 
radix-based and not comparison-based, it would require some way to expose 
partial sort keys instead of a compareTo method. If that is interesting to you, 
let me know and I can share the original code.


was (Author: srichter):
If you sort performance is crucial to you, I wrote some inplace radix sort 
algorithm that was extremely fast for me in a precious project. On primitives 
types and serialized byte strings I found it typically factor 2-3x faster than 
JDK's Arrays.sort() for primitives or multikey quicksort for Strings). I was 
considering to port it onto Flink, but did not find the time yet. As it is 
radix based and not comparison based, it would require some way to expose 
partial sort keys instead of a compareTo method . If that is interesting to you 
let me know and I can share the original code.

> Investigate code generation for improving sort performance
> --
>
> Key: FLINK-4867
> URL: https://issues.apache.org/jira/browse/FLINK-4867
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: performance
>
> This issue is for investigating whether code generation could speed up 
> sorting. We should make some performance measurements on hand-written code 
> that is similar to what we could generate, to see whether investing more time 
> into this is worth it. If we find that it is worth it, we can open a second 
> Jira for the actual implementation of the code generation.
> I think we could generate one class at places where we currently instantiate 
> {{QuickSort}}. This generated class would include the functionality of 
> {{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, 
> {{MemorySegment.compare}}, and {{MemorySegment.swap}}.
> Btw. I'm planning to give this as a student project at a TU Berlin course in 
> the next few months.
> Some concrete ideas about how a generated sorter could be faster than the 
> current sorting code:
> * {{MemorySegment.compare}} could be specialized for
> ** Length: for small records, the loop could be unrolled
> ** Endianness (currently it is optimized for big endian; and in the little 
> endian case (e.g. x86) it does a Long.reverseBytes for each long read)
> * {{MemorySegment.swapBytes}}
> ** In case of small records, using three {{UNSAFE.copyMemory}} is probably 
> not as efficient as a specialized swap, because
> *** We could use total loop unrolling in generated code (because we know the 
> exact record size)
> *** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\]
> *** We will only need 2/3 the memory bandwidth, because the temporary storage 
> could be a register if we swap one byte (or one {{long}}) at a time
> ** several checks might be eliminated
> * Better inlining behaviour could be achieved 
> ** Virtual function calls to the methods of {{InMemorySorter}} could be 
> eliminated. (Note, that these are problematic to devirtualize by the JVM if 
> there are different derived classes used in a single Flink job (see \[8,7\]).)
> ** {{MemorySegment.swapBytes}} is probably not inlined currently, because the 
> excessive checks make it too large
> ** {{MemorySegment.compare}} is probably also not inlined currently, because 
> those two while loops are too large
> \[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, 
> long, Object, long, long)
> \[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/
> \[8\] 
> http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/
> \[9\] 
> http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4883) Prevent UDFs implementations through Scala singleton objects

2016-10-24 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15601278#comment-15601278
 ] 

Stefan Richter commented on FLINK-4883:
---

Ok, thanks! I will assign you to this.

> Prevent UDFs implementations through Scala singleton objects
> 
>
> Key: FLINK-4883
> URL: https://issues.apache.org/jira/browse/FLINK-4883
> Project: Flink
>  Issue Type: Bug
>Reporter: Stefan Richter
>
> Currently, users can create and use UDFs in Scala like this:
> {code}
> object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] 
> {
> ...
> }
> {code}
> However, this leads to problems, as the UDF is now a singleton that Flink 
> could use across several operator instances, which causes job failures. We 
> should detect and prevent the usage of singleton UDFs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4843) Introduce Test for FsCheckpointStateOutputStream::getPos

2016-10-17 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-4843:
-

 Summary: Introduce Test for FsCheckpointStateOutputStream::getPos
 Key: FLINK-4843
 URL: https://issues.apache.org/jira/browse/FLINK-4843
 Project: Flink
  Issue Type: Test
Reporter: Stefan Richter
Assignee: Stefan Richter


Introduce a test for FsCheckpointStateOutputStream::getPos, which is currently 
not covered by the tests.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4842) Introduce test to enforce order of operator / udf lifecycles

2016-10-17 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-4842:
-

 Summary: Introduce test to enforce order of operator / udf 
lifecycles 
 Key: FLINK-4842
 URL: https://issues.apache.org/jira/browse/FLINK-4842
 Project: Flink
  Issue Type: Test
Reporter: Stefan Richter
Assignee: Stefan Richter


We should introduce a test that enforces a certain order in which the life cycle 
methods of operators and UDFs are called, so that this order is not easily 
changed by accident.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4883) Prevent UDFs implementations through Scala singleton objects

2016-10-25 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15604566#comment-15604566
 ] 

Stefan Richter commented on FLINK-4883:
---

Sure, here is the example from the mailing list:

{code}
import org.apache.flink.api.scala._
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer

object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] {

  val buffer = ArrayBuffer.empty[Long]

  @transient var state: ValueState[String] = _

  override def open(parameters: Configuration): Unit = {
super.open(parameters)
state = getRuntimeContext.getState(new 
ValueStateDescriptor[String]("state-descriptor", classOf[String], ""))
  }

  override def flatMap2(value: String, out: Collector[(Long, String)]): Unit = {
state.update(value)
  }

  override def flatMap1(value: Long, out: Collector[(Long, String)]): Unit = {
buffer += value

if (state.value() != "") {
  for (elem ← buffer) {
out.collect((elem, state.value()))
  }

  buffer.clear()
}
  }
}

object StreamingPipeline {

  def main(args: Array[String]): Unit = {

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.enableCheckpointing(30 * 1000, CheckpointingMode.EXACTLY_ONCE)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setStateBackend(new MemoryStateBackend())

val pipeline1 = env.generateSequence(0, 1000)

val pipeline2 = env.fromElements("even", "odd")

pipeline1.connect(pipeline2)
  .keyBy(
elem ⇒ elem % 2 == 0,
elem ⇒ elem == "even"
  ).flatMap(FlatMapper)
  .print()

env.execute("Example")
  }
}
{code}

> Prevent UDFs implementations through Scala singleton objects
> 
>
> Key: FLINK-4883
> URL: https://issues.apache.org/jira/browse/FLINK-4883
> Project: Flink
>  Issue Type: Bug
>Reporter: Stefan Richter
>Assignee: Renkai Ge
>
> Currently, users can create and use UDFs in Scala like this:
> {code}
> object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] 
> {
> ...
> }
> {code}
> However, this leads to problems, as the UDF is now a singleton that Flink 
> could use across several operator instances, which causes job failures. We 
> should detect and prevent the usage of singleton UDFs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-4910) Introduce safety net for closing file system streams

2016-10-25 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter reassigned FLINK-4910:
-

Assignee: Stefan Richter

> Introduce safety net for closing file system streams
> 
>
> Key: FLINK-4910
> URL: https://issues.apache.org/jira/browse/FLINK-4910
> Project: Flink
>  Issue Type: Improvement
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Streams that are opened through {{FileSystem}} must be closed at the end of 
> their life cycle. However, we found hints that some code forgets to close 
> such streams.
> We should introduce i) a mechanism that closes leaked, unclosed streams after 
> usage and ii) logging that helps us to track down and fix the sources 
> of such leaks.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4910) Introduce safety net for closing file system streams

2016-10-25 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-4910:
-

 Summary: Introduce safety net for closing file system streams
 Key: FLINK-4910
 URL: https://issues.apache.org/jira/browse/FLINK-4910
 Project: Flink
  Issue Type: Improvement
Reporter: Stefan Richter


Streams that are opened through {{FileSystem}} must be closed at the end of 
their life cycle. However, we found hints that some code forgets to close such 
streams.

We should introduce i) a mechanism that closes leaked, unclosed streams after 
usage and ii) logging that helps us to track down and fix the sources 
of such leaks.
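
A rough sketch of such a safety net (names and mechanics are illustrative, not 
a decided design):

{code}
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class StreamClosingSafetyNet {

    private final Map<Closeable, String> registered = new ConcurrentHashMap<>();

    /** Records a newly opened stream together with its opening call site. */
    <C extends Closeable> C register(C stream) {
        registered.put(stream, callSite());
        return stream;
    }

    /** Called on proper close; anything left over at shutdown is a leak. */
    void unregister(Closeable stream) {
        registered.remove(stream);
    }

    /** Closes all leaked streams and logs where they were opened. */
    void closeLeakedStreams() {
        for (Map.Entry<Closeable, String> entry : registered.entrySet()) {
            System.err.println("Unclosed stream leaked, opened at: " + entry.getValue());
            try {
                entry.getKey().close();
            } catch (IOException ignored) {
                // best effort during cleanup
            }
        }
        registered.clear();
    }

    private static String callSite() {
        StackTraceElement[] trace = Thread.currentThread().getStackTrace();
        return trace.length > 3 ? trace[3].toString() : "unknown";
    }
}
{code}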



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4883) Prevent UDFs implementations through Scala singleton objects

2016-10-24 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-4883:
--
Assignee: Renkai Ge

> Prevent UDFs implementations through Scala singleton objects
> 
>
> Key: FLINK-4883
> URL: https://issues.apache.org/jira/browse/FLINK-4883
> Project: Flink
>  Issue Type: Bug
>Reporter: Stefan Richter
>Assignee: Renkai Ge
>
> Currently, users can create and use UDFs in Scala like this:
> {code}
> object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] 
> {
> ...
> }
> {code}
> However, this leads to problems, as the UDF is now a singleton that Flink 
> could use across several operator instances, which causes job failures. We 
> should detect and prevent the usage of singleton UDFs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4867) Investigate code generation for improving sort performance

2016-10-20 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592866#comment-15592866
 ] 

Stefan Richter commented on FLINK-4867:
---

I have already pushed my code into https://github.com/StefanRRichter/RadixSort 
so that you can take a look. I will do some cleanup, more documentation, and 
tests if I find some more time to polish this. If you have questions about the 
code, just drop me an email.

> Investigate code generation for improving sort performance
> --
>
> Key: FLINK-4867
> URL: https://issues.apache.org/jira/browse/FLINK-4867
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Minor
>  Labels: performance
>
> This issue is for investigating whether code generation could speed up 
> sorting. We should make some performance measurements on hand-written code 
> that is similar to what we could generate, to see whether investing more time 
> into this is worth it. If we find that it is worth it, we can open a second 
> Jira for the actual implementation of the code generation.
> I think we could generate one class at places where we currently instantiate 
> {{QuickSort}}. This generated class would include the functionality of 
> {{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, 
> {{MemorySegment.compare}}, and {{MemorySegment.swap}}.
> Btw. I'm planning to give this as a student project at a TU Berlin course in 
> the next few months.
> Some concrete ideas about how a generated sorter could be faster than the 
> current sorting code:
> * {{MemorySegment.compare}} could be specialized for
> ** Length: for small records, the loop could be unrolled
> ** Endianness (currently it is optimized for big endian; and in the little 
> endian case (e.g. x86) it does a Long.reverseBytes for each long read)
> * {{MemorySegment.swapBytes}}
> ** In case of small records, using three {{UNSAFE.copyMemory}} is probably 
> not as efficient as a specialized swap, because
> *** We could use total loop unrolling in generated code (because we know the 
> exact record size)
> *** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\]
> *** We will only need 2/3 the memory bandwidth, because the temporary storage 
> could be a register if we swap one byte (or one {{long}}) at a time
> ** several checks might be eliminated
> * Better inlining behaviour could be achieved 
> ** Virtual function calls to the methods of {{InMemorySorter}} could be 
> eliminated. (Note, that these are problematic to devirtualize by the JVM if 
> there are different derived classes used in a single Flink job (see \[8,7\]).)
> ** {{MemorySegment.swapBytes}} is probably not inlined currently, because the 
> excessive checks make it too large
> ** {{MemorySegment.compare}} is probably also not inlined currently, because 
> those two while loops are too large
> \[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, 
> long, Object, long, long)
> \[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/
> \[8\] 
> http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/
> \[9\] 
> http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4842) Introduce test to enforce order of operator / udf lifecycles

2016-10-20 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-4842.
-
   Resolution: Implemented
Fix Version/s: 1.2.0

> Introduce test to enforce order of operator / udf lifecycles 
> -
>
> Key: FLINK-4842
> URL: https://issues.apache.org/jira/browse/FLINK-4842
> Project: Flink
>  Issue Type: Test
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.2.0
>
>
> We should introduce a test that enforces a certain order in which the life 
> cycle methods of operators and UDFs are called, so that this order is not 
> easily changed by accident.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4844) Partitionable Raw Keyed/Operator State

2016-10-20 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-4844.
-
   Resolution: Implemented
Fix Version/s: 1.2.0

> Partitionable Raw Keyed/Operator State
> --
>
> Key: FLINK-4844
> URL: https://issues.apache.org/jira/browse/FLINK-4844
> Project: Flink
>  Issue Type: New Feature
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.2.0
>
>
> Partitionable operator and keyed state are currently only available by using 
> backends. However, the serialization code for many operators is built around 
> reading/writing their state to a stream for checkpointing. We want to provide 
> partitionable state also through streams, so that migrating existing 
> operators becomes easier.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4883) Prevent UDFs implementations through Scala singleton objects

2016-10-21 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-4883:
-

 Summary: Prevent UDFs implementations through Scala singleton 
objects
 Key: FLINK-4883
 URL: https://issues.apache.org/jira/browse/FLINK-4883
 Project: Flink
  Issue Type: Bug
Reporter: Stefan Richter


Currently, users can create and use UDFs in Scala like this:

{code}
object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] {
...
}
{code}

However, this leads to problems, as the UDF is now a singleton that Flink could 
use across several operator instances, which causes job failures. We should 
detect and prevent the usage of singleton UDFs.
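
One possible detection, sketched here as an assumption rather than a decided 
implementation: Scala compiles object Foo to a class with a static MODULE$ 
field holding the single instance, which can be probed reflectively when the 
UDF is registered:

{code}
final class ScalaObjectDetector {

    /**
     * Heuristic sketch: Scala singleton objects compile to a class named
     * "Foo$" that carries a static MODULE$ field with the single instance.
     */
    static boolean isScalaSingletonObject(Class<?> clazz) {
        try {
            java.lang.reflect.Field module = clazz.getDeclaredField("MODULE$");
            return java.lang.reflect.Modifier.isStatic(module.getModifiers())
                    && clazz.getName().endsWith("$");
        } catch (NoSuchFieldException e) {
            return false;
        }
    }
}
{code}

Such a check could run where UDFs are validated, failing fast with a meaningful 
error instead of a hard-to-debug job failure.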



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5053) Incremental / lightweight snapshots for checkpoints

2016-11-14 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15663198#comment-15663198
 ] 

Stefan Richter commented on FLINK-5053:
---

That might very well be; I am still planning to take a closer look at RocksDB's 
backup/checkpoint features anyway before I start working on this. So far, 
this description is more of a rough outline of my planning, meant for 
discussion. But thanks for the hint!

> Incremental / lightweight snapshots for checkpoints
> ---
>
> Key: FLINK-5053
> URL: https://issues.apache.org/jira/browse/FLINK-5053
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>
> There is currently basically no difference between savepoints and checkpoints 
> in Flink and both are created through exactly the same process.
> However, savepoints and checkpoints have a slightly different meaning which 
> we should take into account to keep Flink efficient:
> - Savepoints are (typically infrequently) triggered by the user to create a 
> state from which the application can be restarted, e.g. because Flink, some 
> code, or the parallelism needs to be changed.
> - Checkpoints are (typically frequently) triggered by the system to allow for 
> fast recovery in case of failure, while keeping the job/system unchanged.
> This means that savepoints and checkpoints can have different properties in 
> that:
> - A savepoint should represent a state of the application, where 
> characteristics of the job (e.g. parallelism) can be adjusted for the next 
> restart. One example of things that savepoints need to be aware of are 
> key-groups. Savepoints can potentially be a little more expensive than 
> checkpoints, because they are usually created a lot less frequently by 
> the user.
> - Checkpoints are frequently triggered by the system to allow for fast 
> failure recovery. However, failure recovery leaves all characteristics of the 
> job unchanged, so checkpoints do not have to be aware of those; e.g. think 
> again of key groups. Checkpoints should run faster than creating savepoints; 
> in particular, it would be nice to have incremental checkpoints.
> For a first approach, I would suggest the following steps/changes:
> - In checkpoint coordination: differentiate between triggering checkpoints 
> and savepoints. Introduce properties for checkpoints that describe their set 
> of abilities, e.g. "is-key-group-aware", "is-incremental".
> - In state handle infrastructure: introduce state handles that reflect 
> incremental checkpoints and drop full key-group awareness, i.e. covering 
> folders instead of files and not having keygroup_id -> file/offset mapping, 
> but keygroup_range -> folder?
> - Backend side: we should start with RocksDB by reintroducing something 
> similar to semi-async snapshots, but using 
> BackupableDBOptions::setShareTableFiles(true) and transferring only new 
> incremental outputs to HDFS. Notice that using RocksDB's internal backup 
> mechanism means giving up the information about individual key-groups. But as 
> explained above, this should be totally acceptable for checkpoints, while 
> savepoints should use the key-group-aware, fully async mode. Of course, we also 
> need to implement the ability to restore from both types of snapshots.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5146) Improved resource cleanup in RocksDB keyed state backend

2016-11-23 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-5146:
--
Priority: Blocker  (was: Major)

> Improved resource cleanup in RocksDB keyed state backend
> 
>
> Key: FLINK-5146
> URL: https://issues.apache.org/jira/browse/FLINK-5146
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Blocker
>
> Currently, resources such as taken snapshots or iterators are not always 
> cleaned up in the RocksDB state backend. In particular, never starting the 
> runnable future leaves taken snapshots unreleased.
> We should improve the release of all resources allocated through the RocksDB 
> JNI bridge.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5146) Improved resource cleanup in RocksDB keyed state backend

2016-11-23 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-5146:
-

 Summary: Improved resource cleanup in RocksDB keyed state backend
 Key: FLINK-5146
 URL: https://issues.apache.org/jira/browse/FLINK-5146
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Reporter: Stefan Richter


Currently, resources such as taken snapshots or iterators are not always 
cleaned up in the RocksDB state backend. In particular, never starting the 
runnable future leaves taken snapshots unreleased.

We should improve the release of all resources allocated through the RocksDB 
JNI bridge.
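
A sketch of the intended discipline (illustrative; the exact RocksDB Java API 
usage is an assumption to be checked): tie every native resource to a 
try/finally so it is released even if the snapshot future never runs:

{code}
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
import org.rocksdb.Snapshot;

class SnapshotResourceSketch {

    void snapshotSafely(RocksDB db) {
        Snapshot snapshot = db.getSnapshot();
        try (ReadOptions readOptions = new ReadOptions().setSnapshot(snapshot);
             RocksIterator iterator = db.newIterator(readOptions)) {
            for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
                // here the key/value bytes would be written to the checkpoint stream
            }
        } finally {
            // snapshots are not closed via close(); they must be released through the DB
            db.releaseSnapshot(snapshot);
        }
    }
}
{code}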



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5024) Add SimpleStateDescriptor to clarify the concepts

2016-11-24 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15692874#comment-15692874
 ] 

Stefan Richter commented on FLINK-5024:
---

Independently of this, I also talked with Aljoscha about state descriptors. 
What I don't like about the current implementation is that it contains the state 
name and the value type serializer, but, for example, not the namespace type 
serializer. This is a little inconsistent, so +1 for some changes here.

> Add SimpleStateDescriptor to clarify the concepts
> -
>
> Key: FLINK-5024
> URL: https://issues.apache.org/jira/browse/FLINK-5024
> Project: Flink
>  Issue Type: Improvement
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Currently, StateDescriptors accept two type arguments: the first one is the 
> type of the created state and the second one is the type of the values in the 
> states. 
> The concept, however, is a little confusing here, because in ListStates the 
> arguments passed to the StateDescriptors are the types of the list elements 
> instead of the lists. It also makes the implementation of MapStates difficult.
> I suggest not putting the type serializer in StateDescriptors, making 
> StateDescriptors independent of the data structures of the values. 
> A new type of StateDescriptor named SimpleStateDescriptor can be provided to 
> abstract those states (namely ValueState, ReducingState and FoldingState) 
> whose values are not composite. 
> The states (e.g. ListStates and MapStates) can implement their own 
> descriptors according to their data structures. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4100) RocksDBStateBackend#close() can throw NPE

2016-11-28 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15701566#comment-15701566
 ] 

Stefan Richter commented on FLINK-4100:
---

Yes, this looks very outdated.

> RocksDBStateBackend#close() can throw NPE
> -
>
> Key: FLINK-4100
> URL: https://issues.apache.org/jira/browse/FLINK-4100
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing, Tests
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Priority: Trivial
>
> When running the RocksDBStateBackendTest on Windows, I ran into an NPE. The 
> tests are aborted in the @Before checkOperatingSystem method (which is 
> correct behaviour), but the test still calls dispose() in @After teardown().
> This leads to an NPE, since the lock object used is null; it was not 
> initialized, since initializeForJob() was never called and there is no null 
> check.
> {code}
> testCopyDefaultValue(org.apache.flink.contrib.streaming.state.RocksDBStateBackendTest)
>   Time elapsed: 0 sec  <<< ERROR!
> java.lang.NullPointerException: null
> at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.dispose(RocksDBStateBackend.java:318)
> at 
> org.apache.flink.runtime.state.StateBackendTestBase.teardown(StateBackendTestBase.java:71)
> at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
> at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
> at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
> at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
> at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
> at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
> at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
> at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
> {code}
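
A minimal sketch of the missing guard; the field and method names are 
illustrative, not the actual RocksDBStateBackend code:

{code}
// Hedged sketch of the missing null check; field and method names are
// illustrative, not the actual RocksDBStateBackend code.
public class DisposeGuardSketch {

    private Object dbCleanupLock; // only assigned in initializeForJob()

    public void dispose() {
        if (dbCleanupLock == null) {
            // initializeForJob() was never called, so there is nothing to release
            return;
        }
        synchronized (dbCleanupLock) {
            // ... release native RocksDB resources ...
        }
    }
}
{code}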



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5053) Incremental / lightweight snapshots for checkpoints

2016-11-18 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-5053:
--
Description: 
There is currently basically no difference between savepoints and checkpoints 
in Flink; both are created through exactly the same process.

However, savepoints and checkpoints have slightly different meanings, which we 
should take into account to keep Flink efficient:

- Savepoints are (typically infrequently) triggered by the user to create a 
state from which the application can be restarted, e.g. because Flink, some 
code, or the parallelism needs to be changed.

- Checkpoints are (typically frequently) triggered by the system to allow for 
fast recovery in case of failure, while keeping the job/system unchanged.

This means that savepoints and checkpoints can have different properties:

- Savepoints should represent a state of the application from which 
characteristics of the job (e.g. parallelism) can be adjusted for the next 
restart. One example of something savepoints need to be aware of is 
key-groups. Savepoints can potentially be a little more expensive than 
checkpoints, because they are usually created far less frequently by the user.

- Checkpoints are frequently triggered by the system to allow for fast failure 
recovery. However, failure recovery leaves all characteristics of the job 
unchanged, so checkpoints do not have to be aware of them, e.g. think again 
of key-groups. Checkpoints should run faster than savepoint creation; in 
particular, it would be nice to have incremental checkpoints.

For a first approach, I would suggest the following steps/changes:

- In checkpoint coordination: differentiate between triggering checkpoints 
and savepoints. Introduce properties for checkpoints that describe their set of 
abilities, e.g. "is-key-group-aware", "is-incremental" (see the sketch at the 
end of this description).

- In state handle infrastructure: introduce state handles that reflect 
incremental checkpoints and drop full key-group awareness, i.e. covering 
folders instead of files, and replacing the keygroup_id -> file/offset mapping 
with a keygroup_range -> folder mapping?

- Backend side: We should start with RocksDB by reintroducing something similar 
to semi-async snapshots, but using 
BackupableDBOptions::setShareTableFiles(true) and transferring only new 
incremental outputs to HDFS (sketched below). Notice that using RocksDB's 
internal backup mechanism means giving up information about individual 
key-groups. But as explained above, this should be totally acceptable for 
checkpoints, while savepoints should use the key-group-aware, fully async mode. 
Of course, we also need to implement the ability to restore from both types of 
snapshots.
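
A hedged sketch of this backup path, assuming the plain RocksDB JNI backup 
API; the local path and surrounding logic are illustrative:

{code}
// Hedged sketch of the incremental backup idea, assuming the plain RocksDB
// JNI backup API; the local path and surrounding logic are illustrative.
import org.rocksdb.BackupEngine;
import org.rocksdb.BackupableDBOptions;
import org.rocksdb.Env;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class IncrementalBackupSketch {

    public static void backup(RocksDB db, String localBackupPath) throws RocksDBException {
        try (BackupableDBOptions backupOptions =
                     new BackupableDBOptions(localBackupPath)
                             .setShareTableFiles(true); // reuse unchanged SST files
             BackupEngine backupEngine = BackupEngine.open(Env.getDefault(), backupOptions)) {

            // Only files created since the previous backup are written; the
            // incremental output could then be transferred to HDFS.
            backupEngine.createNewBackup(db, true /* flush before backup */);
        }
    }
}
{code}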

One remaining problem in the suggested approach is that even checkpoints 
should support scale-down, in case only a smaller number of instances is 
available for recovery.
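
For illustration, the per-checkpoint abilities mentioned above could be 
modeled as a small flags object; this is a hypothetical sketch, not an actual 
Flink class:

{code}
// Hypothetical sketch of checkpoint properties as a simple flags object;
// this is not an actual Flink class.
public final class SnapshotProperties {

    private final boolean keyGroupAware; // "is-key-group-aware"
    private final boolean incremental;   // "is-incremental"

    private SnapshotProperties(boolean keyGroupAware, boolean incremental) {
        this.keyGroupAware = keyGroupAware;
        this.incremental = incremental;
    }

    // Savepoints stay key-group aware so the job can be rescaled on restart.
    public static SnapshotProperties forSavepoint() {
        return new SnapshotProperties(true, false);
    }

    // Checkpoints may drop key-group awareness in favor of incremental speed.
    public static SnapshotProperties forCheckpoint() {
        return new SnapshotProperties(false, true);
    }

    public boolean isKeyGroupAware() {
        return keyGroupAware;
    }

    public boolean isIncremental() {
        return incremental;
    }
}
{code}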




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5099) Test testCancelPartitionRequest is unstable

2016-11-18 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-5099:
-

 Summary: Test testCancelPartitionRequest is unstable
 Key: FLINK-5099
 URL: https://issues.apache.org/jira/browse/FLINK-5099
 Project: Flink
  Issue Type: Bug
  Components: Network
Reporter: Stefan Richter


I observed this test fail on Travis (very rarely):

testCancelPartitionRequest(org.apache.flink.runtime.io.network.netty.CancelPartitionRequestTest)
  Time elapsed: 168.756 sec  <<< ERROR!
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2367)
at 
java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
at 
java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
at 
java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415)
at java.lang.StringBuilder.append(StringBuilder.java:132)
at 
org.apache.flink.runtime.io.network.netty.CancelPartitionRequestTest.testCancelPartitionRequest(CancelPartitionRequestTest.java:94)

Results :

Tests in error: 
  CancelPartitionRequestTest.testCancelPartitionRequest:94 » OutOfMemory Java 
he...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5100) Test testZooKeeperReelection is unstable

2016-11-18 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-5100:
-

 Summary: Test testZooKeeperReelection is unstable
 Key: FLINK-5100
 URL: https://issues.apache.org/jira/browse/FLINK-5100
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination
Reporter: Stefan Richter


I observed this test failing (very rarely) on Travis:
 
testZooKeeperReelection(org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest)
  Time elapsed: 303.321 sec  <<< FAILURE!
java.lang.AssertionError: null
at org.junit.Assert.fail(Assert.java:86)
at org.junit.Assert.assertTrue(Assert.java:41)
at org.junit.Assert.assertFalse(Assert.java:64)
at org.junit.Assert.assertFalse(Assert.java:74)
at 
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest.testZooKeeperReelection(ZooKeeperLeaderElectionTest.java:197)


Results :

Failed tests: 
  ZooKeeperLeaderElectionTest.testZooKeeperReelection:197 null



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

