[jira] [Comment Edited] (FLINK-4037) Introduce ArchivedExecutionGraph without any user classes
[ https://issues.apache.org/jira/browse/FLINK-4037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331982#comment-15331982 ] Stefan Richter edited comment on FLINK-4037 at 6/15/16 4:14 PM: I think introducing an {{ArchivedExecutionGraph}} to maintain relevant information from the {{ExecutionGraph}} of finished jobs is a little more involved. For example, the web interface has to deal with objects from {{ArchivedExecutionGraph}} as well as {{ExecutionGraph}} to display finished and in-flight jobs. To achieve this, we could, for example:
* Make {{ExecutionGraph}} a subclass of {{ArchivedExecutionGraph}}. However, this is probably not the best idea, as it violates the substitution principle.
* Introduce a common interface for both {{ExecutionGraph}} and {{ArchivedExecutionGraph}}. This interface provides the means for the web interface to extract information for display. The method {{prepareForArchiving()}} could convert ExecutionGraphs into ArchivedExecutionGraphs. We should ensure that asynchronous requests by the web interface are routed to a valid ExecutionGraph until the conversion is complete and the object that acts as the model for the web interface is substituted. Maybe there is a cleaner way of introducing a real life cycle for ExecutionGraphs?
Furthermore, we need to identify all references in ExecutionGraph that could hold objects from user-provided classes (e.g. accumulators, metrics, ...), stringify their information (see {{JobConfigHandler.handleRequest()}}), and release the references so that the user classloader can be garbage-collected. Corresponding parts of the mentioned stringification have to be pushed into the ExecutionGraph implementations. The web interface needs to be changed to extract the information through the interface.
> Introduce ArchivedExecutionGraph without any user classes > - > > Key: FLINK-4037 > URL: https://issues.apache.org/jira/browse/FLINK-4037 > Project: Flink > Issue Type: Improvement > Components: Webfrontend > Reporter: Robert Metzger > Assignee: Stefan Richter > > As a follow-up to FLINK-4011: In order to allow the JobManager to unload all > classes from a finished job, we need to convert the ExecutionGraph (and some > attached objects like the ExecutionConfig) into a stringified version that > does not contain any user classes. > The web frontend can only show strings anyway. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
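A minimal sketch of the common-interface option described in the comment above. All names here ({{JobGraphView}}, {{getAccumulatorsAsStrings()}}, {{ArchivedGraphSketch}}) are hypothetical illustrations, not Flink API:
{code}
import java.util.Collections;
import java.util.Map;

// Hypothetical read-only view shared by the live ExecutionGraph and the
// ArchivedExecutionGraph; the web interface would only depend on this type.
interface JobGraphView {
    String getJobName();
    // Accumulators are pre-stringified so that no user classes leak out.
    Map<String, String> getAccumulatorsAsStrings();
}

// Archived variant: holds only JDK types (Strings, primitives), so the
// user classloader can be garbage-collected once the live graph is dropped.
final class ArchivedGraphSketch implements JobGraphView {
    private final String jobName;
    private final Map<String, String> stringifiedAccumulators;

    ArchivedGraphSketch(String jobName, Map<String, String> accumulators) {
        this.jobName = jobName;
        this.stringifiedAccumulators = Collections.unmodifiableMap(accumulators);
    }

    @Override
    public String getJobName() {
        return jobName;
    }

    @Override
    public Map<String, String> getAccumulatorsAsStrings() {
        return stringifiedAccumulators;
    }
}
{code}
Under this scheme, the live ExecutionGraph would implement the same interface, and {{prepareForArchiving()}} would copy its state into the archived variant before the user classloader is released.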
[jira] [Updated] (FLINK-4037) Introduce ArchivedExecutionGraph without any user classes
[ https://issues.apache.org/jira/browse/FLINK-4037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter updated FLINK-4037: -- Assignee: (was: Stefan Richter) > Introduce ArchivedExecutionGraph without any user classes > - > > Key: FLINK-4037 > URL: https://issues.apache.org/jira/browse/FLINK-4037 > Project: Flink > Issue Type: Improvement > Components: Webfrontend > Reporter: Robert Metzger > > As a follow-up to FLINK-4011: In order to allow the JobManager to unload all > classes from a finished job, we need to convert the ExecutionGraph (and some > attached objects like the ExecutionConfig) into a stringified version that > does not contain any user classes. > The web frontend can only show strings anyway. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4083) Use ClosureCleaner for Join where and equalTo
Stefan Richter created FLINK-4083: - Summary: Use ClosureCleaner for Join where and equalTo Key: FLINK-4083 URL: https://issues.apache.org/jira/browse/FLINK-4083 Project: Flink Issue Type: Bug Affects Versions: 1.0.3 Reporter: Stefan Richter Assignee: Stefan Richter Priority: Minor When specifying a key selector in the where or equalTo clause of a Join, the closure cleaner is not used. Same problem as FLINK-4078.
{code}
.join(ds)
.where(new KeySelector<CustomType, Integer>() {
    @Override
    public Integer getKey(CustomType value) {
        return value.myInt;
    }
})
.equalTo(new KeySelector<CustomType, Integer>() {
    @Override
    public Integer getKey(CustomType value) throws Exception {
        return value.myInt;
    }
});
{code}
The problem is that the KeySelector is an anonymous inner class and as such has a reference to the outer object. Normally, this would be rectified by the closure cleaner, but the cleaner is not used in CoGroup.where(). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Issue Comment Deleted] (FLINK-4083) Use ClosureCleaner for Join where and equalTo
[ https://issues.apache.org/jira/browse/FLINK-4083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter updated FLINK-4083: -- Comment: was deleted (was: Same problem.) > Use ClosureCleaner for Join where and equalTo > - > > Key: FLINK-4083 > URL: https://issues.apache.org/jira/browse/FLINK-4083 > Project: Flink > Issue Type: Bug > Affects Versions: 1.0.3 > Reporter: Stefan Richter > Assignee: Stefan Richter > Priority: Minor > > When specifying a key selector in the where or equalTo clause of a Join, the > closure cleaner is not used. Same problem as FLINK-4078. > {code}
> .join(ds)
> .where(new KeySelector<CustomType, Integer>() {
>     @Override
>     public Integer getKey(CustomType value) {
>         return value.myInt;
>     }
> })
> .equalTo(new KeySelector<CustomType, Integer>() {
>     @Override
>     public Integer getKey(CustomType value) throws Exception {
>         return value.myInt;
>     }
> });
> {code}
> The problem is that the KeySelector is an anonymous inner class and as such > has a reference to the outer object. Normally, this would be rectified by the > closure cleaner, but the cleaner is not used in CoGroup.where(). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4083) Use ClosureCleaner for Join where and equalTo
[ https://issues.apache.org/jira/browse/FLINK-4083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter updated FLINK-4083: -- Description: When specifying a key selector in the where or equalTo clause of a Join, the closure cleaner is not used. Same problem as FLINK-4078.
{code}
.join(ds)
.where(new KeySelector<CustomType, Integer>() {
    @Override
    public Integer getKey(CustomType value) {
        return value.myInt;
    }
})
.equalTo(new KeySelector<CustomType, Integer>() {
    @Override
    public Integer getKey(CustomType value) throws Exception {
        return value.myInt;
    }
});
{code}
The problem is that the KeySelector is an anonymous inner class and as such has a reference to the outer object. Normally, this would be rectified by the closure cleaner, but the cleaner is not used in Join.where() and Join.equalTo().
was: When specifying a key selector in the where or equalTo clause of a Join, the closure cleaner is not used. Same problem as FLINK-4078.
{code}
.join(ds)
.where(new KeySelector<CustomType, Integer>() {
    @Override
    public Integer getKey(CustomType value) {
        return value.myInt;
    }
})
.equalTo(new KeySelector<CustomType, Integer>() {
    @Override
    public Integer getKey(CustomType value) throws Exception {
        return value.myInt;
    }
});
{code}
The problem is that the KeySelector is an anonymous inner class and as such has a reference to the outer object. Normally, this would be rectified by the closure cleaner, but the cleaner is not used in CoGroup.where().
> Use ClosureCleaner for Join where and equalTo > - > > Key: FLINK-4083 > URL: https://issues.apache.org/jira/browse/FLINK-4083 > Project: Flink > Issue Type: Bug > Affects Versions: 1.0.3 > Reporter: Stefan Richter > Assignee: Stefan Richter > Priority: Minor > > When specifying a key selector in the where or equalTo clause of a Join, the > closure cleaner is not used. Same problem as FLINK-4078. > {code}
> .join(ds)
> .where(new KeySelector<CustomType, Integer>() {
>     @Override
>     public Integer getKey(CustomType value) {
>         return value.myInt;
>     }
> })
> .equalTo(new KeySelector<CustomType, Integer>() {
>     @Override
>     public Integer getKey(CustomType value) throws Exception {
>         return value.myInt;
>     }
> });
> {code}
> The problem is that the KeySelector is an anonymous inner class and as such > has a reference to the outer object. Normally, this would be rectified by the > closure cleaner, but the cleaner is not used in Join.where() and > Join.equalTo(). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
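A minimal sketch of the kind of fix this implies: pass the selector through Flink's {{ClosureCleaner}} before wrapping it, as other operators already do. The surrounding method is a simplified stand-in, not the real {{where()}}/{{equalTo()}} signature:
{code}
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;

class KeySelectorCleaningSketch {
    // Simplified stand-in for Join.where()/Join.equalTo(): clean the selector
    // before it is used, so the hidden this$0 reference of an anonymous inner
    // class is nulled out (if unused) and serializability is checked.
    static <I, K> KeySelector<I, K> cleanSelector(KeySelector<I, K> keySelector) {
        ClosureCleaner.clean(keySelector, true);
        return keySelector;
    }
}
{code}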
[jira] [Created] (FLINK-4374) GroupReduce Broken for null Date
Stefan Richter created FLINK-4374: - Summary: GroupReduce Broken for null Date Key: FLINK-4374 URL: https://issues.apache.org/jira/browse/FLINK-4374 Project: Flink Issue Type: Bug Components: DataSet API Reporter: Stefan Richter The GroupReduceITCase has an error that allows a problem with {{null}} Dates to go uncovered: If I set the parallelism to 1 in {{testDateNullException()}} and all keys actually end up on the same operator, then there is a problem in the de/serialization. It seems that {{null}} values are somehow skipped by the serialization process (e.g. maybe no {{null}} indicator is written), which leads to wrong deserializations. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
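A minimal sketch of the null-indicator scheme the description alludes to, using a sentinel value on the wire. This is an illustration under that assumption, not necessarily how Flink's actual {{DateSerializer}} resolves the issue:
{code}
import java.io.IOException;
import java.util.Date;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

class NullSafeDateSketch {
    // Encode null as a sentinel value so the read side stays aligned with
    // the write side even when null Dates occur in a group.
    static void serialize(Date record, DataOutputView target) throws IOException {
        target.writeLong(record == null ? Long.MIN_VALUE : record.getTime());
    }

    static Date deserialize(DataInputView source) throws IOException {
        long millis = source.readLong();
        return millis == Long.MIN_VALUE ? null : new Date(millis);
    }
}
{code}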
[jira] [Commented] (FLINK-4374) GroupReduce Broken for null Date
[ https://issues.apache.org/jira/browse/FLINK-4374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417162#comment-15417162 ] Stefan Richter commented on FLINK-4374: --- Yes, it is the date field of the Tuple2 in this test. > GroupReduce Broken for null Date > > > Key: FLINK-4374 > URL: https://issues.apache.org/jira/browse/FLINK-4374 > Project: Flink > Issue Type: Bug > Components: DataSet API > Reporter: Stefan Richter > Assignee: Timo Walther > > The GroupReduceITCase has an error that allows a problem with {{null}} Dates > to go uncovered: > If I set the parallelism to 1 in {{testDateNullException()}} and all keys > actually end up on the same operator, then there is a problem in the > de/serialization. > It seems that {{null}} values are somehow skipped by the serialization > process (e.g. maybe no {{null}} indicator is written), which leads to wrong > deserializations. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4374) GroupReduce Broken for null Date
[ https://issues.apache.org/jira/browse/FLINK-4374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter reassigned FLINK-4374: - Assignee: Timo Walther > GroupReduce Broken for null Date > > > Key: FLINK-4374 > URL: https://issues.apache.org/jira/browse/FLINK-4374 > Project: Flink > Issue Type: Bug > Components: DataSet API > Reporter: Stefan Richter > Assignee: Timo Walther > > The GroupReduceITCase has an error that allows a problem with {{null}} Dates > to go uncovered: > If I set the parallelism to 1 in {{testDateNullException()}} and all keys > actually end up on the same operator, then there is a problem in the > de/serialization. > It seems that {{null}} values are somehow skipped by the serialization > process (e.g. maybe no {{null}} indicator is written), which leads to wrong > deserializations. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4374) GroupReduce Broken for null Date
[ https://issues.apache.org/jira/browse/FLINK-4374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter updated FLINK-4374: -- Description: The GroupReduceITCase has an error that allows a problem with {{null}} Dates to go uncovered: If I set the parallelism to 1 in {{testDateNullException()}} and all keys actually end up on the same operator, then there is a problem in the de/serialization. It seems that {{null}} values are somehow skipped by the serialization process (e.g. maybe no {{null}} indicator is written), which leads to wrong deserializations. was: The GroupReduceITCase has an error that allows a problem with {{nul}}l Dates to go uncovered: If I set the parallelism to 1 in {{testDateNullException()}} and all keys actually end up on the same operator, then there is a problem in the de/serialization. It seems that {{null}} values are somehow skipped by the serialization process (e.g. maybe no {{null}} indicator is written), which leads to wrong deserializations. > GroupReduce Broken for null Date > > > Key: FLINK-4374 > URL: https://issues.apache.org/jira/browse/FLINK-4374 > Project: Flink > Issue Type: Bug > Components: DataSet API >Reporter: Stefan Richter >Assignee: Timo Walther > > The GroupReduceITCase has an error that allows a problem with {{null}} Dates > to go uncovered: > If I set the parallelism to 1 in {{testDateNullException()}} and all keys > actually end up on the same operator, then there is a problem in the > de/serialization. > It seems that {{null}} values are somehow skipped by the serialization > process (e.g. maybe no {{null}} indicator is written), which leads to wrong > deserializations. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4200) Kafka consumers should log the offset from which they restore
Stefan Richter created FLINK-4200: - Summary: Kafka consumers should log the offset from which they restore Key: FLINK-4200 URL: https://issues.apache.org/jira/browse/FLINK-4200 Project: Flink Issue Type: Improvement Components: Kafka Connector Reporter: Stefan Richter Assignee: Stefan Richter Priority: Trivial Kafka consumers should log the offset from which they restore so that it is easier to investigate problems with recovery. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
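A minimal sketch of the requested logging in the consumer's restore path. The class, field, and the map's key type are simplified stand-ins (Flink internally keys this state by its own {{KafkaTopicPartition}} class):
{code}
import java.util.HashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class OffsetRestoreLoggingSketch {
    private static final Logger LOG = LoggerFactory.getLogger(OffsetRestoreLoggingSketch.class);

    // Stand-in for the consumer's partition-to-offset restore state.
    private HashMap<String, Long> restoredOffsets;

    void restoreState(HashMap<String, Long> state) {
        // Log the offsets before applying them, so recovery can be verified
        // from the TaskManager logs when investigating problems.
        LOG.info("Restoring Kafka consumer offsets from checkpoint: {}", state);
        this.restoredOffsets = state;
    }
}
{code}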
[jira] [Created] (FLINK-4230) Session Windowing IT Case
Stefan Richter created FLINK-4230: - Summary: Session Windowing IT Case Key: FLINK-4230 URL: https://issues.apache.org/jira/browse/FLINK-4230 Project: Flink Issue Type: Test Components: DataStream API, Local Runtime Reporter: Stefan Richter Assignee: Stefan Richter An ITCase for Session Windows is missing that tests correct behavior across several parallel sessions, with timely events as well as late events both within and after the lateness interval. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
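A sketch of the pipeline such an ITCase would exercise; the generator source and the validating window function are hypothetical test helpers (compare the job quoted in FLINK-4134 further down):
{code}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

env.addSource(new SessionEventGeneratorSource())   // hypothetical: emits timely and late events
    .keyBy("sessionKey")
    .window(EventTimeSessionWindows.withGap(Time.milliseconds(100)))
    .allowedLateness(Time.milliseconds(10))
    .apply(new ValidatingWindowFunction())         // hypothetical: asserts per-session contents
    .print();

env.execute("Session windowing ITCase sketch");
{code}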
[jira] [Created] (FLINK-4201) Checkpoints for jobs in non-terminal state (e.g. suspended) get deleted
Stefan Richter created FLINK-4201: - Summary: Checkpoints for jobs in non-terminal state (e.g. suspended) get deleted Key: FLINK-4201 URL: https://issues.apache.org/jira/browse/FLINK-4201 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Reporter: Stefan Richter Priority: Blocker For example, when shutting down a Yarn session, according to the logs checkpoints for jobs that did not terminate are deleted. In the shutdown hook, removeAllCheckpoints is called and removes checkpoints that should still be kept. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
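A minimal sketch of the missing guard in the shutdown path, assuming a handle to the checkpoint store and the job's last status are available there; the predicate and signatures are assumptions, only the {{removeAllCheckpoints}} call is taken from the description above:
{code}
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.jobgraph.JobStatus;

class CheckpointShutdownSketch {
    // Only discard checkpoints when the job actually reached a terminal
    // state; a SUSPENDED job keeps them so a restarted cluster can recover.
    static void onShutdown(JobStatus status, CompletedCheckpointStore store) throws Exception {
        boolean globallyTerminal =
            status == JobStatus.FINISHED
                || status == JobStatus.CANCELED
                || status == JobStatus.FAILED;
        if (globallyTerminal) {
            store.removeAllCheckpoints(); // the call the description mentions
        }
    }
}
{code}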
[jira] [Updated] (FLINK-4150) Problem with Blobstore in Yarn HA setting on recovery after cluster shutdown
[ https://issues.apache.org/jira/browse/FLINK-4150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter updated FLINK-4150: -- Description: Submitting a job in Yarn with HA can lead to the following exception: {code} org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 ClassLoader info: URL ClassLoader: file: '/tmp/blobStore-ccec0f4a-3e07-455f-945b-4fcd08f5bac1/cache/blob_7fafffe9595cd06aff213b81b5da7b1682e1d6b0' (invalid JAR: zip file is empty) Class not resolvable through given classloader. at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:207) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:222) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588) at java.lang.Thread.run(Thread.java:745) {code} Some job information, including the Blob IDs, is stored in Zookeeper. The actual Blobs are stored in a dedicated BlobStore, if the recovery mode is set to Zookeeper. This BlobStore is typically located in a FS like HDFS. When the cluster is shut down, the path for the BlobStore is deleted. When the cluster is then restarted, recovering jobs cannot restore because their Blob IDs stored in Zookeeper now point to deleted files. was: Submitting a job in Yarn with HA can lead to the following exception: {code} org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 ClassLoader info: URL ClassLoader: file: '/tmp/blobStore-ccec0f4a-3e07-455f-945b-4fcd08f5bac1/cache/blob_7fafffe9595cd06aff213b81b5da7b1682e1d6b0' (invalid JAR: zip file is empty) Class not resolvable through given classloader. at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:207) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:222) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588) at java.lang.Thread.run(Thread.java:745) {code} Some job information, including the Blob IDs, is stored in Zookeeper. The actual Blobs are stored in a dedicated BlobStore, if the recovery mode is set to Zookeeper. This BlobStore is typically located in a FS like HDFS. When the cluster is shut down, the path for the BlobStore is deleted. When the cluster is then restarted, recovering jobs cannot restore because their Blob IDs stored in Zookeeper now point to deleted files. In particular, this problem frequently occurs for HA in combination with -m yarn-cluster. We should discuss to what extent this combination actually makes sense and what the expected behavior should be. > Problem with Blobstore in Yarn HA setting on recovery after cluster shutdown > > > Key: FLINK-4150 > URL: https://issues.apache.org/jira/browse/FLINK-4150 > Project: Flink > Issue Type: Bug > Components: Job-Submission > Reporter: Stefan Richter > Priority: Blocker > Fix For: 1.1.0 > > > Submitting a job in Yarn with HA can lead to the following exception: > {code} > org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load > user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 > ClassLoader info: URL ClassLoader: > file: > '/tmp/blobStore-ccec0f4a-3e07-455f-945b-4fcd08f5bac1/cache/blob_7fafffe9595cd06aff213b81b5da7b1682e1d6b0' > (invalid JAR: zip file is empty) > Class not resolvable through given classloader. 
> at > org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:207) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:222) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588) > at java.lang.Thread.run(Thread.java:745) > {code} > Some job information, including the Blob IDs, is stored in Zookeeper. The > actual Blobs are stored in a dedicated BlobStore, if the recovery mode is set > to Zookeeper. This BlobStore is typically located in a FS like HDFS. When the > cluster is shut down, the path for the BlobStore is deleted. When the cluster > is then restarted, recovering jobs cannot restore because their Blob IDs > stored in Zookeeper now point to deleted files. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4211) Dynamic Properties not working for jobs submitted to Yarn session
Stefan Richter created FLINK-4211: - Summary: Dynamic Properties not working for jobs submitted to Yarn session Key: FLINK-4211 URL: https://issues.apache.org/jira/browse/FLINK-4211 Project: Flink Issue Type: Bug Components: YARN Client Reporter: Stefan Richter The command line argument for dynamic properties (-D) is not working when submitting jobs to a Flink session. Example:
{code}
bin/flink run -p 4 myJob.jar -D recovery.zookeeper.path.root=/flink/xyz
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4211) Dynamic Properties not working for jobs submitted to Yarn session
[ https://issues.apache.org/jira/browse/FLINK-4211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378999#comment-15378999 ] Stefan Richter commented on FLINK-4211: --- I see. Unfortunately, I think this is not really clear in the documentation; one might be tempted to try to override properties, and the CLI accepts those attempts without any feedback. Otherwise, you can close the issue as invalid. > Dynamic Properties not working for jobs submitted to Yarn session > - > > Key: FLINK-4211 > URL: https://issues.apache.org/jira/browse/FLINK-4211 > Project: Flink > Issue Type: Bug > Components: YARN Client > Reporter: Stefan Richter > > The command line argument for dynamic properties (-D) is not working when > submitting jobs to a Flink session. > Example: > {code}
> bin/flink run -p 4 myJob.jar -D recovery.zookeeper.path.root=/flink/xyz
> {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
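Based on that observation, the working pattern would be to set the property when the session itself is started rather than at job submission; a sketch of that usage (the exact flags are assumptions and may differ between versions):
{code}
# Set the dynamic property when starting the YARN session ...
bin/yarn-session.sh -n 4 -Drecovery.zookeeper.path.root=/flink/xyz

# ... and then submit the job to the running session without it
bin/flink run -p 4 myJob.jar
{code}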
[jira] [Created] (FLINK-4182) HA recovery not working properly under ApplicationMaster failures.
Stefan Richter created FLINK-4182: - Summary: HA recovery not working properly under ApplicationMaster failures. Key: FLINK-4182 URL: https://issues.apache.org/jira/browse/FLINK-4182 Project: Flink Issue Type: Bug Components: Distributed Coordination, State Backends, Checkpointing Affects Versions: 1.0.3 Reporter: Stefan Richter When randomly killing TaskManager and ApplicationMaster, a job sometimes does not properly recover in HA mode. There can be different symptoms for this. For example, in one case the job is dying with the following exception: {code} The program finished with the following exception: org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Cannot set up the user code libraries: Cannot get library with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0 at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413) at org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:208) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389) at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1381) at da.testing.StreamingStateMachineJob.main(StreamingStateMachineJob.java:61) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331) at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:738) at org.apache.flink.client.CliFrontend.run(CliFrontend.java:251) at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:966) at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1009) Caused by: org.apache.flink.runtime.client.JobSubmissionException: Cannot set up the user code libraries: Cannot get library with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0 at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1089) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:506) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:105) at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162) at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:118) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) at akka.dispatch.Mailbox.run(Mailbox.scala:221) at akka.dispatch.Mailbox.exec(Mailbox.scala:231) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) at
[jira] [Updated] (FLINK-4182) HA recovery not working properly under ApplicationMaster failures.
[ https://issues.apache.org/jira/browse/FLINK-4182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter updated FLINK-4182: -- Description: When randomly killing TaskManager and ApplicationMaster, a job sometimes does not properly recover in HA mode. There can be different symptoms for this. For example, in one case the job is dying with the following exception: {code} The program finished with the following exception: org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Cannot set up the user code libraries: Cannot get library with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0 at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413) at org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:208) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389) at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1381) at da.testing.StreamingStateMachineJob.main(StreamingStateMachineJob.java:61) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331) at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:738) at org.apache.flink.client.CliFrontend.run(CliFrontend.java:251) at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:966) at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1009) Caused by: org.apache.flink.runtime.client.JobSubmissionException: Cannot set up the user code libraries: Cannot get library with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0 at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1089) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:506) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:105) at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162) at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) at 
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:118) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) at akka.dispatch.Mailbox.run(Mailbox.scala:221) at akka.dispatch.Mailbox.exec(Mailbox.scala:231) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.io.IOException: Cannot get library with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0 at
[jira] [Created] (FLINK-4166) Generate automatic different namespaces in Zookeeper for Flink applications
Stefan Richter created FLINK-4166: - Summary: Generate automatic different namespaces in Zookeeper for Flink applications Key: FLINK-4166 URL: https://issues.apache.org/jira/browse/FLINK-4166 Project: Flink Issue Type: Bug Components: Distributed Coordination Affects Versions: 1.0.3 Reporter: Stefan Richter We should automatically generate different namespaces per Flink application in Zookeeper to avoid interference between different applications that refer to the same Zookeeper entries. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
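Until such automatic namespacing exists, the interference can be avoided manually by giving each application its own root path in flink-conf.yaml, using the key that already appears in FLINK-4211 above; a sketch:
{code}
# Application A
recovery.zookeeper.path.root: /flink/application-a

# Application B (started with a separate configuration)
recovery.zookeeper.path.root: /flink/application-b
{code}
The issue proposes deriving such a unique root automatically, e.g. from the YARN application id, instead of relying on users to configure it.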
[jira] [Created] (FLINK-4150) Problem with Blobstore in Yarn HA setting on recovery after cluster shutdown
Stefan Richter created FLINK-4150: - Summary: Problem with Blobstore in Yarn HA setting on recovery after cluster shutdown Key: FLINK-4150 URL: https://issues.apache.org/jira/browse/FLINK-4150 Project: Flink Issue Type: Bug Components: Job-Submission Reporter: Stefan Richter Submitting a job in Yarn with HA can lead to the following exception: {code} org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 ClassLoader info: URL ClassLoader: file: '/tmp/blobStore-ccec0f4a-3e07-455f-945b-4fcd08f5bac1/cache/blob_7fafffe9595cd06aff213b81b5da7b1682e1d6b0' (invalid JAR: zip file is empty) Class not resolvable through given classloader. at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:207) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:222) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588) at java.lang.Thread.run(Thread.java:745) {code} Some job information, including the Blob IDs, is stored in Zookeeper. The actual Blobs are stored in a dedicated BlobStore, if the recovery mode is set to Zookeeper. This BlobStore is typically located in a FS like HDFS. When the cluster is shut down, the path for the BlobStore is deleted. When the cluster is then restarted, recovering jobs cannot restore because their Blob IDs stored in Zookeeper now point to deleted files. In particular, this problem frequently occurs for HA in combination with -m yarn-cluster. We should discuss to what extent this combination actually makes sense and what the expected behavior should be. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4156) Job with -m yarn-cluster registers TaskManagers to another running Yarn session
Stefan Richter created FLINK-4156: - Summary: Job with -m yarn-cluster registers TaskManagers to another running Yarn session Key: FLINK-4156 URL: https://issues.apache.org/jira/browse/FLINK-4156 Project: Flink Issue Type: Bug Components: Distributed Coordination Reporter: Stefan Richter When a job is started using cluster mode (-m yarn-cluster) and a Yarn session is running on the same cluster, the job accidentally registers its worker tasks with the ongoing Yarn session. This happens because the same Zookeeper namespace is used. We should consider isolating Flink applications from one another by using UUIDs, e.g. based on their application ids, in their Zookeeper paths. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3623) Adjust MurmurHash algorithm
[ https://issues.apache.org/jira/browse/FLINK-3623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15414110#comment-15414110 ] Stefan Richter commented on FLINK-3623: --- In my opinion, our implementation of Murmur might be too complicated and computationally expensive for simply hashing an int. Most parts of the used algorithm are only required for arbitrary byte[] keys. What matters for mixing the bits of an int is the finalizer part of Murmur3, which is:
{code}
h ^= h >> 16;
h *= 0x85ebca6b;
h ^= h >> 13;
h *= 0xc2b2ae35;
h ^= h >> 16;
{code}
If I remember correctly, the purpose of the remaining code is to generate the intermediate hash from blocks over variable-size byte[], whereas the purpose of the finalizer is to actually increase entropy and reduce collisions of the intermediate hash. Restricting the code to these lines could speed up hashing significantly, which might be even more relevant in case hash calculations are also used to determine key groups. > Adjust MurmurHash algorithm > --- > > Key: FLINK-3623 > URL: https://issues.apache.org/jira/browse/FLINK-3623 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination > Affects Versions: 1.1.0 > Reporter: Greg Hogan > Assignee: Greg Hogan > Priority: Trivial > Fix For: 1.1.0 > > > Flink's MurmurHash implementation differs from the published algorithm. > From Flink's MathUtils.java: > {code}
> code *= 0xe6546b64;
> {code}
> The Murmur3_32 algorithm as described by > [Wikipedia|https://en.wikipedia.org/wiki/MurmurHash]: > {code}
> m ← 5
> n ← 0xe6546b64
> hash ← hash × m + n
> {code}
> and in Guava's Murmur3_32HashFunction.java: > {code}
> h1 = h1 * 5 + 0xe6546b64;
> {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
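For illustration, the finalizer quoted above written out as a standalone Java method. Note that in Java the shifts must be unsigned ({{>>>}}); the C reference implementation operates on unsigned integers:
{code}
class Murmur3FinalizerSketch {
    // Murmur3 32-bit finalizer ("fmix32"): sufficient to mix the bits of a
    // single int key, without the block-mixing loop needed for byte[] input.
    static int fmix32(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }
}
{code}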
[jira] [Created] (FLINK-4134) EventTimeSessionWindows trigger for empty windows when dropping late events
Stefan Richter created FLINK-4134: - Summary: EventTimeSessionWindows trigger for empty windows when dropping late events Key: FLINK-4134 URL: https://issues.apache.org/jira/browse/FLINK-4134 Project: Flink Issue Type: Bug Components: DataStream API Affects Versions: 1.0.3 Reporter: Stefan Richter It seems like EventTimeSessionWindows sometimes trigger for empty windows. The behavior is observed in connection with dropping late events:
{code}
stream
    .keyBy("sessionKey")
    .window(EventTimeSessionWindows.withGap(Time.milliseconds(100)))
    .allowedLateness(Time.milliseconds(0))
    .apply(new ValidatingWindowFunction())
    .print();
{code}
I wrote a generator that produces events for several parallel sessions and that allows reproducing the error. For now, I can share this generator privately for debugging purposes, but my plan is to use the generator as the basis for an integration test. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4134) EventTimeSessionWindows trigger for empty windows when dropping late events
[ https://issues.apache.org/jira/browse/FLINK-4134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter reassigned FLINK-4134: - Assignee: Stefan Richter > EventTimeSessionWindows trigger for empty windows when dropping late events > --- > > Key: FLINK-4134 > URL: https://issues.apache.org/jira/browse/FLINK-4134 > Project: Flink > Issue Type: Bug > Components: DataStream API > Affects Versions: 1.0.3 > Reporter: Stefan Richter > Assignee: Stefan Richter > > It seems like EventTimeSessionWindows sometimes trigger for empty windows. > The behavior is observed in connection with dropping late events: > {code}
> stream
>     .keyBy("sessionKey")
>     .window(EventTimeSessionWindows.withGap(Time.milliseconds(100)))
>     .allowedLateness(Time.milliseconds(0))
>     .apply(new ValidatingWindowFunction())
>     .print();
> {code}
> I wrote a generator that produces events for several parallel sessions and > that allows reproducing the error. For now, I can share this generator > privately for debugging purposes, but my plan is to use the generator as the > basis for an integration test. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4140) CheckpointCoordinator fails to discard completed checkpoint
[ https://issues.apache.org/jira/browse/FLINK-4140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter reassigned FLINK-4140: - Assignee: Stefan Richter > CheckpointCoordinator fails to discard completed checkpoint > --- > > Key: FLINK-4140 > URL: https://issues.apache.org/jira/browse/FLINK-4140 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.0.3 >Reporter: Stefan Richter >Assignee: Stefan Richter > > Running a job in HA mode I saw the following warning in the job manager logs. > The warning appeared after the job was restarted due to a master failure. > I've skimmed the code and it looks like the user code class loader is used > everywhere when discarding the checkpoint, but something seems to not work as > expected (otherwise the warning should not appear). > {code} > 2016-07-01 13:08:33,218 WARN > org.apache.flink.runtime.checkpoint.SubtaskState - Failed to > discard checkpoint state: StateForTask(Size: 4, Duration: 2012, State: > SerializedValue) > java.lang.ClassNotFoundException: da.testing.State > at java.net.URLClassLoader$1.run(URLClassLoader.java:366) > at java.net.URLClassLoader$1.run(URLClassLoader.java:355) > at java.security.AccessController.doPrivileged(Native Method) > at java.net.URLClassLoader.findClass(URLClassLoader.java:354) > at java.lang.ClassLoader.loadClass(ClassLoader.java:425) > at java.lang.ClassLoader.loadClass(ClassLoader.java:358) > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Class.java:278) > at > org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620) > at > java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) > at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1486) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1336) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016) > at > java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:503) > at > org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:268) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1907) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) > at 
java.util.HashMap.readObject(HashMap.java:1180) > at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1907) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806) > at
[jira] [Updated] (FLINK-4140) CheckpointCoordinator fails to discard completed checkpoint
[ https://issues.apache.org/jira/browse/FLINK-4140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter updated FLINK-4140: -- Assignee: Ufuk Celebi (was: Stefan Richter) > CheckpointCoordinator fails to discard completed checkpoint > --- > > Key: FLINK-4140 > URL: https://issues.apache.org/jira/browse/FLINK-4140 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.0.3 >Reporter: Stefan Richter >Assignee: Ufuk Celebi > > Running a job in HA mode I saw the following warning in the job manager logs. > The warning appeared after the job was restarted due to a master failure. > I've skimmed the code and it looks like the user code class loader is used > everywhere when discarding the checkpoint, but something seems to not work as > expected (otherwise the warning should not appear). > {code} > 2016-07-01 13:08:33,218 WARN > org.apache.flink.runtime.checkpoint.SubtaskState - Failed to > discard checkpoint state: StateForTask(Size: 4, Duration: 2012, State: > SerializedValue) > java.lang.ClassNotFoundException: da.testing.State > at java.net.URLClassLoader$1.run(URLClassLoader.java:366) > at java.net.URLClassLoader$1.run(URLClassLoader.java:355) > at java.security.AccessController.doPrivileged(Native Method) > at java.net.URLClassLoader.findClass(URLClassLoader.java:354) > at java.lang.ClassLoader.loadClass(ClassLoader.java:425) > at java.lang.ClassLoader.loadClass(ClassLoader.java:358) > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Class.java:278) > at > org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620) > at > java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) > at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1486) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1336) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016) > at > java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:503) > at > org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:268) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1907) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) > at 
java.util.HashMap.readObject(HashMap.java:1180) > at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1907) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806) > at
[jira] [Created] (FLINK-4142) Recovery problem in HA on Hadoop Yarn 2.5.0
Stefan Richter created FLINK-4142: - Summary: Recovery problem in HA on Hadoop Yarn 2.5.0 Key: FLINK-4142 URL: https://issues.apache.org/jira/browse/FLINK-4142 Project: Flink Issue Type: Bug Components: YARN Client Affects Versions: 1.0.3 Reporter: Stefan Richter On Hadoop Yarn 2.5.0, recovery in HA fails in the following scenario: 1) Kill application master, let it recover normally. 2) After that, kill a task manager. Now, Yarn tries to restart the killed task manager in an endless loop. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4140) CheckpointCoordinator fails to discard completed checkpoint
Stefan Richter created FLINK-4140: - Summary: CheckpointCoordinator fails to discard completed checkpoint Key: FLINK-4140 URL: https://issues.apache.org/jira/browse/FLINK-4140 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Affects Versions: 1.0.3 Reporter: Stefan Richter Running a job in HA mode I saw the following warning in the job manager logs. The warning appeared after the job was restarted due to a master failure. I've skimmed the code and it looks like the user code class loader is used everywhere when discarding the checkpoint, but something seems to not work as expected (otherwise the warning should not appear). {code} 2016-07-01 13:08:33,218 WARN org.apache.flink.runtime.checkpoint.SubtaskState - Failed to discard checkpoint state: StateForTask(Size: 4, Duration: 2012, State: SerializedValue) java.lang.ClassNotFoundException: da.testing.State at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:278) at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1486) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1336) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016) at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:503) at org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:268) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1907) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) at java.util.HashMap.readObject(HashMap.java:1180) at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1907) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1714) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)
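For context on the suspected fix area in FLINK-4140: a minimal, hypothetical sketch of running checkpoint disposal with the user-code class loader installed as the thread's context class loader, so user classes (such as {{da.testing.State}} above) can be resolved during deserialization. The helper name and structure are illustrative, not Flink code.
{code}
// Hypothetical helper: runs the discard action with the user-code class loader
// as context class loader, restoring the previous loader afterwards.
final class DiscardWithUserClassLoader {

    static void discard(Runnable discardAction, ClassLoader userCodeClassLoader) {
        final Thread currentThread = Thread.currentThread();
        final ClassLoader previous = currentThread.getContextClassLoader();
        currentThread.setContextClassLoader(userCodeClassLoader);
        try {
            discardAction.run(); // e.g. deserialize the state handle and delete its files
        } finally {
            currentThread.setContextClassLoader(previous); // always restore
        }
    }
}
{code}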
[jira] [Created] (FLINK-4141) TaskManager failures not always recover when killed during an ApplicationMaster failure in HA mode on Yarn
Stefan Richter created FLINK-4141: - Summary: TaskManager failures not always recover when killed during an ApplicationMaster failure in HA mode on Yarn Key: FLINK-4141 URL: https://issues.apache.org/jira/browse/FLINK-4141 Project: Flink Issue Type: Bug Affects Versions: 1.0.3 Reporter: Stefan Richter High availability on Yarn often fails to recover in the following test scenario: 1. Kill the application master process. 2. Then, while the application master is recovering, randomly kill several task managers (with some delay). After the application master has recovered, not all of the killed task managers are brought back, and no further attempts are made to restart them. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4142) Recovery problem in HA on Hadoop Yarn 2.4.1
[ https://issues.apache.org/jira/browse/FLINK-4142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter updated FLINK-4142: -- Summary: Recovery problem in HA on Hadoop Yarn 2.4.1 (was: Recovery problem in HA on Hadoop Yarn 2.5.0) > Recovery problem in HA on Hadoop Yarn 2.4.1 > --- > > Key: FLINK-4142 > URL: https://issues.apache.org/jira/browse/FLINK-4142 > Project: Flink > Issue Type: Bug > Components: YARN Client >Affects Versions: 1.0.3 >Reporter: Stefan Richter > > On Hadoop Yarn 2.4.1, recovery in HA fails in the following scenario: > 1) Kill application master, let it recover normally. > 2) After that, kill a task manager. > Now, Yarn tries to restart the killed task manager in an endless loop. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4142) Recovery problem in HA on Hadoop Yarn 2.5.0
[ https://issues.apache.org/jira/browse/FLINK-4142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter updated FLINK-4142: -- Description: On Hadoop Yarn 2.4.1, recovery in HA fails in the following scenario: 1) Kill application master, let it recover normally. 2) After that, kill a task manager. Now, Yarn tries to restart the killed task manager in an endless loop. was: On Hadoop Yarn 2.5.0, recovery in HA fails in the following scenario: 1) Kill application master, let it recover normally. 2) After that, kill a task manager. Now, Yarn tries to restart the killed task manager in an endless loop. > Recovery problem in HA on Hadoop Yarn 2.5.0 > --- > > Key: FLINK-4142 > URL: https://issues.apache.org/jira/browse/FLINK-4142 > Project: Flink > Issue Type: Bug > Components: YARN Client >Affects Versions: 1.0.3 >Reporter: Stefan Richter > > On Hadoop Yarn 2.4.1, recovery in HA fails in the following scenario: > 1) Kill application master, let it recover normally. > 2) After that, kill a task manager. > Now, Yarn tries to restart the killed task manager in an endless loop. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4142) Recovery problem in HA on Hadoop Yarn 2.4.1
[ https://issues.apache.org/jira/browse/FLINK-4142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359073#comment-15359073 ] Stefan Richter commented on FLINK-4142: --- I have a log for the problem here: https://storage.googleapis.com/srichter/task_mgr_restart_endless.log > Recovery problem in HA on Hadoop Yarn 2.4.1 > --- > > Key: FLINK-4142 > URL: https://issues.apache.org/jira/browse/FLINK-4142 > Project: Flink > Issue Type: Bug > Components: YARN Client >Affects Versions: 1.0.3 >Reporter: Stefan Richter > > On Hadoop Yarn 2.4.1, recovery in HA fails in the following scenario: > 1) Kill application master, let it recover normally. > 2) After that, kill a task manager. > Now, Yarn tries to restart the killed task manager in an endless loop. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5730) User can concurrently modify state metadata of RocksDB asynchronous snapshots
Stefan Richter created FLINK-5730: - Summary: User can concurrently modify state metadata of RocksDB asynchronous snapshots Key: FLINK-5730 URL: https://issues.apache.org/jira/browse/FLINK-5730 Project: Flink Issue Type: Bug Reporter: Stefan Richter The current implementation of asynchronous snapshots in RocksDB iterates the original state metadata structures as part of the asynchronous snapshot. Users could potentially modify the state metadata concurrently (e.g. by registering a new state while the snapshot is running), thereby corrupting the metadata. For the way most users are registering their states (at the start of the task), this is not a problem. However, if state is conditionally registered at some later point in time this can be problematic. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
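To make the failure mode in FLINK-5730 concrete, here is a minimal, hypothetical sketch of the kind of fix the issue suggests: the synchronous part of the snapshot copies the registered state metadata under a lock, so the asynchronous part never iterates structures that user code may still modify. Class and method names are illustrative.
{code}
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: copy the metadata eagerly in the synchronous snapshot
// phase; the asynchronous phase then works on a detached private copy.
final class StateMetaDataRegistry {

    private final List<String> registeredStateNames = new ArrayList<>();

    synchronized void registerState(String name) {
        registeredStateNames.add(name); // may be called while a snapshot runs
    }

    // called in the synchronous part of the snapshot, before the async part starts
    synchronized List<String> snapshotMetaData() {
        return new ArrayList<>(registeredStateNames); // safe copy for the async thread
    }
}
{code}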
[jira] [Updated] (FLINK-5730) Users can concurrently modify state metadata of RocksDB asynchronous snapshots
[ https://issues.apache.org/jira/browse/FLINK-5730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter updated FLINK-5730: -- Summary: Users can concurrently modify state metadata of RocksDB asynchronous snapshots (was: User can concurrently modify state metadata of RocksDB asynchronous snapshots) > Users can concurrently modify state metadata of RocksDB asynchronous snapshots > -- > > Key: FLINK-5730 > URL: https://issues.apache.org/jira/browse/FLINK-5730 > Project: Flink > Issue Type: Bug >Reporter: Stefan Richter > > The current implementation of asynchronous snapshots in RocksDB iterates the > original state metadata structures as part of the asynchronous snapshot. > Users could potentially modify the state metadata concurrently (e.g. by > registering a new state while the snapshot is running), thereby corrupting > the metadata. > For the way most users are registering their states (at the start of the > task), this is not a problem. However, if state is conditionally registered > at some later point in time this can be problematic. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-5715) Asynchronous snapshotting for HeapKeyedStateBackend
Stefan Richter created FLINK-5715: - Summary: Asynchronous snapshotting for HeapKeyedStateBackend Key: FLINK-5715 URL: https://issues.apache.org/jira/browse/FLINK-5715 Project: Flink Issue Type: New Feature Components: State Backends, Checkpointing Affects Versions: 1.3.0 Reporter: Stefan Richter Assignee: Stefan Richter Blocking snapshots render the HeapKeyedStateBackend practically unusable for many users in production. Their jobs cannot tolerate stopped processing for the time it takes to write gigabytes of data from memory to disk. Asynchronous snapshots would be a solution to this problem. The challenge for the implementation is coming up with a copy-on-write scheme for the in-memory hash maps that form the foundation of this backend. After taking a closer look, this problem is twofold: first, providing CoW semantics for the hash map itself, as a mutable structure, while avoiding costly locking or blocking where possible; second, CoW for the mutable value objects, e.g. through cloning via serializers. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
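A toy sketch of the copy-on-write idea described above (an illustration, not Flink's eventual implementation): a snapshot takes an O(n) shallow copy of the map, and the first mutation of a value after that clones the value object, so the snapshot keeps seeing the old version. Coarse synchronization is used here only for brevity; the point of the real design is precisely to avoid such blocking.
{code}
import java.util.HashMap;
import java.util.Map;

// Toy copy-on-write map: values are long arrays standing in for mutable state
// objects that would really be cloned via a TypeSerializer.
final class CowStateMap<K> {

    private final Map<K, long[]> live = new HashMap<>();
    private Map<K, long[]> snapshot; // shallow copy taken at snapshot time

    synchronized Map<K, long[]> takeSnapshot() {
        snapshot = new HashMap<>(live); // copies references only, O(n)
        return snapshot;
    }

    synchronized void add(K key, long delta) {
        long[] v = live.get(key);
        if (v == null) {
            v = new long[1];
            live.put(key, v);
        } else if (snapshot != null && snapshot.get(key) == v) {
            v = v.clone(); // copy-on-write: detach from the snapshot's version
            live.put(key, v);
        }
        v[0] += delta;
    }
}
{code}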
[jira] [Created] (FLINK-5707) Find better keys for backend configuration parameters
Stefan Richter created FLINK-5707: - Summary: Find better keys for backend configuration parameters Key: FLINK-5707 URL: https://issues.apache.org/jira/browse/FLINK-5707 Project: Flink Issue Type: Improvement Affects Versions: 1.3.0 Reporter: Stefan Richter Priority: Minor Currently, some config keys for the backends are confusing or even misleading and could be renamed. For example:
`state.backend.fs.checkpointdir` -> `state.backend.checkpoints.dir`
`state.backend.rocksdb.checkpointdir` -> `state.backend.rocksdb.workdir`
`state.checkpoints.dir`
This would reflect their purposes much better. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
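For illustration, this is how the proposed keys might look in a {{flink-conf.yaml}}; the names follow the renaming sketched above and are a proposal only, not valid configuration in any released Flink version at the time of writing.
{code}
# hypothetical flink-conf.yaml excerpt with the proposed key names
state.backend: filesystem
state.backend.checkpoints.dir: hdfs:///flink/checkpoints   # was: state.backend.fs.checkpointdir
state.backend.rocksdb.workdir: /tmp/flink-rocksdb          # was: state.backend.rocksdb.checkpointdir
{code}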
[jira] [Commented] (FLINK-5436) UDF state without CheckpointedRestoring can result in restarting loop
[ https://issues.apache.org/jira/browse/FLINK-5436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834292#comment-15834292 ] Stefan Richter commented on FLINK-5436: --- I think that would be good to have in. > UDF state without CheckpointedRestoring can result in restarting loop > - > > Key: FLINK-5436 > URL: https://issues.apache.org/jira/browse/FLINK-5436 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Priority: Minor > > When restoring a job with Checkpointed state and not implementing the new > CheckpointedRestoring interface, the job will be restarted over and over > again (given the respective restarting strategy). > Since this is not recoverable, we should immediately fail the job. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5663) Checkpoint fails because of closed registry
[ https://issues.apache.org/jira/browse/FLINK-5663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15840070#comment-15840070 ] Stefan Richter commented on FLINK-5663: --- Then again, question is if this task already ended and for some reason a checkpoint was still somewhere in the process of opening a stream. The exception itself is not problematic as long as the task was already finished for some reason before. > Checkpoint fails because of closed registry > --- > > Key: FLINK-5663 > URL: https://issues.apache.org/jira/browse/FLINK-5663 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi > > While testing the 1.2.0 release I got the following Exception: > {code} > 2017-01-26 17:29:20,602 INFO org.apache.flink.runtime.taskmanager.Task > - Source: Custom Source (3/8) > (2dbce778c4e53a39dec3558e868ceef4) switched from RUNNING to FAILED. > java.lang.Exception: Error while triggering checkpoint 2 for Source: Custom > Source (3/8) > at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1117) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.Exception: Could not perform checkpoint 2 for operator > Source: Custom Source (3/8). > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:533) > at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1108) > ... 5 more > Caused by: java.lang.Exception: Could not complete snapshot 2 for operator > Source: Custom Source (3/8). > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:372) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1116) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1052) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:640) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:585) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:528) > ... 6 more > Caused by: java.io.IOException: Could not flush and close the file system > output stream to > file:/Users/uce/Desktop/1-2-testing/fs/83889867a493a1dc80f6c588c071b679/chk-2/e4415d0d-719c-48df-91a9-3171ba468152 > in order to obtain the stream state handle > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:333) > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:200) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357) > ... 
11 more > Caused by: java.io.IOException: Could not open output stream for state backend > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:368) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:225) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:305) > ... 13 more > Caused by: java.io.IOException: Cannot register Closeable, registry is > already closed. Closing argument. > at > org.apache.flink.util.AbstractCloseableRegistry.registerClosable(AbstractCloseableRegistry.java:63) > at > org.apache.flink.core.fs.ClosingFSDataOutputStream.wrapSafe(ClosingFSDataOutputStream.java:99) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:123) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:359) > ... 15 more > {code} > The job recovered and kept running. > [~stefanrichte...@gmail.com] Can this be a race with the closable registry? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5663) Checkpoint fails because of closed registry
[ https://issues.apache.org/jira/browse/FLINK-5663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15840068#comment-15840068 ] Stefan Richter commented on FLINK-5663: --- I don't think this is a race condition. This exception only happens when the registry was already closed which in turn should only happen after the enclosing task ended. > Checkpoint fails because of closed registry > --- > > Key: FLINK-5663 > URL: https://issues.apache.org/jira/browse/FLINK-5663 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi > > While testing the 1.2.0 release I got the following Exception: > {code} > 2017-01-26 17:29:20,602 INFO org.apache.flink.runtime.taskmanager.Task > - Source: Custom Source (3/8) > (2dbce778c4e53a39dec3558e868ceef4) switched from RUNNING to FAILED. > java.lang.Exception: Error while triggering checkpoint 2 for Source: Custom > Source (3/8) > at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1117) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.Exception: Could not perform checkpoint 2 for operator > Source: Custom Source (3/8). > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:533) > at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1108) > ... 5 more > Caused by: java.lang.Exception: Could not complete snapshot 2 for operator > Source: Custom Source (3/8). > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:372) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1116) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1052) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:640) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:585) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:528) > ... 6 more > Caused by: java.io.IOException: Could not flush and close the file system > output stream to > file:/Users/uce/Desktop/1-2-testing/fs/83889867a493a1dc80f6c588c071b679/chk-2/e4415d0d-719c-48df-91a9-3171ba468152 > in order to obtain the stream state handle > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:333) > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:200) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357) > ... 
11 more > Caused by: java.io.IOException: Could not open output stream for state backend > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:368) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:225) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:305) > ... 13 more > Caused by: java.io.IOException: Cannot register Closeable, registry is > already closed. Closing argument. > at > org.apache.flink.util.AbstractCloseableRegistry.registerClosable(AbstractCloseableRegistry.java:63) > at > org.apache.flink.core.fs.ClosingFSDataOutputStream.wrapSafe(ClosingFSDataOutputStream.java:99) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:123) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:359) > ... 15 more > {code} > The job recovered and kept running. > [~stefanrichte...@gmail.com] Can this be a race with the closable registry? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5663) Checkpoint fails because of closed registry
[ https://issues.apache.org/jira/browse/FLINK-5663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15840101#comment-15840101 ] Stefan Richter commented on FLINK-5663: --- The only place this registry ever gets closed is through one method, only called at the end of a task. So the first thing that is a little less transparent from my point of view is the ThreadLocal behaviour. Do you have logs for this run? There could be some useful info outputs. > Checkpoint fails because of closed registry > --- > > Key: FLINK-5663 > URL: https://issues.apache.org/jira/browse/FLINK-5663 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi > > While testing the 1.2.0 release I got the following Exception: > {code} > 2017-01-26 17:29:20,602 INFO org.apache.flink.runtime.taskmanager.Task > - Source: Custom Source (3/8) > (2dbce778c4e53a39dec3558e868ceef4) switched from RUNNING to FAILED. > java.lang.Exception: Error while triggering checkpoint 2 for Source: Custom > Source (3/8) > at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1117) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.Exception: Could not perform checkpoint 2 for operator > Source: Custom Source (3/8). > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:533) > at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1108) > ... 5 more > Caused by: java.lang.Exception: Could not complete snapshot 2 for operator > Source: Custom Source (3/8). > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:372) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1116) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1052) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:640) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:585) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:528) > ... 6 more > Caused by: java.io.IOException: Could not flush and close the file system > output stream to > file:/Users/uce/Desktop/1-2-testing/fs/83889867a493a1dc80f6c588c071b679/chk-2/e4415d0d-719c-48df-91a9-3171ba468152 > in order to obtain the stream state handle > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:333) > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:200) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357) > ... 
11 more > Caused by: java.io.IOException: Could not open output stream for state backend > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:368) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:225) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:305) > ... 13 more > Caused by: java.io.IOException: Cannot register Closeable, registry is > already closed. Closing argument. > at > org.apache.flink.util.AbstractCloseableRegistry.registerClosable(AbstractCloseableRegistry.java:63) > at > org.apache.flink.core.fs.ClosingFSDataOutputStream.wrapSafe(ClosingFSDataOutputStream.java:99) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:123) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:359) > ... 15 more > {code} > The job recovered and kept running. > [~stefanrichte...@gmail.com] Can this be a race with the closable registry? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-5663) Checkpoint fails because of closed registry
[ https://issues.apache.org/jira/browse/FLINK-5663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter reassigned FLINK-5663: - Assignee: Stefan Richter > Checkpoint fails because of closed registry > --- > > Key: FLINK-5663 > URL: https://issues.apache.org/jira/browse/FLINK-5663 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Assignee: Stefan Richter > > While testing the 1.2.0 release I got the following Exception: > {code} > 2017-01-26 17:29:20,602 INFO org.apache.flink.runtime.taskmanager.Task > - Source: Custom Source (3/8) > (2dbce778c4e53a39dec3558e868ceef4) switched from RUNNING to FAILED. > java.lang.Exception: Error while triggering checkpoint 2 for Source: Custom > Source (3/8) > at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1117) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.Exception: Could not perform checkpoint 2 for operator > Source: Custom Source (3/8). > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:533) > at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1108) > ... 5 more > Caused by: java.lang.Exception: Could not complete snapshot 2 for operator > Source: Custom Source (3/8). > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:372) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1116) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1052) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:640) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:585) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:528) > ... 6 more > Caused by: java.io.IOException: Could not flush and close the file system > output stream to > file:/Users/uce/Desktop/1-2-testing/fs/83889867a493a1dc80f6c588c071b679/chk-2/e4415d0d-719c-48df-91a9-3171ba468152 > in order to obtain the stream state handle > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:333) > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:200) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357) > ... 11 more > Caused by: java.io.IOException: Could not open output stream for state backend > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:368) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:225) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:305) > ... 
13 more > Caused by: java.io.IOException: Cannot register Closeable, registry is > already closed. Closing argument. > at > org.apache.flink.util.AbstractCloseableRegistry.registerClosable(AbstractCloseableRegistry.java:63) > at > org.apache.flink.core.fs.ClosingFSDataOutputStream.wrapSafe(ClosingFSDataOutputStream.java:99) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:123) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:359) > ... 15 more > {code} > The job recovered and kept running. > [~stefanrichte...@gmail.com] Can this be a race with the closable registry? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5681) Make ReaperThread for SafetyNetCloseableRegistry a singleton
Stefan Richter created FLINK-5681: - Summary: Make ReaperThread for SafetyNetCloseableRegistry a singleton Key: FLINK-5681 URL: https://issues.apache.org/jira/browse/FLINK-5681 Project: Flink Issue Type: Improvement Components: Core Reporter: Stefan Richter Assignee: Stefan Richter Currently, each {{SafetyNetCloseableRegistry}} spawns its own ReaperThread. However, this duty could also be fulfilled by a single ReaperThread that is shared by all registries, avoiding unnecessary threads. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
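A minimal, hypothetical sketch of the shared-reaper idea: one daemon thread is started lazily and reference-counted by the registries, so it stops when the last registry closes. Names and the reap loop body are illustrative only.
{code}
// Illustrative singleton reaper shared by all registries.
final class SharedReaper {

    private static Thread reaperThread;
    private static int refCount;

    static synchronized void retain() {
        if (refCount++ == 0) {
            reaperThread = new Thread(SharedReaper::reapLoop, "closeable-reaper");
            reaperThread.setDaemon(true);
            reaperThread.start();
        }
    }

    static synchronized void release() {
        if (--refCount == 0) {
            reaperThread.interrupt(); // last registry closed, stop the thread
            reaperThread = null;
        }
    }

    private static void reapLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // here the real implementation would poll a ReferenceQueue of
                // phantom references and close any leaked streams it finds
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                return; // shutting down
            }
        }
    }
}
{code}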
[jira] (FLINK-5044) Converting operator and function state from Flink 1.1 for all changed operators in 1.2
Stefan Richter commented on FLINK-5044 (Re: Converting operator and function state from Flink 1.1 for all changed operators in 1.2): No, this is solved through our migration story. -- This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)
[jira] (FLINK-5480) User-provided hashes for operators
Stefan Richter commented on FLINK-5480 (Re: User-provided hashes for operators): From my side, yes. -- This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)
[jira] [Commented] (FLINK-5740) Make WrappingFunction an interface and move to flink-core
[ https://issues.apache.org/jira/browse/FLINK-5740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15859254#comment-15859254 ] Stefan Richter commented on FLINK-5740: --- +1 very good idea. > Make WrappingFunction an interface and move to flink-core > - > > Key: FLINK-5740 > URL: https://issues.apache.org/jira/browse/FLINK-5740 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > > Making it an interface and having an {{AbstractWrappingFunction}} will allow > implementations to have different classes as base classes. Also, we should > change {{FunctionUtils}} to work like {{StreamingFunctionUtils}} so that > wrapping functions don't have to implement the methods of {{RichFunction}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-1725) New Partitioner for better load balancing for skewed data
[ https://issues.apache.org/jira/browse/FLINK-1725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15871572#comment-15871572 ] Stefan Richter commented on FLINK-1725: --- I agree with you [~aljoscha]. While this would be nice to have, I think it is way more involved than it seems, because this has performance implications and several interactions with other Flink features, such as rescaling or queryable state. > New Partitioner for better load balancing for skewed data > - > > Key: FLINK-1725 > URL: https://issues.apache.org/jira/browse/FLINK-1725 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 0.8.1 >Reporter: Anis Nasir >Assignee: Anis Nasir > Labels: LoadBalancing, Partitioner > Original Estimate: 336h > Remaining Estimate: 336h > > Hi, > We have recently studied the problem of load balancing in Storm [1]. > In particular, we focused on key distribution of the stream for skewed data. > We developed a new stream partitioning scheme (which we call Partial Key > Grouping). It achieves better load balancing than key grouping while being > more scalable than shuffle grouping in terms of memory. > In the paper we show a number of mining algorithms that are easy to implement > with partial key grouping, and whose performance can benefit from it. We > think that it might also be useful for a larger class of algorithms. > Partial key grouping is very easy to implement: it requires just a few lines > of code in Java when implemented as a custom grouping in Storm [2]. > For all these reasons, we believe it will be a nice addition to the standard > Partitioners available in Flink. If the community thinks it's a good idea, we > will be happy to offer support in the porting. > References: > [1]. > https://melmeric.files.wordpress.com/2014/11/the-power-of-both-choices-practical-load-balancing-for-distributed-stream-processing-engines.pdf > [2]. https://github.com/gdfm/partial-key-grouping -- This message was sent by Atlassian JIRA (v6.3.15#6346)
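To make the discussed technique concrete: a sketch of partial key grouping ("power of both choices") as described in the paper cited in FLINK-1725, not a partitioner that ships with Flink. Each key hashes to two candidate channels and every record is sent to the currently less loaded one; the hash choice below is deliberately simplistic.
{code}
// Illustrative partial key grouping: two candidate channels per key,
// pick the one with the smaller running load.
final class PartialKeyGrouper {

    private final long[] load;

    PartialKeyGrouper(int numChannels) {
        this.load = new long[numChannels];
    }

    int selectChannel(String key) {
        int c1 = Math.floorMod(key.hashCode(), load.length);
        int c2 = Math.floorMod((key + "#second-choice").hashCode(), load.length);
        int chosen = load[c1] <= load[c2] ? c1 : c2;
        load[chosen]++; // track per-channel load, e.g. records sent so far
        return chosen;
    }
}
{code}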
[jira] [Closed] (FLINK-3841) RocksDB statebackend creates empty dbs for stateless operators
[ https://issues.apache.org/jira/browse/FLINK-3841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter closed FLINK-3841. - Resolution: Not A Problem Fix Version/s: 1.2.0 Not a problem any more. Will close this because it seems outdated. > RocksDB statebackend creates empty dbs for stateless operators > -- > > Key: FLINK-3841 > URL: https://issues.apache.org/jira/browse/FLINK-3841 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.1.0 >Reporter: Gyula Fora >Assignee: MaGuowei >Priority: Minor > Fix For: 1.2.0 > > > Even though they are not checkpointed there is always an open RocksDB > database for all operators if the Rocks backend is set. I wonder if it would > make sense to lazy initialize the dbs instead of doing it in the > initializeForJob method. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-2491) Operators are not participating in state checkpointing in some cases
[ https://issues.apache.org/jira/browse/FLINK-2491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15871538#comment-15871538 ] Stefan Richter commented on FLINK-2491: --- This still seems valid, but I assume it is no longer in progress. I think the reason for the problem is that some operator instances might shut down at some point, while the checkpoint coordinator still expects all instances that were initially started to acknowledge each checkpoint. What we would need is some way to unregister operator instances from checkpointing after their shutdown. Also, this should be re-established in case of restarts. Is this summary correct [~aljoscha] [~StephanEwen]? > Operators are not participating in state checkpointing in some cases > > > Key: FLINK-2491 > URL: https://issues.apache.org/jira/browse/FLINK-2491 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 0.10.0 >Reporter: Robert Metzger >Assignee: Márton Balassi >Priority: Critical > Fix For: 1.0.0 > > > While implementing a test case for the Kafka Consumer, I came across the > following bug: > Consider the following topology, with the operator parallelism in parentheses: > Source (2) --> Sink (1). > In this setup, the {{snapshotState()}} method is called on the source, but > not on the Sink. > The sink receives the generated data. > Only one of the two sources is generating data. > I've implemented a test case for this, you can find it here: > https://github.com/rmetzger/flink/blob/para_checkpoint_bug/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ParallelismChangeCheckpoinedITCase.java -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5932) Order of legacy vs new state initialization in the AbstractStreamOperator.
[ https://issues.apache.org/jira/browse/FLINK-5932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15891930#comment-15891930 ] Stefan Richter commented on FLINK-5932: --- I think it makes a lot of sense if it is easily possible. > Order of legacy vs new state initialization in the AbstractStreamOperator. > -- > > Key: FLINK-5932 > URL: https://issues.apache.org/jira/browse/FLINK-5932 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.3.0 > > > Currently in the > {{AbstractStreamOperator::initializeState(OperatorStateHandles > stateHandles)}}, the {{restoreStreamCheckpointed}} which is responsible for > restoring state from previous Flink versions, (backwards compatibility) is > called before the {{initializeState(StateInitializationContext context)}} > which is responsible for initializing the state in Flink 1.2. > This has the negative side effect that when implementing the backwards > compatibility strategy for a given operator, we have to restore the old > state, store it in local variables, and register it with the new state > abstractions in the {{initializeState()}} or the {{open()}}. This creates a > lot of unnecessary code in the operators, and potential memory leaks if the > local variables are not "null-ified". > This issue proposes to call the {{restoreStreamCheckpointed}} after the > {{initializeState(StateInitializationContext context)}}. This way, the new > operator state will have been initialized (e.g. keyed state), and the > {{restoreStreamCheckpointed}} will be able to register its state directly > with the new abstractions, instead of putting it in local variables and wait > for the {{initializeState}} or the {{open()}} to re-register it. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5917) Remove MapState.size()
[ https://issues.apache.org/jira/browse/FLINK-5917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15891945#comment-15891945 ] Stefan Richter commented on FLINK-5917: --- I agree with [~aljoscha]. Unlike Java's HashMap, RocksDB does not give any feedback if a key already existed and this makes a lot of sense if you consider the LSM implementation of RocksDB. RocksDB is offering an estimate on the key-count (probably implemented as hyper-log-log), but I think they would expose the exact count if this could be easily done. The only workaround that I see to maintain a cached count is to perform a lookup before every insert, which should be prohibitively expensive. > Remove MapState.size() > -- > > Key: FLINK-5917 > URL: https://issues.apache.org/jira/browse/FLINK-5917 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.3.0 >Reporter: Aljoscha Krettek > > I'm proposing to remove {{size()}} because it is a prohibitively expensive > operation and users might not be aware of it. Instead of {{size()}} users can > use an iterator over all mappings to determine the size, when doing this they > will be aware of the fact that it is a costly operation. > Right now, {{size()}} is only costly on the RocksDB state backend but I think > with future developments on the in-memory state backend it might also become > an expensive operation there. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
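For users, the suggested replacement for {{size()}} would look roughly like the following sketch against Flink's {{MapState}} interface; it makes the O(n) cost explicit by iterating all mappings.
{code}
import org.apache.flink.api.common.state.MapState;

final class MapStateSizeUtil {

    // O(n): streams all keys from the state backend and counts them
    static <K, V> long sizeOf(MapState<K, V> state) throws Exception {
        long count = 0L;
        for (K ignored : state.keys()) {
            count++;
        }
        return count;
    }
}
{code}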
[jira] [Created] (FLINK-4493) Unify the snapshot output format for keyed-state backends
Stefan Richter created FLINK-4493: - Summary: Unify the snapshot output format for keyed-state backends Key: FLINK-4493 URL: https://issues.apache.org/jira/browse/FLINK-4493 Project: Flink Issue Type: Improvement Reporter: Stefan Richter Priority: Minor We could unify the snapshot output format across keyed-state backend implementations, e.g. those based on RocksDB and Heap, so that all write a single, common format. For example, this would allow us to restore state that was previously kept in RocksDB into a heap-based backend and vice versa. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4492) Cleanup files from canceled snapshots
Stefan Richter created FLINK-4492: - Summary: Cleanup files from canceled snapshots Key: FLINK-4492 URL: https://issues.apache.org/jira/browse/FLINK-4492 Project: Flink Issue Type: Bug Reporter: Stefan Richter Priority: Minor Currently, checkpointing only closes {{CheckpointStateOutputStream}}s on cancel, but incomplete files are not properly deleted from the filesystem. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4744) Introduce usercode class loader to deserialize partitionable operator state
Stefan Richter created FLINK-4744: - Summary: Introduce usercode class loader to deserialize partitionable operator state Key: FLINK-4744 URL: https://issues.apache.org/jira/browse/FLINK-4744 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Reporter: Stefan Richter Assignee: Stefan Richter Deserialization of partitionable operator state does not work with user code. We need to introduce the user ClassLoader to the {{OperatorStateBackend}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
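The mechanism behind such a fix can be sketched as follows: resolve classes against the user-code class loader while deserializing the operator state, rather than against the default class loader. Flink's {{InstantiationUtil$ClassLoaderObjectInputStream}} (visible in the FLINK-4140 stack trace above) does essentially this; the class below is a simplified illustration, not the Flink implementation.
{code}
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;

// Simplified illustration of class-loader-aware deserialization.
final class UserCodeObjectInputStream extends ObjectInputStream {

    private final ClassLoader userCodeClassLoader;

    UserCodeObjectInputStream(InputStream in, ClassLoader userCodeClassLoader) throws IOException {
        super(in);
        this.userCodeClassLoader = userCodeClassLoader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
        // resolve against the user-code class loader instead of the default one
        return Class.forName(desc.getName(), false, userCodeClassLoader);
    }
}
{code}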
[jira] [Updated] (FLINK-4731) HeapKeyedStateBackend restoring broken for scale-in
[ https://issues.apache.org/jira/browse/FLINK-4731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter updated FLINK-4731: -- Summary: HeapKeyedStateBackend restoring broken for scale-in (was: HeapKeyedStateBackend restorig broken for scale-in) > HeapKeyedStateBackend restoring broken for scale-in > --- > > Key: FLINK-4731 > URL: https://issues.apache.org/jira/browse/FLINK-4731 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter > > Restoring the HeapKeyedStateBackend is broken in case that parallelism is > reduced. The restore method is overwriting previously restored state. > We should also add scale-in testing to the RescalingITCase. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4730) Introduce CheckpointMetaData
Stefan Richter created FLINK-4730: - Summary: Introduce CheckpointMetaData Key: FLINK-4730 URL: https://issues.apache.org/jira/browse/FLINK-4730 Project: Flink Issue Type: Bug Reporter: Stefan Richter Currently, the meta data for each checkpoint consists of up to 5 long values which are passed through several functions. When adding/removing meta data we would have to change function signatures in many places. Furthermore, this is prone to errors when the order of arguments is changed accidentally. We should introduce a CheckpointMetaData which encapsulates this checkpoint meta data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
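The proposed encapsulation might look like the following sketch; the exact field set is a guess based on the description ("up to 5 long values"), not the final class.
{code}
// Hypothetical immutable value object replacing loose long parameters.
final class CheckpointMetaData {

    private final long checkpointId;
    private final long timestamp;
    private final long bytesBufferedInAlignment;
    private final long alignmentDurationNanos;
    private final long syncDurationMillis;

    CheckpointMetaData(
            long checkpointId,
            long timestamp,
            long bytesBufferedInAlignment,
            long alignmentDurationNanos,
            long syncDurationMillis) {
        this.checkpointId = checkpointId;
        this.timestamp = timestamp;
        this.bytesBufferedInAlignment = bytesBufferedInAlignment;
        this.alignmentDurationNanos = alignmentDurationNanos;
        this.syncDurationMillis = syncDurationMillis;
    }

    long getCheckpointId() { return checkpointId; }
    long getTimestamp() { return timestamp; }
    long getBytesBufferedInAlignment() { return bytesBufferedInAlignment; }
    long getAlignmentDurationNanos() { return alignmentDurationNanos; }
    long getSyncDurationMillis() { return syncDurationMillis; }
}
{code}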
[jira] [Created] (FLINK-4731) HeapKeyedStateBackend restorig broken for scale-in
Stefan Richter created FLINK-4731: - Summary: HeapKeyedStateBackend restorig broken for scale-in Key: FLINK-4731 URL: https://issues.apache.org/jira/browse/FLINK-4731 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Reporter: Stefan Richter Assignee: Stefan Richter Restoring the HeapKeyedStateBackend is broken in case that parallelism is reduced. The restore method is overwriting previously restored state. We should also add scale-in testing to the RescalingITCase. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4730) Introduce CheckpointMetaData
[ https://issues.apache.org/jira/browse/FLINK-4730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter updated FLINK-4730: -- Component/s: State Backends, Checkpointing > Introduce CheckpointMetaData > > > Key: FLINK-4730 > URL: https://issues.apache.org/jira/browse/FLINK-4730 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Stefan Richter > > Currently, the meta data for each checkpoint consists of up to 5 long values > which are passed through several functions. When adding/removing meta data > we would have to change function signatures in many places. Furthermore, this > is prone to errors when the order of arguments is changed accidentally. We > should introduce a CheckpointMetaData which encapsulates this checkpoint meta > data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4730) Introduce CheckpointMetaData
[ https://issues.apache.org/jira/browse/FLINK-4730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter reassigned FLINK-4730: - Assignee: Stefan Richter > Introduce CheckpointMetaData > > > Key: FLINK-4730 > URL: https://issues.apache.org/jira/browse/FLINK-4730 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter > > Currently, the meta data for each checkpoint consists of up to 5 long values > which are passed through several functions. When adding/removing meta data > we would have to change function signatures in many places. Furthermore, this > is prone to errors when the order of arguments is changed accidentally. We > should introduce a CheckpointMetaData which encapsulates this checkpoint meta > data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4603) KeyedStateBackend cannot restore user code classes
[ https://issues.apache.org/jira/browse/FLINK-4603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15512553#comment-15512553 ] Stefan Richter commented on FLINK-4603: --- Currently, we should still keep the UserCodeClassLoader around in the RocksDB backend because we still need to serialize the StateDescriptor, which contains the TypeSerializer, so that users can not accidentally create StateDescriptors with a wrong TypeSerializer. However, we should consider that TypeSerializer can be exchanged (ensuring their compatibility), e.g. to allow different serialization versions. > KeyedStateBackend cannot restore user code classes > -- > > Key: FLINK-4603 > URL: https://issues.apache.org/jira/browse/FLINK-4603 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Till Rohrmann >Assignee: Stefan Richter >Priority: Blocker > Fix For: 1.2.0 > > > A user reported that he cannot restore keyed state which contains user code > classes. I suspect that we don't use the user code class loader to > deserialize the state. > The solution seems to be to forward the user code class loader to the > {{KeyedStateBackends}} when restoring state. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4556) Make Queryable State Key-Group Aware
[ https://issues.apache.org/jira/browse/FLINK-4556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter reassigned FLINK-4556: - Assignee: Stefan Richter > Make Queryable State Key-Group Aware > > > Key: FLINK-4556 > URL: https://issues.apache.org/jira/browse/FLINK-4556 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.2.0 >Reporter: Aljoscha Krettek >Assignee: Stefan Richter >Priority: Blocker > > The recent introduction of key-grouped state breaks queryable state because > the JobManager does not yet forward the client to the correct TaskManager > based on key-group ranges. > This will either have to be implemented on the JobManager side, i.e. in > {{AkkaKvStateLocationLookupService}} or on the {{TaskManager}} when state is > registered. The JobManager can know the mapping because it should know the > {{parallelism}}/{{maxParallelism}} which it can use to determine where the > state for a key-group is stored. The {{TaskManager}} send a > {{NotifyKvStateRegistered}} message that already contains a {{keyGroupIndex}} > field that is not useful/correct at the moment, though. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
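The routing logic the FLINK-4556 description asks for can be sketched in two steps: map a key to its key group, then map the key group to the subtask index that owns it. Treat this as an illustration; the key-to-group hash below is simplified (Flink additionally applies murmur hashing to the key's hash code).
{code}
// Illustrative key-group routing for queryable state lookups.
final class KeyGroupRouting {

    // simplified key-to-key-group assignment
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // which parallel subtask holds the given key group
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }
}
{code}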
[jira] [Created] (FLINK-4508) Consolidate DummyEnvironment and MockEnvironment for Tests
Stefan Richter created FLINK-4508: - Summary: Consolidate DummyEnvironment and MockEnvironment for Tests Key: FLINK-4508 URL: https://issues.apache.org/jira/browse/FLINK-4508 Project: Flink Issue Type: Test Reporter: Stefan Richter Priority: Minor Currently, we have {{DummyEnvironment}} and {{MockEnvironment}} as implementations of {{Environment}} for our tests. Both serve a similar purpose, but offer slightly different features. We should consolidate this by merging them into one class that offers the best of both previous implementations. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4942) Improve processing performance of HeapInternalTimerService with key groups
Stefan Richter created FLINK-4942: - Summary: Improve processing performance of HeapInternalTimerService with key groups Key: FLINK-4942 URL: https://issues.apache.org/jira/browse/FLINK-4942 Project: Flink Issue Type: Improvement Reporter: Stefan Richter Currently, key-group awareness in `HeapInternalTimerService` is implemented as a (hash) map of (hash) sets; the purpose of this layout is to group timers by key group so that they can easily be serialized per key group. However, it comes with a significant performance decrease, in particular when the number of key groups is high. I suggest keeping all timers in one set at runtime again, making processing as fast as in previous versions without key groups, and instead performing a very fast online partitioning into key groups before a snapshot. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
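A minimal sketch of the suggested layout, with a stand-in {{Timer}} type and a placeholder key-group hash (not the actual {{HeapInternalTimerService}} types):
{code}
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class TimerPartitioner {

    static class Timer {
        final long key;
        final long timestamp;
        Timer(long key, long timestamp) { this.key = key; this.timestamp = timestamp; }
    }

    // At runtime, all timers live in one flat set (fast add/remove).
    final Set<Timer> timers = new HashSet<>();

    // Partitioning into key groups happens only when a snapshot is taken.
    List<Set<Timer>> partitionForSnapshot(int numKeyGroups) {
        List<Set<Timer>> byKeyGroup = new ArrayList<>(numKeyGroups);
        for (int i = 0; i < numKeyGroups; i++) {
            byKeyGroup.add(new HashSet<>());
        }
        for (Timer t : timers) {
            // placeholder for the real key-group assignment
            int keyGroup = (int) Math.floorMod(t.key, (long) numKeyGroups);
            byKeyGroup.get(keyGroup).add(t);
        }
        return byKeyGroup;
    }
}
{code}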
[jira] [Assigned] (FLINK-4942) Improve processing performance of HeapInternalTimerService with key groups
[ https://issues.apache.org/jira/browse/FLINK-4942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter reassigned FLINK-4942: - Assignee: Stefan Richter > Improve processing performance of HeapInternalTimerService with key groups > -- > > Key: FLINK-4942 > URL: https://issues.apache.org/jira/browse/FLINK-4942 > Project: Flink > Issue Type: Improvement >Reporter: Stefan Richter >Assignee: Stefan Richter > > Currently, key groups awareness in `HeapInternalTimerService` is basically > implemented as (hash) map of (hash) sets. Purpose of this is grouping key > groups together in a way that allows easy serialization into key groups. > However, this data layout comes along with some significant performance > decrease, in particular when the number of key groups is high. > I suggest to keep all timers in one set again at runtime, thus being as fast > as in previous versions without key groups. > Instead, we can perform a very fast online partitioning into key groups > before a snapshot. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-5019) Proper isRestored result for tasks that did not write state
[ https://issues.apache.org/jira/browse/FLINK-5019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter closed FLINK-5019. - Resolution: Fixed Fix Version/s: 1.2.0 > Proper isRestored result for tasks that did not write state > --- > > Key: FLINK-5019 > URL: https://issues.apache.org/jira/browse/FLINK-5019 > Project: Flink > Issue Type: Bug >Reporter: Stefan Richter >Assignee: Stefan Richter > Fix For: 1.2.0 > > > When a subtask is restored from a checkpoint that does not contain any state > (e.g. because the subtask did not write state in the previous run), the > result of {{StateInitializationContext::isRestored}} will incorrectly return > false. > We should ensure that empty state is somehow reflected in a checkpoint and > return true on restore, independent from the presence of state. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5036) Perform the grouping of keys in restoring instead of checkpointing
[ https://issues.apache.org/jira/browse/FLINK-5036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15650460#comment-15650460 ] Stefan Richter commented on FLINK-5036: --- From the description, it is not entirely clear if you are discussing a theoretical or a concrete problem that you observed. If there is an observable performance problem, could you please provide some backing measurements (log outputs), the Flink version, and information about your keys and values to help us figure out the cause and extent of the problem? If we are talking about a theoretical problem, the way you describe the effect of key-groups on snapshotting in the {{RocksDBKeyedStateBackend}} is not accurate. What was actually changed for key-groups in the current master is that each key in RocksDB is now prefixed by its corresponding key-group ID (1-2 bytes, depending on maxParallelism). This allows us to iterate all key-value pairs in key-grouped order at snapshot time, similar to how we previously iterated them in key-order. Furthermore, all key-groups go to the same file and are written consecutively without random IOs, and a small index of key-group-id -> offset is created. From this point of view, the performance impact of key-groups on snapshot time should (hopefully) be marginal. As another remark, non-partitioned state is snapshotted/restored in a similar way. Avoiding key-groups at snapshot time also has more problematic implications for restores than you consider in the description. For example, if the new parallelism is not a multiple of the old parallelism, we effectively have to read and filter the complete keyed state for each task that works on it. > Perform the grouping of keys in restoring instead of checkpointing > -- > > Key: FLINK-5036 > URL: https://issues.apache.org/jira/browse/FLINK-5036 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi > > Whenever taking snapshots of {{RocksDBKeyedStateBackend}}, the values in the > states will be written onto different files according to their key groups. > The procedure is very costly when the states are very big. > Given that the snapshot operations will be performed much more frequently > than restoring, we can leave the key groups as they are to improve the > overall performance. In other words, we can perform the grouping of keys in > restoring instead of in checkpointing. > I think, the implementation will be very similar to the restoring of > non-partitioned states. Each task will receive a collection of snapshots each > of which contains a set of key groups. Each task will restore its states from > the given snapshots by picking values in assigned key groups. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
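To make the described layout concrete, a simplified sketch; the nested list stands in for a RocksDB iteration in key-grouped order, and the streams are simplifications of Flink's checkpoint output streams:
{code}
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;

public class KeyGroupSnapshotLayout {

    // Writes all key groups consecutively into one stream and returns the
    // small keyGroupId -> offset index described above.
    static long[] writeSnapshot(List<List<byte[]>> kvPairsByKeyGroup,
                                ByteArrayOutputStream out) throws IOException {
        DataOutputStream dos = new DataOutputStream(out);
        long[] keyGroupOffsets = new long[kvPairsByKeyGroup.size()];
        for (int keyGroup = 0; keyGroup < kvPairsByKeyGroup.size(); keyGroup++) {
            keyGroupOffsets[keyGroup] = out.size(); // offset where this key group starts
            for (byte[] kv : kvPairsByKeyGroup.get(keyGroup)) {
                dos.writeInt(kv.length);
                dos.write(kv);
            }
        }
        dos.flush();
        return keyGroupOffsets;
    }
}
{code}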
[jira] [Closed] (FLINK-5037) Instability in AbstractUdfStreamOperatorLifecycleTest
[ https://issues.apache.org/jira/browse/FLINK-5037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter closed FLINK-5037. - Resolution: Fixed Fix Version/s: 1.2.0 > Instability in AbstractUdfStreamOperatorLifecycleTest > - > > Key: FLINK-5037 > URL: https://issues.apache.org/jira/browse/FLINK-5037 > Project: Flink > Issue Type: Bug > Components: Streaming >Reporter: Aljoscha Krettek >Assignee: Stefan Richter > Fix For: 1.2.0 > > > I saw this failure: > https://s3.amazonaws.com/archive.travis-ci.org/jobs/174340963/log.txt > I suspect it has to do with the {{Thread.sleep()}} in Line 237. I think it > can be replaced by {{runStarted.await()}} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
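A small sketch of the suggested latch-based synchronization, assuming a plain {{CountDownLatch}} in place of the test's actual {{runStarted}} construct:
{code}
import java.util.concurrent.CountDownLatch;

public class LatchInsteadOfSleep {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch runStarted = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            runStarted.countDown(); // signal that run() has actually started
            // ... the work the test wants to observe ...
        });
        worker.start();
        runStarted.await(); // deterministic, unlike an arbitrary Thread.sleep()
        worker.join();
    }
}
{code}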
[jira] [Created] (FLINK-5041) Implement savepoint backwards compatibility 1.1 -> 1.2
Stefan Richter created FLINK-5041: - Summary: Implement savepoint backwards compatibility 1.1 -> 1.2 Key: FLINK-5041 URL: https://issues.apache.org/jira/browse/FLINK-5041 Project: Flink Issue Type: New Feature Components: State Backends, Checkpointing Affects Versions: 1.2.0 Reporter: Stefan Richter Assignee: Stefan Richter This issue tracks the implementation of backwards compatibility between Flink 1.1 and 1.2 releases. This task subsumes: - Converting old savepoints to new savepoints, including a conversion of state handles to their new replacement. - Converting keyed state from old backend implementations to their new counterparts. - Converting operator and function state for all changed operators. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5043) Converting keyed state from Flink 1.1 backend implementations to their new counterparts in 1.2
Stefan Richter created FLINK-5043: - Summary: Converting keyed state from Flink 1.1 backend implementations to their new counterparts in 1.2 Key: FLINK-5043 URL: https://issues.apache.org/jira/browse/FLINK-5043 Project: Flink Issue Type: Sub-task Components: State Backends, Checkpointing Affects Versions: 1.2.0 Reporter: Stefan Richter Assignee: Stefan Richter Keyed state backends became keygroup aware in Flink 1.2 and their hierarchy as a whole changed significantly. We need to implement a conversion so that old snapshots can be restored into new backends. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5042) Convert old savepoints to new savepoints
Stefan Richter created FLINK-5042: - Summary: Convert old savepoints to new savepoints Key: FLINK-5042 URL: https://issues.apache.org/jira/browse/FLINK-5042 Project: Flink Issue Type: Sub-task Components: State Backends, Checkpointing Affects Versions: 1.2.0 Reporter: Stefan Richter Assignee: Stefan Richter The format of savepoints and the hierarchy of state handles changed a lot between Flink 1.1 and 1.2. For backwards compatibility, we need to convert old to new savepoints. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5044) Converting operator and function state from Flink 1.1 for all changed operators in 1.2
Stefan Richter created FLINK-5044: - Summary: Converting operator and function state from Flink 1.1 for all changed operators in 1.2 Key: FLINK-5044 URL: https://issues.apache.org/jira/browse/FLINK-5044 Project: Flink Issue Type: Sub-task Components: Streaming, Windowing Operators Affects Versions: 1.2.0 Reporter: Stefan Richter Snapshot/restore mechanics for operators changed significantly between Flink 1.1 and Flink 1.2. Furthermore, operators and their hierarchy also changed. We need to ensure that old operators can still restore state from their old version. In particular, WindowOperator and KafkaConsumer are currently affected by this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5051) Backwards compatibility for serializers in backend state
Stefan Richter created FLINK-5051: - Summary: Backwards compatibility for serializers in backend state Key: FLINK-5051 URL: https://issues.apache.org/jira/browse/FLINK-5051 Project: Flink Issue Type: Improvement Components: State Backends, Checkpointing Reporter: Stefan Richter When a new state is registered, e.g. in a keyed backend via `getPartitionedState`, the caller has to provide all type serializers required for the persistence of state components. Explicitly passing the serializers on state creation already allows for potential version upgrades of serializers. However, those serializers are currently not part of any snapshot and are only provided at runtime, when the state is newly registered or restored. For backwards compatibility, this has strong implications: checkpoints are not self-contained, in that state is currently a black box without knowledge about its corresponding serializers. This rules out most cases where we would need to restructure the state offline: we could only convert state lazily at runtime, and only once the user registers the concrete state, which might happen at unpredictable points. I suggest adapting our solution as follows: - As now, all states are registered with their set of serializers. - Unlike now, all serializers are written to the snapshot. This makes savepoints self-contained and also allows creating inspection tools for savepoints at some point in the future. - Introduce an interface {{Versioned}} with {{long getVersion()}} and {{boolean isCompatible(Versioned v)}}, which is then implemented by serializers. Compatible serializers must ensure that they can deserialize older versions, and can then serialize them in their new format. This is how we upgrade. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
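The proposed interface, sketched directly from the description; the example serializer implementing it is an assumption, not an actual Flink class:
{code}
public interface Versioned {
    long getVersion();
    boolean isCompatible(Versioned v);
}

// Illustrative implementation: a version-2 serializer that can still
// read what version 1 wrote.
class MyStateSerializer implements Versioned {

    @Override
    public long getVersion() {
        return 2;
    }

    @Override
    public boolean isCompatible(Versioned v) {
        // compatible serializers must be able to deserialize older versions
        // and re-serialize the data in their new format
        return v.getVersion() <= getVersion();
    }
}
{code}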
[jira] [Created] (FLINK-5052) Changing the maximum parallelism (number of key groups) of a job
Stefan Richter created FLINK-5052: - Summary: Changing the maximum parallelism (number of key groups) of a job Key: FLINK-5052 URL: https://issues.apache.org/jira/browse/FLINK-5052 Project: Flink Issue Type: Improvement Components: State Backends, Checkpointing Reporter: Stefan Richter Through dynamic rescaling, Flink jobs can already adjust their parallelism and each operator only has to read its assigned key-groups. However, the maximum parallelism is determined by the number of key-groups (aka maxParallelism), which is currently fixed forever after the job is first started. We could consider relaxing this limitation, so that users can modify the number of key-groups after the fact, which is useful in particular for upscaling jobs from older Flink versions (<1.2), which must be converted with maxParallelism == parallelism. In the general case, changing the maxParallelism shuffles keys between key-groups, so we would have to read the complete state on each operator instance, filtering for those keys that actually fall into the key-groups assigned to that instance. While it is certainly possible to support this, it is obviously a very expensive operation. Fortunately, the assignment of keys to operators is currently determined as follows: {{operatorInstance = computeKeyGroup(key) * parallelism / maxParallelism}}. This means that we can provide more efficient support for upscaling of maxParallelism if {{newMaxParallelism == n * oldMaxParallelism}}. In this case, keys are not reshuffled between key-groups; instead, each key-group is split by a factor of n. Restoring operator instances for the new maxParallelism then only needs to read some of the old key-groups, which significantly reduces the amount of unnecessary data transfer, e.g. ~1/2 when increasing maxParallelism by a factor of 2, ~2/3 when increasing by a factor of 3, etc. Implementing this feature would require the following steps: - Introduce/modify state handles with the capability to summarize multiple logical key-groups into one mixed physical entity. - Enhance StateAssignmentOperation so that it can deal with and correctly assign the new/modified keyed state handles to subtasks on restoring a checkpoint. We also need to implement how to compute the correct super-key-group, but this is rather simple. - Filter out key clippings on restore in the backends. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
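A hedged sketch of the splitting argument, simplifying {{computeKeyGroup}} to a plain modulo (Flink's actual hash differs):
{code}
public class KeyGroupUpscaling {

    // With keyGroup = hash % maxParallelism and newMax == n * oldMax,
    // hash % oldMax == (hash % newMax) % oldMax, so a new key-group g'
    // is carved out of exactly one old key-group, namely g' % oldMax.
    static int oldKeyGroupFor(int newKeyGroup, int oldMaxParallelism) {
        return newKeyGroup % oldMaxParallelism;
    }

    // The assignment formula from the description above.
    static int operatorIndexFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }
}
{code}
Under these assumptions, an operator instance restoring for the new maxParallelism only needs to read the old key-groups returned by {{oldKeyGroupFor}} for its assigned range, rather than the complete state.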
[jira] [Created] (FLINK-5053) Incremental / lightweight snapshots for checkpoints
Stefan Richter created FLINK-5053: - Summary: Incremental / lightweight snapshots for checkpoints Key: FLINK-5053 URL: https://issues.apache.org/jira/browse/FLINK-5053 Project: Flink Issue Type: Improvement Components: State Backends, Checkpointing Reporter: Stefan Richter There is currently basically no difference between savepoints and checkpoints in Flink and both are created through exactly the same process. However, savepoints and checkpoints have a slightly different meaning which we should take into account to keep Flink efficient: - Savepoints are (typically infrequently) triggered by the user to create a state from which the application can be restarted, e.g. because Flink, some code, or the parallelism needs to be changed. - Checkpoints are (typically frequently) triggered by the system to allow for fast recovery in case of failure, but keeping the job/system unchanged. This means that savepoints and checkpoints can have different properties in that: - Savepoints should represent a state of the application, where characteristics of the job (e.g. parallelism) can be adjusted for the next restart. One example of things that savepoints need to be aware of is key-groups. Savepoints can potentially be a little more expensive than checkpoints, because they are usually created a lot less frequently by the user. - Checkpoints are frequently triggered by the system to allow for fast failure recovery. However, failure recovery leaves all characteristics of the job unchanged. Thus, checkpoints do not have to be aware of those, e.g. think again of key groups. Checkpoints should run faster than creating savepoints, in particular it would be nice to have incremental checkpoints. For a first approach, I would suggest the following steps/changes: - In checkpoint coordination: differentiate between triggering checkpoints and savepoints. Introduce properties for checkpoints that describe their set of abilities, e.g. "is-key-group-aware", "is-incremental". - In state handle infrastructure: introduce state handles that reflect incremental checkpoints and drop full key-group awareness, i.e. covering folders instead of files and not having keygroup_id -> file/offset mapping, but keygroup_range -> folder? - Backend side: We should start with RocksDB by reintroducing something similar to semi-async snapshots, but using BackupableDBOptions::setShareTableFiles(true) and transferring only new incremental outputs to HDFS. Notice that using RocksDB's internal backup mechanism gives up the information about individual key-groups. But as explained above, this should be totally acceptable for checkpoints, while savepoints should use the key-group-aware fully async mode. Of course we also need to implement the ability to restore from both types of snapshots. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
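A rough sketch of the RocksDB-side idea using the backup options mentioned above; the exact RocksJava API (option and engine classes, {{AutoCloseable}} support) varies between versions, so treat this as an assumption-laden outline rather than the implementation:
{code}
import org.rocksdb.BackupEngine;
import org.rocksdb.BackupableDBOptions;
import org.rocksdb.Env;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class IncrementalBackupSketch {

    static void incrementalCheckpoint(RocksDB db, String backupDir) throws RocksDBException {
        try (BackupableDBOptions backupOptions =
                     new BackupableDBOptions(backupDir)
                             .setShareTableFiles(true); // re-use unchanged SST files across backups
             BackupEngine backupEngine = BackupEngine.open(Env.getDefault(), backupOptions)) {
            // flushBeforeBackup = true makes the backup self-contained;
            // only newly created files would then be transferred to HDFS
            backupEngine.createNewBackup(db, true);
        }
    }
}
{code}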
[jira] [Updated] (FLINK-4858) Remove Legacy Checkpointing Interfaces
[ https://issues.apache.org/jira/browse/FLINK-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter updated FLINK-4858: -- Description: This issue tracks the removal of the deprecated/legacy interfaces that are still in the code and can only be removed once the move to the new Flink 1.2 checkpointing is completely in place. So far, these are {{StreamCheckpointedOperator}} and the legacy snapshot methods on the testing harnesses. Furthermore, codepaths and classes touching legacy state (e.g. {{StateAssignmentOperation}} or {{TaskStateHandles}}) can be cleaned up. was: This issue tracks the removal of the deprecated/legacy interfaces that are still in the code and can only be removed once the move to the new Flink 1.2 checkpointing is completely in place. So far, these are {{StreamCheckpointedOperator}} and the legacy snapshot methods on the testing harnesses. > Remove Legacy Checkpointing Interfaces > -- > > Key: FLINK-4858 > URL: https://issues.apache.org/jira/browse/FLINK-4858 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.2.0 >Reporter: Aljoscha Krettek >Priority: Blocker > Fix For: 1.2.0 > > > This issue tracks the removal of the deprecated/legacy interfaces that are > still in the code and can only be removed once the move to the new Flink 1.2 > checkpointing is completely in place. > So far, these are {{StreamCheckpointedOperator}} and the legacy snapshot > methods on the testing harnesses. Furthermore, codepaths and classes touching > legacy state (e.g. {{StateAssignmentOperation}} or {{TaskStateHandles}}) can > be cleaned up. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4492) Cleanup files from canceled snapshots
[ https://issues.apache.org/jira/browse/FLINK-4492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15575225#comment-15575225 ] Stefan Richter commented on FLINK-4492: --- I did not add any cleanup on cancel in the changes I did, but somebody might have. > Cleanup files from canceled snapshots > - > > Key: FLINK-4492 > URL: https://issues.apache.org/jira/browse/FLINK-4492 > Project: Flink > Issue Type: Bug >Reporter: Stefan Richter >Assignee: Nikolay Vasilishin >Priority: Minor > > Current checkpointing only closes CheckpointStateOutputStreams on cancel, but > incomplete files are not properly deleted from the filesystem. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4867) Investigate code generation for improving sort performance
[ https://issues.apache.org/jira/browse/FLINK-4867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592292#comment-15592292 ] Stefan Richter commented on FLINK-4867: --- If sort performance is crucial to you, I wrote an in-place radix sort algorithm that was extremely fast for me in a previous project. On primitive types and serialized byte strings, I found it typically 2-3x faster than the JDK's Arrays.sort() for primitives, or than multikey quicksort for Strings. I was considering porting it to Flink, but did not find the time yet. As it is radix-based and not comparison-based, it would require some way to expose partial sort keys instead of a compareTo method. If that is interesting to you, let me know and I can share the original code. > Investigate code generation for improving sort performance > -- > > Key: FLINK-4867 > URL: https://issues.apache.org/jira/browse/FLINK-4867 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Gabor Gevay >Priority: Minor > Labels: performance > > This issue is for investigating whether code generation could speed up > sorting. We should make some performance measurements on hand-written code > that is similar to what we could generate, to see whether investing more time > into this is worth it. If we find that it is worth it, we can open a second > Jira for the actual implementation of the code generation. > I think we could generate one class at places where we currently instantiate > {{QuickSort}}. This generated class would include the functionality of > {{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, > {{MemorySegment.compare}}, and {{MemorySegment.swap}}. > Btw. I'm planning to give this as a student project at a TU Berlin course in > the next few months. > Some concrete ideas about how could a generated sorter be faster than the > current sorting code: > * {{MemorySegment.compare}} could be specialized for > ** Length: for small records, the loop could be unrolled > ** Endiannes (currently it is optimized for big endian; and in the little > endian case (e.g. x86) it does a Long.reverseBytes for each long read) > * {{MemorySegment.swapBytes}} > ** In case of small records, using three {{UNSAFE.copyMemory}} is probably > not as efficient as a specialized swap, because > *** We could use total loop unrolling in generated code (because we know the > exact record size) > *** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\] > *** We will only need 2/3 the memory bandwidth, because the temporary storage > could be a register if we swap one byte (or one {{long}}) at a time > ** several checks might be eliminated > * Better inlining behaviour could be achieved > ** Virtual function calls to the methods of {{InMemorySorter}} could be > eliminated. (Note, that these are problematic to devirtualize by the JVM if > there are different derived classes used in a single Flink job (see \[8,7\]).) 
> ** {{MemorySegment.swapBytes}} is probably not inlined currently, because the > excessive checks make it too large > ** {{MemorySegment.compare}} is probably also not inlined currently, because > those two while loops are too large > \[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, > long, Object, long, long) > \[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/ > \[8\] > http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/ > \[9\] > http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
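For readers curious what "partial sort keys instead of a compareTo method" means in practice, a generic LSD radix sort sketch for {{int}} keys; this illustrates the technique, not the in-place MSD algorithm the comment refers to:
{code}
public class RadixSortSketch {

    // LSD radix sort on 8-bit digits. Flipping the sign bit makes signed
    // ints order correctly when treated as unsigned keys; digits, not a
    // compareTo(), drive the ordering.
    static void radixSort(int[] a) {
        int[] src = a;
        int[] dst = new int[a.length];
        for (int shift = 0; shift < 32; shift += 8) {
            int[] count = new int[257];
            for (int v : src) {
                count[(((v ^ Integer.MIN_VALUE) >>> shift) & 0xFF) + 1]++;
            }
            for (int i = 0; i < 256; i++) {
                count[i + 1] += count[i]; // prefix sums -> bucket start positions
            }
            for (int v : src) {
                dst[count[((v ^ Integer.MIN_VALUE) >>> shift) & 0xFF]++] = v;
            }
            int[] tmp = src; src = dst; dst = tmp; // ping-pong buffers
        }
        // four passes (an even number), so the sorted data ends up in 'a' again
    }
}
{code}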
[jira] [Comment Edited] (FLINK-4867) Investigate code generation for improving sort performance
[ https://issues.apache.org/jira/browse/FLINK-4867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592292#comment-15592292 ] Stefan Richter edited comment on FLINK-4867 at 10/20/16 4:32 PM: - If sort performance is crucial to you, I wrote an in-place radix sort algorithm that was extremely fast for me in a previous project. On primitive types and serialized byte strings, I found it typically 2-3x faster than the JDK's Arrays.sort() for primitives, or than multikey quicksort for Strings. I was considering porting it to Flink, but did not find the time yet. As it is radix-based and not comparison-based, it would require some way to expose partial sort keys instead of a compareTo method. If that is interesting to you, let me know and I can share the original code. was (Author: srichter): If you sort performance is crucial to you, I wrote some inplace radix sort algorithm that was extremely fast for me in a precious project. On primitives types and serialized byte strings I found it typically factor 2-3x faster than JDK's Arrays.sort() for primitives or multikey quicksort for Strings). I was considering to port it onto Flink, but did not find the time yet. As it is radix based and not comparison based, it would require some way to expose partial sort keys instead of a compareTo method . If that is interesting to you let me know and I can share the original code. > Investigate code generation for improving sort performance > -- > > Key: FLINK-4867 > URL: https://issues.apache.org/jira/browse/FLINK-4867 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Gabor Gevay >Priority: Minor > Labels: performance > > This issue is for investigating whether code generation could speed up > sorting. We should make some performance measurements on hand-written code > that is similar to what we could generate, to see whether investing more time > into this is worth it. If we find that it is worth it, we can open a second > Jira for the actual implementation of the code generation. > I think we could generate one class at places where we currently instantiate > {{QuickSort}}. This generated class would include the functionality of > {{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, > {{MemorySegment.compare}}, and {{MemorySegment.swap}}. > Btw. I'm planning to give this as a student project at a TU Berlin course in > the next few months. > Some concrete ideas about how could a generated sorter be faster than the > current sorting code: > * {{MemorySegment.compare}} could be specialized for > ** Length: for small records, the loop could be unrolled > ** Endiannes (currently it is optimized for big endian; and in the little > endian case (e.g. x86) it does a Long.reverseBytes for each long read) > * {{MemorySegment.swapBytes}} > ** In case of small records, using three {{UNSAFE.copyMemory}} is probably > not as efficient as a specialized swap, because > *** We could use total loop unrolling in generated code (because we know the > exact record size) > *** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\] > *** We will only need 2/3 the memory bandwidth, because the temporary storage > could be a register if we swap one byte (or one {{long}}) at a time > ** several checks might be eliminated > * Better inlining behaviour could be achieved > ** Virtual function calls to the methods of {{InMemorySorter}} could be > eliminated. 
(Note, that these are problematic to devirtualize by the JVM if > there are different derived classes used in a single Flink job (see \[8,7\]).) > ** {{MemorySegment.swapBytes}} is probably not inlined currently, because the > excessive checks make it too large > ** {{MemorySegment.compare}} is probably also not inlined currently, because > those two while loops are too large > \[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, > long, Object, long, long) > \[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/ > \[8\] > http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/ > \[9\] > http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4883) Prevent UDFs implementations through Scala singleton objects
[ https://issues.apache.org/jira/browse/FLINK-4883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15601278#comment-15601278 ] Stefan Richter commented on FLINK-4883: --- Ok, thanks! I will assign you to this. > Prevent UDFs implementations through Scala singleton objects > > > Key: FLINK-4883 > URL: https://issues.apache.org/jira/browse/FLINK-4883 > Project: Flink > Issue Type: Bug >Reporter: Stefan Richter > > Currently, user can create and use UDFs in Scala like this: > {code} > object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] > { > ... > } > {code} > However, this leads to problems as the UDF is now a singleton that Flink > could use across several operator instances, which leads to job failures. We > should detect and prevent the usage of singleton UDFs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4843) Introduce Test for FsCheckpointStateOutputStream::getPos
Stefan Richter created FLINK-4843: - Summary: Introduce Test for FsCheckpointStateOutputStream::getPos Key: FLINK-4843 URL: https://issues.apache.org/jira/browse/FLINK-4843 Project: Flink Issue Type: Test Reporter: Stefan Richter Assignee: Stefan Richter Introduce a test for FsCheckpointStateOutputStream::getPos, which is currently not covered by the existing tests. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4842) Introduce test to enforce order of operator / udf lifecycles
Stefan Richter created FLINK-4842: - Summary: Introduce test to enforce order of operator / udf lifecycles Key: FLINK-4842 URL: https://issues.apache.org/jira/browse/FLINK-4842 Project: Flink Issue Type: Test Reporter: Stefan Richter Assignee: Stefan Richter We should introduce a test that enforces a certain order in which life cycle methods of operators and udfs are called, so that they are not easily changed by accident. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4883) Prevent UDFs implementations through Scala singleton objects
[ https://issues.apache.org/jira/browse/FLINK-4883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15604566#comment-15604566 ] Stefan Richter commented on FLINK-4883: --- Sure, here is the example from the mailing list: {code} import org.apache.flink.api.scala._ import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.state.memory.MemoryStateBackend import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic} import org.apache.flink.util.Collector import scala.collection.mutable.ArrayBuffer object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] { val buffer = ArrayBuffer.empty[Long] @transient var state: ValueState[String] = _ override def open(parameters: Configuration): Unit = { super.open(parameters) state = getRuntimeContext.getState(new ValueStateDescriptor[String]("state-descriptor", classOf[String], "")) } override def flatMap2(value: String, out: Collector[(Long, String)]): Unit = { state.update(value) } override def flatMap1(value: Long, out: Collector[(Long, String)]): Unit = { buffer += value if (state.value() != "") { for (elem <- buffer) { out.collect((elem, state.value())) } buffer.clear() } } } object StreamingPipeline { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.enableCheckpointing(30 * 1000, CheckpointingMode.EXACTLY_ONCE) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setStateBackend(new MemoryStateBackend()) val pipeline1 = env.generateSequence(0, 1000) val pipeline2 = env.fromElements("even", "odd") pipeline1.connect(pipeline2) .keyBy( elem => elem % 2 == 0, elem => elem == "even" ).flatMap(FlatMapper) .print() env.execute("Example") } } {code} > Prevent UDFs implementations through Scala singleton objects > > > Key: FLINK-4883 > URL: https://issues.apache.org/jira/browse/FLINK-4883 > Project: Flink > Issue Type: Bug >Reporter: Stefan Richter >Assignee: Renkai Ge > > Currently, user can create and use UDFs in Scala like this: > {code} > object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] > { > ... > } > {code} > However, this leads to problems as the UDF is now a singleton that Flink > could use across several operator instances, which leads to job failures. We > should detect and prevent the usage of singleton UDFs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4910) Introduce safety net for closing file system streams
[ https://issues.apache.org/jira/browse/FLINK-4910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter reassigned FLINK-4910: - Assignee: Stefan Richter > Introduce safety net for closing file system streams > > > Key: FLINK-4910 > URL: https://issues.apache.org/jira/browse/FLINK-4910 > Project: Flink > Issue Type: Improvement >Reporter: Stefan Richter >Assignee: Stefan Richter > > Streams that are opened through {{FileSystem}} must be closed at the end of > their life cycle. However, we found hints that some code forgets to close > such streams. > We should introduce i) a mechanism that closes leaking unclosed streams after > usage and ii) logging that helps us to track down and fix the sources > of such leaks. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4910) Introduce safety net for closing file system streams
Stefan Richter created FLINK-4910: - Summary: Introduce safety net for closing file system streams Key: FLINK-4910 URL: https://issues.apache.org/jira/browse/FLINK-4910 Project: Flink Issue Type: Improvement Reporter: Stefan Richter Streams that are opened through {{FileSystem}} must be closed at the end of their life cycle. However, we found hints that some code forgets to close such streams. We should introduce i) a mechanism that closes leaking unclosed streams after usage and ii) logging that helps us to track down and fix the sources of such leaks. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
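A hedged sketch of what such a safety net could look like; class and method names are assumptions, not the eventual implementation:
{code}
import java.io.Closeable;
import java.io.IOException;
import java.util.IdentityHashMap;
import java.util.Map;

public class StreamSafetyNet implements Closeable {

    private final Map<Closeable, String> registered = new IdentityHashMap<>();

    // Every stream opened through FileSystem would be registered here,
    // together with the call site that opened it.
    public synchronized <C extends Closeable> C register(C stream) {
        registered.put(stream, describeCallSite());
        return stream;
    }

    public synchronized void unregister(Closeable stream) {
        registered.remove(stream);
    }

    // Called at the end of the task's life cycle: closes leaked streams
    // and logs where they were opened, so leaks can be tracked down.
    @Override
    public synchronized void close() throws IOException {
        for (Map.Entry<Closeable, String> leak : registered.entrySet()) {
            System.err.println("Unclosed stream, opened at: " + leak.getValue());
            leak.getKey().close();
        }
        registered.clear();
    }

    private static String describeCallSite() {
        StackTraceElement[] trace = Thread.currentThread().getStackTrace();
        return trace.length > 3 ? trace[3].toString() : "unknown";
    }
}
{code}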
[jira] [Updated] (FLINK-4883) Prevent UDFs implementations through Scala singleton objects
[ https://issues.apache.org/jira/browse/FLINK-4883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter updated FLINK-4883: -- Assignee: Renkai Ge > Prevent UDFs implementations through Scala singleton objects > > > Key: FLINK-4883 > URL: https://issues.apache.org/jira/browse/FLINK-4883 > Project: Flink > Issue Type: Bug >Reporter: Stefan Richter >Assignee: Renkai Ge > > Currently, user can create and use UDFs in Scala like this: > {code} > object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] > { > ... > } > {code} > However, this leads to problems as the UDF is now a singleton that Flink > could use across several operator instances, which leads to job failures. We > should detect and prevent the usage of singleton UDFs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4867) Investigate code generation for improving sort performance
[ https://issues.apache.org/jira/browse/FLINK-4867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592866#comment-15592866 ] Stefan Richter commented on FLINK-4867: --- I have already pushed my code into https://github.com/StefanRRichter/RadixSort so that you can take a look. I will do some cleanup, more documentation, and tests if I find some more time to polish this. If you have questions about the code, just drop me an email. > Investigate code generation for improving sort performance > -- > > Key: FLINK-4867 > URL: https://issues.apache.org/jira/browse/FLINK-4867 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Gabor Gevay >Assignee: Gabor Gevay >Priority: Minor > Labels: performance > > This issue is for investigating whether code generation could speed up > sorting. We should make some performance measurements on hand-written code > that is similar to what we could generate, to see whether investing more time > into this is worth it. If we find that it is worth it, we can open a second > Jira for the actual implementation of the code generation. > I think we could generate one class at places where we currently instantiate > {{QuickSort}}. This generated class would include the functionality of > {{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, > {{MemorySegment.compare}}, and {{MemorySegment.swap}}. > Btw. I'm planning to give this as a student project at a TU Berlin course in > the next few months. > Some concrete ideas about how could a generated sorter be faster than the > current sorting code: > * {{MemorySegment.compare}} could be specialized for > ** Length: for small records, the loop could be unrolled > ** Endiannes (currently it is optimized for big endian; and in the little > endian case (e.g. x86) it does a Long.reverseBytes for each long read) > * {{MemorySegment.swapBytes}} > ** In case of small records, using three {{UNSAFE.copyMemory}} is probably > not as efficient as a specialized swap, because > *** We could use total loop unrolling in generated code (because we know the > exact record size) > *** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\] > *** We will only need 2/3 the memory bandwidth, because the temporary storage > could be a register if we swap one byte (or one {{long}}) at a time > ** several checks might be eliminated > * Better inlining behaviour could be achieved > ** Virtual function calls to the methods of {{InMemorySorter}} could be > eliminated. (Note, that these are problematic to devirtualize by the JVM if > there are different derived classes used in a single Flink job (see \[8,7\]).) > ** {{MemorySegment.swapBytes}} is probably not inlined currently, because the > excessive checks make it too large > ** {{MemorySegment.compare}} is probably also not inlined currently, because > those two while loops are too large > \[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, > long, Object, long, long) > \[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/ > \[8\] > http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/ > \[9\] > http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-4842) Introduce test to enforce order of operator / udf lifecycles
[ https://issues.apache.org/jira/browse/FLINK-4842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter closed FLINK-4842. - Resolution: Implemented Fix Version/s: 1.2.0 > Introduce test to enforce order of operator / udf lifecycles > - > > Key: FLINK-4842 > URL: https://issues.apache.org/jira/browse/FLINK-4842 > Project: Flink > Issue Type: Test >Reporter: Stefan Richter >Assignee: Stefan Richter > Fix For: 1.2.0 > > > We should introduce a test that enforces a certain order in which life cycle > methods of operators and udfs are called, so that they are not easily changed > by accident. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter closed FLINK-4844. - Resolution: Implemented Fix Version/s: 1.2.0 > Partitionable Raw Keyed/Operator State > -- > > Key: FLINK-4844 > URL: https://issues.apache.org/jira/browse/FLINK-4844 > Project: Flink > Issue Type: New Feature >Reporter: Stefan Richter >Assignee: Stefan Richter > Fix For: 1.2.0 > > > Partitionable operator and keyed state are currently only available through > backends. However, the serialization code for many operators is built around > reading/writing their state to a stream for checkpointing. We want to provide > partitionable state through streams as well, so that migrating existing > operators becomes easier. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4883) Prevent UDFs implementations through Scala singleton objects
Stefan Richter created FLINK-4883: - Summary: Prevent UDFs implementations through Scala singleton objects Key: FLINK-4883 URL: https://issues.apache.org/jira/browse/FLINK-4883 Project: Flink Issue Type: Bug Reporter: Stefan Richter Currently, users can create and use UDFs in Scala like this: {code} object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] { ... } {code} However, this causes problems, as the UDF is now a singleton that Flink could use across several operator instances, which leads to job failures. We should detect and prevent the usage of singleton UDFs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
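One hedged way such a check could be implemented: the Scala compiler emits a {{public static final MODULE$}} field for singleton objects, which can be detected reflectively. This is an illustration, not Flink's actual detection code:
{code}
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class ScalaSingletonCheck {

    // Returns true if the given UDF instance is a Scala "object":
    // its class then declares a static MODULE$ field holding the instance.
    static boolean isScalaSingletonObject(Object udf) {
        try {
            Field module = udf.getClass().getDeclaredField("MODULE$");
            return Modifier.isStatic(module.getModifiers()) && module.get(null) == udf;
        } catch (NoSuchFieldException | IllegalAccessException e) {
            return false;
        }
    }
}
{code}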
[jira] [Commented] (FLINK-5053) Incremental / lightweight snapshots for checkpoints
[ https://issues.apache.org/jira/browse/FLINK-5053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15663198#comment-15663198 ] Stefan Richter commented on FLINK-5053: --- That might very well be; I am still planning to take a closer look at RocksDB's backup/checkpoint features anyway before I start working on this. For now, this description is more of a rough outline of my plan and a basis for discussion. But thanks for the hint! > Incremental / lightweight snapshots for checkpoints > --- > > Key: FLINK-5053 > URL: https://issues.apache.org/jira/browse/FLINK-5053 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Stefan Richter > > There is currently basically no difference between savepoints and checkpoints > in Flink and both are created through exactly the same process. > However, savepoints and checkpoints have a slightly different meaning which > we should take into account to keep Flink efficient: > - Savepoints are (typically infrequently) triggered by the user to create a > state from which the application can be restarted, e.g. because Flink, some > code, or the parallelism needs to be changed. > - Checkpoints are (typically frequently) triggered by the System to allow for > fast recovery in case of failure, but keeping the job/system unchanged. > This means that savepoints and checkpoints can have different properties in > that: > - Savepoint should represent a state of the application, where > characteristics of the job (e.g. parallelism) can be adjusted for the next > restart. One example for things that savepoints need to be aware of are > key-groups. Savepoints can potentially be a little more expensive than > checkpoints, because they are usually created a lot less frequently through > the user. > - Checkpoints are frequently triggered by the system to allow for fast > failure recovery. However, failure recovery leaves all characteristics of the > job unchanged. This checkpoints do not have to be aware of those, e.g. think > again of key groups. Checkpoints should run faster than creating savepoints, > in particular it would be nice to have incremental checkpoints. > For a first approach, I would suggest the following steps/changes: > - In checkpoint coordination: differentiate between triggering checkpoints > and savepoints. Introduce properties for checkpoints that describe their set > of abilities, e.g. "is-key-group-aware", "is-incremental". > - In state handle infrastructure: introduce state handles that reflect > incremental checkpoints and drop full key-group awareness, i.e. covering > folders instead of files and not having keygroup_id -> file/offset mapping, > but keygroup_range -> folder? > - Backend side: We should start with RocksDB by reintroducing something > similar to semi-async snapshots, but using > BackupableDBOptions::setShareTableFiles(true) and transferring only new > incremental outputs to HDFS. Notice that using RocksDB's internal backup > mechanism is giving up on the information about individual key-groups. But as > explained above, this should be totally acceptable for checkpoints, while > savepoints should use the key-group-aware fully async mode. Of course we also > need to implement the ability to restore from both types of snapshots. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5146) Improved resource cleanup in RocksDB keyed state backend
[ https://issues.apache.org/jira/browse/FLINK-5146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter updated FLINK-5146: -- Priority: Blocker (was: Major) > Improved resource cleanup in RocksDB keyed state backend > > > Key: FLINK-5146 > URL: https://issues.apache.org/jira/browse/FLINK-5146 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Blocker > > Currently, the resources such as taken snapshots or iterators are not always > cleaned up in the RocksDB state backend. In particular, not starting the > runnable future will leave taken snapshots unreleased. > We should improve the releases of all resources allocated through the RocksDB > JNI bridge. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5146) Improved resource cleanup in RocksDB keyed state backend
Stefan Richter created FLINK-5146: - Summary: Improved resource cleanup in RocksDB keyed state backend Key: FLINK-5146 URL: https://issues.apache.org/jira/browse/FLINK-5146 Project: Flink Issue Type: Improvement Components: State Backends, Checkpointing Reporter: Stefan Richter Currently, resources such as taken snapshots or iterators are not always cleaned up in the RocksDB state backend. In particular, not starting the runnable future will leave taken snapshots unreleased. We should improve the release of all resources allocated through the RocksDB JNI bridge. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
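As an illustration of the intended cleanup discipline with the RocksDB JNI bridge (resource-handling details differ between RocksJava versions, so this is a sketch under that caveat):
{code}
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
import org.rocksdb.Snapshot;

public class RocksResourceCleanup {

    static void iterateOverSnapshot(RocksDB db) {
        Snapshot snapshot = db.getSnapshot();
        try (ReadOptions readOptions = new ReadOptions().setSnapshot(snapshot);
             RocksIterator iterator = db.newIterator(readOptions)) {
            for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
                // ... read iterator.key() / iterator.value() ...
            }
        } finally {
            // snapshots are not released by closing the options or iterator;
            // they must be handed back explicitly, even on error paths
            db.releaseSnapshot(snapshot);
        }
    }
}
{code}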
[jira] [Commented] (FLINK-5024) Add SimpleStateDescriptor to clarify the concepts
[ https://issues.apache.org/jira/browse/FLINK-5024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15692874#comment-15692874 ] Stefan Richter commented on FLINK-5024: --- Independently of this, I also talked with Aljoscha about state descriptors. What I don't like in the current implementation is that it contains the state name and value type serializer, but for example not the namespace type serializer. This is also a little inconsistent, so +1 for some changes here. > Add SimpleStateDescriptor to clarify the concepts > - > > Key: FLINK-5024 > URL: https://issues.apache.org/jira/browse/FLINK-5024 > Project: Flink > Issue Type: Improvement >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Currently, StateDescriptors accept two type arguments : the first one is the > type of the created state and the second one is the type of the values in the > states. > The concepts however is a little confusing here because in ListStates, the > arguments passed to the StateDescriptors are the types of the list elements > instead of the lists. It also makes the implementation of MapStates difficult. > I suggest not to put the type serializer in StateDescriptors, making > StateDescriptors independent of the data structures of the values. > A new type of StateDescriptor named SimpleStateDescriptor can be provided to > abstract those states (namely ValueState, ReducingState and FoldingState) > whose states are not composited. > The states (e.g. ListStates and MapStates) can implement their own > descriptors according to their data structures. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4100) RocksDBStateBackend#close() can throw NPE
[ https://issues.apache.org/jira/browse/FLINK-4100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15701566#comment-15701566 ] Stefan Richter commented on FLINK-4100: --- Yes, this looks very outdated. > RocksDBStateBackend#close() can throw NPE > - > > Key: FLINK-4100 > URL: https://issues.apache.org/jira/browse/FLINK-4100 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing, Tests >Affects Versions: 1.1.0 >Reporter: Chesnay Schepler >Priority: Trivial > > When running the RocksDBStateBackendTest on Windows i ran into an NPE. The > tests are aborted in the @Before checkOperatingSystem method (which is > correct behaviour), but the test still calls dispose() in @After teardown(). > This lead to an NPE since the lock object used is null; it was not > initialized since initializeForJob() was never called and there is no null > check. > {code} > testCopyDefaultValue(org.apache.flink.contrib.streaming.state.RocksDBStateBackendTest) > Time elapsed: 0 sec <<< ERROR! > java.lang.NullPointerException: null > at > org.apache.flink.contrib.streaming.state.RocksDBStateBackend.dispose(RocksDBStateBackend.java:318) > at > org.apache.flink.runtime.state.StateBackendTestBase.teardown(StateBackendTestBase.java:71) > at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) > at org.junit.runners.ParentRunner.run(ParentRunner.java:309) > at > org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153) > at > org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128) > at > org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203) > at > org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155) > at > org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5053) Incremental / lightweight snapshots for checkpoints
[ https://issues.apache.org/jira/browse/FLINK-5053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter updated FLINK-5053: -- Description: There is currently basically no difference between savepoints and checkpoints in Flink and both are created through exactly the same process. However, savepoints and checkpoints have a slightly different meaning which we should take into account to keep Flink efficient: - Savepoints are (typically infrequently) triggered by the user to create a state from which the application can be restarted, e.g. because Flink, some code, or the parallelism needs to be changed. - Checkpoints are (typically frequently) triggered by the System to allow for fast recovery in case of failure, but keeping the job/system unchanged. This means that savepoints and checkpoints can have different properties in that: - Savepoint should represent a state of the application, where characteristics of the job (e.g. parallelism) can be adjusted for the next restart. One example for things that savepoints need to be aware of are key-groups. Savepoints can potentially be a little more expensive than checkpoints, because they are usually created a lot less frequently through the user. - Checkpoints are frequently triggered by the system to allow for fast failure recovery. However, failure recovery leaves all characteristics of the job unchanged. This checkpoints do not have to be aware of those, e.g. think again of key groups. Checkpoints should run faster than creating savepoints, in particular it would be nice to have incremental checkpoints. For a first approach, I would suggest the following steps/changes: - In checkpoint coordination: differentiate between triggering checkpoints and savepoints. Introduce properties for checkpoints that describe their set of abilities, e.g. "is-key-group-aware", "is-incremental". - In state handle infrastructure: introduce state handles that reflect incremental checkpoints and drop full key-group awareness, i.e. covering folders instead of files and not having keygroup_id -> file/offset mapping, but keygroup_range -> folder? - Backend side: We should start with RocksDB by reintroducing something similar to semi-async snapshots, but using BackupableDBOptions::setShareTableFiles(true) and transferring only new incremental outputs to HDFS. Notice that using RocksDB's internal backup mechanism is giving up on the information about individual key-groups. But as explained above, this should be totally acceptable for checkpoints, while savepoints should use the key-group-aware fully async mode. Of course we also need to implement the ability to restore from both types of snapshots. One problem in the suggested approach is still that even checkpoints should support scale-down, in case that only a smaller number of instances is left available in a recovery case. was: There is currently basically no difference between savepoints and checkpoints in Flink and both are created through exactly the same process. However, savepoints and checkpoints have a slightly different meaning which we should take into account to keep Flink efficient: - Savepoints are (typically infrequently) triggered by the user to create a state from which the application can be restarted, e.g. because Flink, some code, or the parallelism needs to be changed. - Checkpoints are (typically frequently) triggered by the System to allow for fast recovery in case of failure, but keeping the job/system unchanged. 
This means that savepoints and checkpoints can have different properties in that: - Savepoint should represent a state of the application, where characteristics of the job (e.g. parallelism) can be adjusted for the next restart. One example for things that savepoints need to be aware of are key-groups. Savepoints can potentially be a little more expensive than checkpoints, because they are usually created a lot less frequently through the user. - Checkpoints are frequently triggered by the system to allow for fast failure recovery. However, failure recovery leaves all characteristics of the job unchanged. This checkpoints do not have to be aware of those, e.g. think again of key groups. Checkpoints should run faster than creating savepoints, in particular it would be nice to have incremental checkpoints. For a first approach, I would suggest the following steps/changes: - In checkpoint coordination: differentiate between triggering checkpoints and savepoints. Introduce properties for checkpoints that describe their set of abilities, e.g. "is-key-group-aware", "is-incremental". - In state handle infrastructure: introduce state handles that reflect incremental checkpoints and drop full key-group awareness, i.e. covering folders instead of files and not having keygroup_id -> file/offset mapping, but keygroup_range -> folder? - Backend side: We should start
[jira] [Created] (FLINK-5099) Test testCancelPartitionRequest is instable
Stefan Richter created FLINK-5099: - Summary: Test testCancelPartitionRequest is instable Key: FLINK-5099 URL: https://issues.apache.org/jira/browse/FLINK-5099 Project: Flink Issue Type: Bug Components: Network Reporter: Stefan Richter I observed this test fail on Travis (very rarely): testCancelPartitionRequest(org.apache.flink.runtime.io.network.netty.CancelPartitionRequestTest) Time elapsed: 168.756 sec <<< ERROR! java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf(Arrays.java:2367) at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130) at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114) at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415) at java.lang.StringBuilder.append(StringBuilder.java:132) at org.apache.flink.runtime.io.network.netty.CancelPartitionRequestTest.testCancelPartitionRequest(CancelPartitionRequestTest.java:94) Results : Tests in error: CancelPartitionRequestTest.testCancelPartitionRequest:94 ยป OutOfMemory Java he... -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5100) Test testZooKeeperReelection is instable
Stefan Richter created FLINK-5100: - Summary: Test testZooKeeperReelection is instable Key: FLINK-5100 URL: https://issues.apache.org/jira/browse/FLINK-5100 Project: Flink Issue Type: Bug Components: Distributed Coordination Reporter: Stefan Richter I observed this test failing (very rarely) on Travis: testZooKeeperReelection(org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest) Time elapsed: 303.321 sec <<< FAILURE! java.lang.AssertionError: null at org.junit.Assert.fail(Assert.java:86) at org.junit.Assert.assertTrue(Assert.java:41) at org.junit.Assert.assertFalse(Assert.java:64) at org.junit.Assert.assertFalse(Assert.java:74) at org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest.testZooKeeperReelection(ZooKeeperLeaderElectionTest.java:197) Results : Failed tests: ZooKeeperLeaderElectionTest.testZooKeeperReelection:197 null -- This message was sent by Atlassian JIRA (v6.3.4#6332)