[jira] [Commented] (FLINK-6097) Guaranteed the order of the extracted field references
[ https://issues.apache.org/jira/browse/FLINK-6097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15931036#comment-15931036 ] ASF GitHub Bot commented on FLINK-6097: --- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3560#discussion_r106772134 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala ---

@@ -227,18 +227,23 @@ object ProjectionTranslator {
    * @param exprs a list of expressions to extract
    * @return a list of field references extracted from the given expressions
    */
-  def extractFieldReferences(exprs: Seq[Expression]): Seq[NamedExpression] = {
-    exprs.foldLeft(Set[NamedExpression]()) {
+  def extractFieldReferences(exprs: Seq[Expression]): List[NamedExpression] = {
+    exprs.foldLeft(List[NamedExpression]()) {
       (fieldReferences, expr) => identifyFieldReferences(expr, fieldReferences)
-    }.toSeq
+    }
   }

   private def identifyFieldReferences(
       expr: Expression,
-      fieldReferences: Set[NamedExpression]): Set[NamedExpression] = expr match {
+      fieldReferences: List[NamedExpression]): List[NamedExpression] = expr match {

--- End diff --

The order really depends on how we extract fields from all kinds of expressions. For a `BinaryExpression`, we first extract the `left child` and then the `right child`. For `Function Calls`, we extract the fields from the parameters in left-to-right order. A more complex example is `over`: imagine an aggregate on a partitioned window. Should the fields appearing in the aggregate or the field being partitioned on be considered first? So I think this kind of order is hard to define and hard to keep consistent; it will change easily when we modify the code. We should not rely on anything about this.
> Guaranteed the order of the extracted field references
> ------------------------------------------------------
>
> Key: FLINK-6097
> URL: https://issues.apache.org/jira/browse/FLINK-6097
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: sunjincheng
> Assignee: sunjincheng
>
> When we tried to implement the `OVER window` Table API, the first version of the prototype did not consider that the table fields could be out of order when we implemented the `translateToPlan` method; we set the `outputRow` fields from `inputRow` according to the initial order of the table field indexes.
> At the beginning, with fewer than 5 columns projected in the select statement, it worked well. But unfortunately, when the count of projections was bigger than 4 (>= 5), we got random results. Then we debugged the code and found that the `ProjectionTranslator#identifyFieldReferences` method uses a `Set` to temporarily save fields. When the number of elements in the Set is less than 5, the Set uses the specialized Set1, Set2, Set3, Set4 data structures. When the number of elements is greater than or equal to 5, the Set switches to HashSet#HashTrieSet, which causes the data to be out of order.
> e.g.:
> Add the following elements in turn:
> {code}
> a, b, c, d, e
> Set(a)              class scala.collection.immutable.Set$Set1
> Set(a, b)           class scala.collection.immutable.Set$Set2
> Set(a, b, c)        class scala.collection.immutable.Set$Set3
> Set(a, b, c, d)     class scala.collection.immutable.Set$Set4
> Set(e, a, b, c, d)  class scala.collection.immutable.HashSet$HashTrieSet  // we want (a, b, c, d, e)
> {code}
> So we thought of 2 approaches to solve this problem:
> 1. Let the `ProjectionTranslator#identifyFieldReferences` method guarantee that the order of the extracted field references is the same as the input order.
> 2. Add an input-to-output field mapping.
> In the end we used approach #2 to solve the problem. This change is not necessary for the problem I faced.
> But I feel it is better to let the output of this method be in the same order as the input; it may be very helpful for other cases, though I am currently not aware of any. I am OK with not making this change, but we should then add a comment to highlight that the current output may be out of order. Otherwise, some people may not pay attention to this and assume it is in order.
> Hi, guys, what do you think? Welcome any feedback.

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
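The Set1..Set4 versus HashTrieSet behavior quoted above is easy to reproduce. The following is a small sketch against the Scala 2.11/2.12 immutable collections (the concrete class names in the comments are the ones reported in the issue and may differ on other Scala versions):

```scala
// Up to 4 elements, scala.collection.immutable.Set uses specialized classes
// (Set1..Set4) whose iteration order happens to match insertion order.
val four = Set("a", "b", "c", "d")
println(four.getClass.getName) // scala.collection.immutable.Set$Set4
println(four.toList)           // List(a, b, c, d)

// Adding a 5th element switches to a hash-based implementation
// (HashSet$HashTrieSet on Scala 2.11/2.12); iteration order becomes hash-driven.
val five = four + "e"
println(five.toList)           // e.g. List(e, a, b, c, d): insertion order is lost
```

The same elements are present either way; only the iteration order differs, which is exactly why code that implicitly relied on insertion order broke once a fifth projection appeared.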
[GitHub] flink pull request #3560: [FLINK-6097][table] Guaranteed the order of the ex...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3560#discussion_r106772134 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala ---

@@ -227,18 +227,23 @@ object ProjectionTranslator {
    * @param exprs a list of expressions to extract
    * @return a list of field references extracted from the given expressions
    */
-  def extractFieldReferences(exprs: Seq[Expression]): Seq[NamedExpression] = {
-    exprs.foldLeft(Set[NamedExpression]()) {
+  def extractFieldReferences(exprs: Seq[Expression]): List[NamedExpression] = {
+    exprs.foldLeft(List[NamedExpression]()) {
       (fieldReferences, expr) => identifyFieldReferences(expr, fieldReferences)
-    }.toSeq
+    }
   }

   private def identifyFieldReferences(
       expr: Expression,
-      fieldReferences: Set[NamedExpression]): Set[NamedExpression] = expr match {
+      fieldReferences: List[NamedExpression]): List[NamedExpression] = expr match {

--- End diff --

The order really depends on how we extract fields from all kinds of expressions. For a `BinaryExpression`, we first extract the `left child` and then the `right child`. For `Function Calls`, we extract the fields from the parameters in left-to-right order. A more complex example is `over`: imagine an aggregate on a partitioned window. Should the fields appearing in the aggregate or the field being partitioned on be considered first? So I think this kind of order is hard to define and hard to keep consistent; it will change easily when we modify the code. We should not rely on anything about this.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-6106) Blob Server doesn't delete tmp folder on exit in HA mode.
Syinchwun Leo created FLINK-6106:

Summary: Blob Server doesn't delete tmp folder on exit in HA mode.
Key: FLINK-6106
URL: https://issues.apache.org/jira/browse/FLINK-6106
Project: Flink
Issue Type: Wish
Components: JobManager
Affects Versions: 1.2.0
Reporter: Syinchwun Leo

When started in HA mode, the Blob server does not register itself in a shutdown hook. Lines 158-164:

{code}
if (highAvailabilityMode == HighAvailabilityMode.NONE) {
  // Add shutdown hook to delete storage directory
  this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
} else {
  this.shutdownHook = null;
}
{code}

That means that when the application is killed in YARN, the tmp folder will not be deleted. What is the purpose of this design?
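For context, `BlobUtils.addShutdownHook` boils down to registering a JVM shutdown hook that deletes the storage directory. The sketch below is a hypothetical, simplified rendering of the quoted conditional (the `nonHaMode` flag, directory name, and cleanup are illustrative, not Flink's actual code); it shows why nothing is cleaned up in HA mode: the hook is simply never registered.

```scala
import java.nio.file.{Files, Path}
import scala.util.Try

// Illustrative stand-in for the BlobServer's local storage directory.
val storageDir: Path = Files.createTempDirectory("blobStore-")

// Illustrative stand-in for highAvailabilityMode == HighAvailabilityMode.NONE.
val nonHaMode = true

if (nonHaMode) {
  // Non-HA: register a JVM shutdown hook to delete the directory on exit.
  sys.addShutdownHook {
    Try(Files.deleteIfExists(storageDir))
  }
}
// In HA mode no hook is registered, so the directory outlives the process --
// which matches the leftover tmp folder the reporter saw on YARN.
println(Files.exists(storageDir)) // true while the process is running
```

Note that a shutdown hook only runs on normal JVM termination or a catchable signal; a `kill -9` skips it either way, so HA cleanup generally needs an explicit path rather than a hook.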
[jira] [Commented] (FLINK-6020) Blob Server cannot handle multiple job submits (with same content) in parallel
[ https://issues.apache.org/jira/browse/FLINK-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15931016#comment-15931016 ] ASF GitHub Bot commented on FLINK-6020: --- Github user WangTaoTheTonic commented on the issue: https://github.com/apache/flink/pull/3525

Right... I had the same thought as you at the beginning, and I tried to make the move atomic, but it has several side effects, like:
1. If we handle it this way, two jobs can share the same jar file in the blob server. That becomes a problem when one of them is canceled and deletes its jars (right now it seems it doesn't do the delete, but it should).
2. For job recovery (or other kinds of recovery, I'm not sure, I just observed the phenomenon), the blob server uploads jars to HDFS using the same name as the local file. Even if the two jobs share the same jar in the blob store, they will upload it twice at the same time, which causes file lease contention in HDFS.

> Blob Server cannot handle multiple job submits (with same content) in parallel
> ------------------------------------------------------------------------------
>
> Key: FLINK-6020
> URL: https://issues.apache.org/jira/browse/FLINK-6020
> Project: Flink
> Issue Type: Bug
> Reporter: Tao Wang
> Assignee: Tao Wang
> Priority: Critical
>
> In yarn-cluster mode, if we submit the same job multiple times in parallel, the tasks will encounter class-loading problems and lease contention.
> The blob server stores user jars under names generated from their SHA-1 sums: it first writes a temp file and then moves it to finalize it. For recovery it also puts the jars on HDFS under the same file names.
> When multiple clients submit the same job with the same jar at the same time, the local jar files in the blob server and those files on HDFS are handled by multiple threads (BlobServerConnection) and impact each other.
> It's better to have a way to handle this; two ideas come to mind:
> 1. Lock the write operation, or
> 2. Use some unique identifier as the file name instead of (or added to) the SHA-1 sum of the file contents.
[GitHub] flink issue #3525: [FLINK-6020]add a random integer suffix to blob key to av...
Github user WangTaoTheTonic commented on the issue: https://github.com/apache/flink/pull/3525

Right... I had the same thought as you at the beginning, and I tried to make the move atomic, but it has several side effects, like:
1. If we handle it this way, two jobs can share the same jar file in the blob server. That becomes a problem when one of them is canceled and deletes its jars (right now it seems it doesn't do the delete, but it should).
2. For job recovery (or other kinds of recovery, I'm not sure, I just observed the phenomenon), the blob server uploads jars to HDFS using the same name as the local file. Even if the two jobs share the same jar in the blob store, they will upload it twice at the same time, which causes file lease contention in HDFS.
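The PR's title points at the second idea from the issue: append a random suffix to the content-derived name so two concurrent submissions of the same jar never collide on one file. A hypothetical sketch of such a naming scheme (the function names here are illustrative, not Flink's API):

```scala
import java.security.MessageDigest
import java.util.concurrent.ThreadLocalRandom

// Content-addressed part of the name: hex-encoded SHA-1 of the jar bytes.
def sha1Hex(content: Array[Byte]): String =
  MessageDigest.getInstance("SHA-1").digest(content).map("%02x".format(_)).mkString

// Append a random suffix so concurrent submissions of identical content
// write (and later move/upload) distinct files instead of racing on one name.
def blobFileName(content: Array[Byte]): String =
  s"${sha1Hex(content)}-${ThreadLocalRandom.current().nextInt(Int.MaxValue)}"

val bytes = "same jar content".getBytes("UTF-8")
println(blobFileName(bytes)) // 40 hex chars, a dash, then a random integer
```

The trade-off raised in the comment still applies: distinct names mean the same jar may be stored more than once, but cancellation-time deletes and HDFS uploads for different jobs no longer interfere.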
[jira] [Commented] (FLINK-6097) Guaranteed the order of the extracted field references
[ https://issues.apache.org/jira/browse/FLINK-6097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930960#comment-15930960 ] ASF GitHub Bot commented on FLINK-6097: --- Github user sunjincheng121 commented on the issue: https://github.com/apache/flink/pull/3560

Hi @KurtYoung, thanks for your attention to this PR. Good question. I'm glad to share why I noticed this method: When we tried to implement the OVER window Table API, the first version of the prototype did not consider that the table fields could be out of order when we implemented the translateToPlan method; we set the outputRow fields from inputRow according to the initial order of the table field indexes. At the beginning, with fewer than 5 columns projected in the select statement, it worked well. But unfortunately, when the count of projections was bigger than 4 (>= 5), we got random results. Then we debugged the code and found that the ProjectionTranslator#identifyFieldReferences method uses a Set to temporarily save fields. When the number of elements in the Set is less than 5, the Set uses the specialized Set1, Set2, Set3, Set4 data structures. When the number of elements is greater than or equal to 5, the Set switches to HashSet#HashTrieSet, which causes the data to be out of order. So we thought of 2 approaches to solve this problem: 1. Let the ProjectionTranslator#identifyFieldReferences method guarantee that the order of the extracted field references is the same as the input order. 2. Add an input-to-output field mapping. In the end we used approach #2 to solve the problem. This change is not necessary for the problem I faced. But I feel it is better to let the output of this method be in the same order as the input; it may be very helpful for other cases, though I am currently not aware of any. I am OK with not making this change, but we should then add a comment to highlight that the current output may be out of order. Otherwise, some people may not pay attention to this and assume it is in order.
Thanks, SunJincheng
[GitHub] flink issue #3560: [FLINK-6097][table] Guaranteed the order of the extracted...
Github user sunjincheng121 commented on the issue: https://github.com/apache/flink/pull/3560

Hi @KurtYoung, thanks for your attention to this PR. Good question. I'm glad to share why I noticed this method: When we tried to implement the OVER window Table API, the first version of the prototype did not consider that the table fields could be out of order when we implemented the translateToPlan method; we set the outputRow fields from inputRow according to the initial order of the table field indexes. At the beginning, with fewer than 5 columns projected in the select statement, it worked well. But unfortunately, when the count of projections was bigger than 4 (>= 5), we got random results. Then we debugged the code and found that the ProjectionTranslator#identifyFieldReferences method uses a Set to temporarily save fields. When the number of elements in the Set is less than 5, the Set uses the specialized Set1, Set2, Set3, Set4 data structures. When the number of elements is greater than or equal to 5, the Set switches to HashSet#HashTrieSet, which causes the data to be out of order. So we thought of 2 approaches to solve this problem: 1. Let the ProjectionTranslator#identifyFieldReferences method guarantee that the order of the extracted field references is the same as the input order. 2. Add an input-to-output field mapping. In the end we used approach #2 to solve the problem. This change is not necessary for the problem I faced. But I feel it is better to let the output of this method be in the same order as the input; it may be very helpful for other cases, though I am currently not aware of any. I am OK with not making this change, but we should then add a comment to highlight that the current output may be out of order. Otherwise, some people may not pay attention to this and assume it is in order. Thanks, SunJincheng
[jira] [Updated] (FLINK-6097) Guaranteed the order of the extracted field references
[ https://issues.apache.org/jira/browse/FLINK-6097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-6097: ---
Description:
When we tried to implement the `OVER window` Table API, the first version of the prototype did not consider that the table fields could be out of order when we implemented the `translateToPlan` method; we set the `outputRow` fields from `inputRow` according to the initial order of the table field indexes.
At the beginning, with fewer than 5 columns projected in the select statement, it worked well. But unfortunately, when the count of projections was bigger than 4 (>= 5), we got random results. Then we debugged the code and found that the `ProjectionTranslator#identifyFieldReferences` method uses a `Set` to temporarily save fields. When the number of elements in the Set is less than 5, the Set uses the specialized Set1, Set2, Set3, Set4 data structures. When the number of elements is greater than or equal to 5, the Set switches to HashSet#HashTrieSet, which causes the data to be out of order.
e.g.:
Add the following elements in turn:
{code}
a, b, c, d, e
Set(a)              class scala.collection.immutable.Set$Set1
Set(a, b)           class scala.collection.immutable.Set$Set2
Set(a, b, c)        class scala.collection.immutable.Set$Set3
Set(a, b, c, d)     class scala.collection.immutable.Set$Set4
Set(e, a, b, c, d)  class scala.collection.immutable.HashSet$HashTrieSet  // we want (a, b, c, d, e)
{code}
So we thought of 2 approaches to solve this problem:
1. Let the `ProjectionTranslator#identifyFieldReferences` method guarantee that the order of the extracted field references is the same as the input order.
2. Add an input-to-output field mapping.
In the end we used approach #2 to solve the problem. This change is not necessary for the problem I faced. But I feel it is better to let the output of this method be in the same order as the input; it may be very helpful for other cases, though I am currently not aware of any. I am OK with not making this change, but we should then add a comment to highlight that the current output may be out of order. Otherwise, some people may not pay attention to this and assume it is in order.
Hi, guys, what do you think? Welcome any feedback.

was:
When we tried to implement the `OVER window` Table API, the first version of the prototype did not consider that the table fields could be out of order when we implemented the `translateToPlan` method; we set the `outputRow` fields from `inputRow` according to the initial order of the table field indexes.
At the beginning, with fewer than 5 columns projected in the select statement, it worked well. But unfortunately, when the count of projections was bigger than 4 (>= 5), we got random results. Then we debugged the code and found that the `ProjectionTranslator#identifyFieldReferences` method uses a `Set` to temporarily save fields. When the number of elements in the Set is less than 5, the Set uses the specialized Set1, Set2, Set3, Set4 data structures. When the number of elements is greater than or equal to 5, the Set switches to HashSet#HashTrieSet, which causes the data to be out of order.
e.g.:
Add the following elements in turn:
{code}
a, b, c, d, e
Set(a)              class scala.collection.immutable.Set$Set1
Set(a, b)           class scala.collection.immutable.Set$Set2
Set(a, b, c)        class scala.collection.immutable.Set$Set3
Set(a, b, c, d)     class scala.collection.immutable.Set$Set4
Set(e, a, b, c, d)  class scala.collection.immutable.HashSet$HashTrieSet  // we want (a, b, c, d, e)
{code}
So we thought of 2 approaches to solve this problem:
1. Let the `ProjectionTranslator#identifyFieldReferences` method guarantee that the order of the extracted field references is the same as the input order.
2. Add an input-to-output field mapping.
In the end we used approach #2 to solve the problem. This change is not necessary for the problem I faced. But I feel it is better to let the output of this method be in the same order as the input; it may be very helpful for other cases, though I am currently not aware of any. I am OK with not making this change, but we should then add a comment to highlight that the current output may be out of order. Otherwise, some people may not pay attention to this and assume it is in order, like me.
Hi, guys, what do you think? Welcome any feedback.
[jira] [Updated] (FLINK-6097) Guaranteed the order of the extracted field references
[ https://issues.apache.org/jira/browse/FLINK-6097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-6097: ---
Description:
When we tried to implement the `OVER window` Table API, the first version of the prototype did not consider that the table fields could be out of order when we implemented the `translateToPlan` method; we set the `outputRow` fields from `inputRow` according to the initial order of the table field indexes.
At the beginning, with fewer than 5 columns projected in the select statement, it worked well. But unfortunately, when the count of projections was bigger than 4 (>= 5), we got random results. Then we debugged the code and found that the `ProjectionTranslator#identifyFieldReferences` method uses a `Set` to temporarily save fields. When the number of elements in the Set is less than 5, the Set uses the specialized Set1, Set2, Set3, Set4 data structures. When the number of elements is greater than or equal to 5, the Set switches to HashSet#HashTrieSet, which causes the data to be out of order.
e.g.:
Add the following elements in turn:
{code}
a, b, c, d, e
Set(a)              class scala.collection.immutable.Set$Set1
Set(a, b)           class scala.collection.immutable.Set$Set2
Set(a, b, c)        class scala.collection.immutable.Set$Set3
Set(a, b, c, d)     class scala.collection.immutable.Set$Set4
Set(e, a, b, c, d)  class scala.collection.immutable.HashSet$HashTrieSet  // we want (a, b, c, d, e)
{code}
So we thought of 2 approaches to solve this problem:
1. Let the `ProjectionTranslator#identifyFieldReferences` method guarantee that the order of the extracted field references is the same as the input order.
2. Add an input-to-output field mapping.
In the end we used approach #2 to solve the problem. This change is not necessary for the problem I faced. But I feel it is better to let the output of this method be in the same order as the input; it may be very helpful for other cases, though I am currently not aware of any. I am OK with not making this change, but we should then add a comment to highlight that the current output may be out of order. Otherwise, some people may not pay attention to this and assume it is in order, like me.
Hi, guys, what do you think? Welcome any feedback.

was:
The current `ProjectionTranslator#identifyFieldReferences` method uses a `Set` to temporarily save fields. When the number of elements in the Set is less than 5, the Set uses the specialized Set1, Set2, Set3, Set4 data structures. When the number of elements is greater than or equal to 5, the Set switches to HashSet#HashTrieSet, which causes the data to be out of order. Although the out-of-order result also works, I think ordered is better than unordered, so I want to improve it: extract fields in order, i.e. guarantee that the order of the extracted field references matches the input order.
e.g.:
Add the following elements in turn:
{code}
a, b, c, d, e
Set(a)              class scala.collection.immutable.Set$Set1
Set(a, b)           class scala.collection.immutable.Set$Set2
Set(a, b, c)        class scala.collection.immutable.Set$Set3
Set(a, b, c, d)     class scala.collection.immutable.Set$Set4
Set(e, a, b, c, d)  -> I want (a, b, c, d, e)   class scala.collection.immutable.HashSet$HashTrieSet
{code}
[GitHub] flink issue #3560: [FLINK-6097][table] Guaranteed the order of the extracted...
Github user sunjincheng121 commented on the issue: https://github.com/apache/flink/pull/3560

Hi @KurtYoung, thanks for your attention to this PR. Good question. The purpose of this change is just as this JIRA's description says: I want `ProjectionTranslator#identifyFieldReferences` to guarantee that the order of the extracted field references is the same as the input order. The `Set` was only used to eliminate duplicate field references, and a `List` can do that too, so there is no harm in keeping the method's output order consistent with its input. If the unordered results of `ProjectionTranslator#identifyFieldReferences` work, then ordered results must also work, because an ordered result is just one particular case of an unordered one. What do you think?

Thanks, SunJincheng

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
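The List-based fold sunjincheng describes can be sketched as follows, using a simplified stand-in that works on `String` where the real code uses `NamedExpression`:

```scala
// Fold the input into a List, appending a reference only if it is not already
// present: duplicates are still eliminated, but output order == input order.
def extractFieldReferences(exprs: Seq[String]): List[String] =
  exprs.foldLeft(List[String]()) { (refs, e) =>
    if (refs.contains(e)) refs else refs :+ e
  }

println(extractFieldReferences(Seq("a", "b", "a", "c", "b"))) // List(a, b, c)
```

Note that `refs.contains` makes this quadratic in the number of references; that is fine for the short expression lists involved here, and one could carry a companion `Set` alongside the `List` if the inputs ever grew large.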
[jira] [Commented] (FLINK-6097) Guaranteed the order of the extracted field references
[ https://issues.apache.org/jira/browse/FLINK-6097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930890#comment-15930890 ] ASF GitHub Bot commented on FLINK-6097: --- Github user sunjincheng121 commented on the issue: https://github.com/apache/flink/pull/3560

Hi @KurtYoung, thanks for your attention to this PR. Good question. The purpose of this change is just as this JIRA's description says: I want `ProjectionTranslator#identifyFieldReferences` to guarantee that the order of the extracted field references is the same as the input order. The `Set` was only used to eliminate duplicate field references, and a `List` can do that too, so there is no harm in keeping the method's output order consistent with its input. If the unordered results of `ProjectionTranslator#identifyFieldReferences` work, then ordered results must also work, because an ordered result is just one particular case of an unordered one. What do you think?

Thanks, SunJincheng
[jira] [Commented] (FLINK-6097) Guaranteed the order of the extracted field references
[ https://issues.apache.org/jira/browse/FLINK-6097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930874#comment-15930874 ] ASF GitHub Bot commented on FLINK-6097: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3560#discussion_r106761581 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala --- @@ -227,18 +227,23 @@ object ProjectionTranslator { * @param exprs a list of expressions to extract * @return a list of field references extracted from the given expressions */ - def extractFieldReferences(exprs: Seq[Expression]): Seq[NamedExpression] = { -exprs.foldLeft(Set[NamedExpression]()) { + def extractFieldReferences(exprs: Seq[Expression]): List[NamedExpression] = { +exprs.foldLeft(List[NamedExpression]()) { (fieldReferences, expr) => identifyFieldReferences(expr, fieldReferences) -}.toSeq +} } private def identifyFieldReferences( expr: Expression, - fieldReferences: Set[NamedExpression]): Set[NamedExpression] = expr match { + fieldReferences: List[NamedExpression]): List[NamedExpression] = expr match { --- End diff -- Can you explain more about `Especially the order is not defined very well, it will change easily if we modify the code.`? > Guaranteed the order of the extracted field references > -- > > Key: FLINK-6097 > URL: https://issues.apache.org/jira/browse/FLINK-6097 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL > Reporter: sunjincheng > Assignee: sunjincheng > > The current `ProjectionTranslator#identifyFieldReferences` method uses a `Set` to temporarily collect the extracted fields. While the Set holds fewer than five elements it is backed by the specialized Set1, Set2, Set3, Set4 classes, which iterate in insertion order; once it holds five or more elements it switches to HashSet#HashTrieSet, which does not, so the extracted fields come back out of order. Although the out-of-order result still works, an ordered result is better, so I want to improve this: extract the fields in order, i.e. guarantee that the extracted field references appear in input order. > e.g.: > Add the following elements in turn: > {code} > a, b, c, d, e > Set(a) > class scala.collection.immutable.Set$Set1 > Set(a, b) > class scala.collection.immutable.Set$Set2 > Set(a, b, c) > class scala.collection.immutable.Set$Set3 > Set(a, b, c, d) > class scala.collection.immutable.Set$Set4 > Set(e, a, b, c, d) -> I want (a, b, c, d, e) > class scala.collection.immutable.HashSet$HashTrieSet > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
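The fix in the diff above swaps the `Set` accumulator for a `List` so that extraction order is preserved. A minimal Java sketch of the same idea (a hypothetical helper, not Flink code) — deduplicating while keeping first-seen order, which a hash-based set does not guarantee:

```java
import java.util.ArrayList;
import java.util.List;

public class OrderedExtraction {

    // Collect references in first-seen order, skipping duplicates --
    // the List-based equivalent of the Set-based accumulator in the diff.
    public static List<String> extract(List<String> refs) {
        List<String> out = new ArrayList<>();
        for (String ref : refs) {
            if (!out.contains(ref)) {
                out.add(ref);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Prints [a, b, c, d, e] regardless of how many elements there are.
        System.out.println(extract(List.of("a", "b", "c", "d", "e", "a")));
    }
}
```

The linear `contains` scan is fine for the handful of field references a projection typically has; a `LinkedHashSet` would give the same ordering with O(1) lookups if the lists were large.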
[GitHub] flink pull request #3560: [FLINK-6097][table] Guaranteed the order of the ex...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3560#discussion_r106761581 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala --- @@ -227,18 +227,23 @@ object ProjectionTranslator { * @param exprs a list of expressions to extract * @return a list of field references extracted from the given expressions */ - def extractFieldReferences(exprs: Seq[Expression]): Seq[NamedExpression] = { -exprs.foldLeft(Set[NamedExpression]()) { + def extractFieldReferences(exprs: Seq[Expression]): List[NamedExpression] = { +exprs.foldLeft(List[NamedExpression]()) { (fieldReferences, expr) => identifyFieldReferences(expr, fieldReferences) -}.toSeq +} } private def identifyFieldReferences( expr: Expression, - fieldReferences: Set[NamedExpression]): Set[NamedExpression] = expr match { + fieldReferences: List[NamedExpression]): List[NamedExpression] = expr match { --- End diff -- Can you explain more about ` Especially the order is not defined very well, it will change easily if we modify the code.`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5808) Missing verification for setParallelism and setMaxParallelism
[ https://issues.apache.org/jira/browse/FLINK-5808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930812#comment-15930812 ] ASF GitHub Bot commented on FLINK-5808: --- Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/3509 > Missing verification for setParallelism and setMaxParallelism > - > > Key: FLINK-5808 > URL: https://issues.apache.org/jira/browse/FLINK-5808 > Project: Flink > Issue Type: Bug > Components: DataStream API > Affects Versions: 1.2.0 > Reporter: Aljoscha Krettek > Priority: Blocker > Fix For: 1.3.0, 1.2.1 > > > When {{setParallelism()}} is called we don't verify that it is <= than max > parallelism. Likewise, when {{setMaxParallelism()}} is called we don't check > that the new value doesn't clash with a previously set parallelism.
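The missing checks amount to validating each setter against the other's current value. A sketch of the kind of guard the issue asks for (hypothetical class and field names — not Flink's actual transformation API):

```java
public class ParallelismConfig {

    private int parallelism = 1;
    private int maxParallelism = 128;

    // Verify the new parallelism against the current max parallelism.
    public void setParallelism(int parallelism) {
        if (parallelism < 1 || parallelism > maxParallelism) {
            throw new IllegalArgumentException(
                "parallelism must be in [1, " + maxParallelism + "] but was " + parallelism);
        }
        this.parallelism = parallelism;
    }

    // Verify the new max parallelism does not clash with a previously set parallelism.
    public void setMaxParallelism(int maxParallelism) {
        if (maxParallelism < parallelism) {
            throw new IllegalArgumentException(
                "max parallelism " + maxParallelism
                    + " clashes with previously set parallelism " + parallelism);
        }
        this.maxParallelism = maxParallelism;
    }

    public int getParallelism() {
        return parallelism;
    }
}
```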
[GitHub] flink pull request #3509: [FLINK-5808] Fix Missing verification for setParal...
Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/3509
[jira] [Commented] (FLINK-6020) Blob Server cannot handle multiple job submits (with same content) in parallel
[ https://issues.apache.org/jira/browse/FLINK-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930495#comment-15930495 ] ASF GitHub Bot commented on FLINK-6020: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3525 I think we should then fix this in the blob server. The problem that only one should succeed upon collision should be fixable by using `Files.move()` with `ATOMIC_MOVE`. Only when that succeeds, we store the file in the blob store. What do you think? > Blob Server cannot handle multiple job submits (with same content) in parallel > --- > > Key: FLINK-6020 > URL: https://issues.apache.org/jira/browse/FLINK-6020 > Project: Flink > Issue Type: Bug > Reporter: Tao Wang > Assignee: Tao Wang > Priority: Critical > > In yarn-cluster mode, if we submit the same job multiple times in parallel, the tasks will encounter class-loading problems and lease occupation. > The blob server stores user jars under names derived from the generated sha1sum of their contents: it first writes a temp file and then moves it to the final name. For recovery it also puts the files on HDFS under the same names. > At the same time, when multiple clients submit the same job with the same jar, the local jar files in the blob server and the files on HDFS are handled by multiple threads (BlobServerConnection) and impact each other. > It would be better to have a way to handle this; two ideas come to mind: > 1. lock the write operation, or > 2. use some unique identifier as the file name instead of (or in addition to) the sha1sum of the file contents.
[GitHub] flink issue #3525: [FLINK-6020]add a random integer suffix to blob key to av...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3525 I think we should then fix this in the blob server. The problem that only one should succeed upon collision should be fixable by using `Files.move()` with `ATOMIC_MOVE`. Only when that succeeds, we store the file in the blob store. What do you think?
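Stephan's suggestion — letting exactly one concurrent writer win via an atomic rename — might look roughly like the sketch below (hypothetical names, not the blob server's actual code). Note one caveat the real fix has to handle: per the `Files.move` contract, whether `ATOMIC_MOVE` fails or silently replaces an existing target is implementation-specific, so the catch clause here is an assumption about the platform.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class BlobPublisher {

    // Atomically promote a fully written temp file to its final, content-addressed name.
    // Returns true if this writer won; false if another writer already published the file.
    public static boolean publish(Path tempFile, Path finalFile) throws IOException {
        try {
            Files.move(tempFile, finalFile, StandardCopyOption.ATOMIC_MOVE);
            return true;
        } catch (FileAlreadyExistsException e) {
            // Same sha1 implies same content: the already-published file is equivalent,
            // so the losing writer just discards its temp file.
            Files.deleteIfExists(tempFile);
            return false;
        }
    }
}
```

Only the winning writer would then go on to upload the file to the HA blob store, which resolves the mutual interference described in the issue without introducing locks or randomized names.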
[GitHub] flink pull request #3127: [FLINK-5481] Simplify Row creation
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3127#discussion_r106717641 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala --- @@ -17,29 +17,51 @@ */ package org.apache.flink.table.api -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo} +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.table.typeutils.TimeIntervalTypeInfo +import org.apache.flink.api.java.typeutils.{Types => JTypes} /** * This class enumerates all supported types of the Table API. */ -object Types { +object Types extends JTypes { - val STRING = BasicTypeInfo.STRING_TYPE_INFO - val BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO + val STRING = JTypes.STRING + val BOOLEAN = JTypes.BOOLEAN - val BYTE = BasicTypeInfo.BYTE_TYPE_INFO - val SHORT = BasicTypeInfo.SHORT_TYPE_INFO - val INT = BasicTypeInfo.INT_TYPE_INFO - val LONG = BasicTypeInfo.LONG_TYPE_INFO - val FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO - val DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO - val DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO + val BYTE = JTypes.BYTE + val SHORT = JTypes.SHORT + val INT = JTypes.INT + val LONG = JTypes.LONG + val FLOAT = JTypes.FLOAT + val DOUBLE = JTypes.DOUBLE + val DECIMAL = JTypes.DECIMAL - val DATE = SqlTimeTypeInfo.DATE - val TIME = SqlTimeTypeInfo.TIME - val TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP + val SQL_DATE = JTypes.SQL_DATE + val SQL_TIME = JTypes.SQL_TIME + val SQL_TIMESTAMP = JTypes.SQL_TIMESTAMP val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS + /** +* Generates RowTypeInfo with default names (f1, f2 ..). +* same as ``new RowTypeInfo(types)`` +* +* @param types of Row fields. e.g. ROW(Types.STRING, Types.INT) +*/ + def ROW[T](types: TypeInformation[_]*)(implicit m: Manifest[T]) = { --- End diff -- How about calling it `ROW` and `ROW_NAMED` or so? 
I think just adding another parameter is hacky...
[jira] [Commented] (FLINK-5481) Simplify Row creation
[ https://issues.apache.org/jira/browse/FLINK-5481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930439#comment-15930439 ] ASF GitHub Bot commented on FLINK-5481: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3127#discussion_r106717641 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala --- @@ -17,29 +17,51 @@ */ package org.apache.flink.table.api -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo} +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.table.typeutils.TimeIntervalTypeInfo +import org.apache.flink.api.java.typeutils.{Types => JTypes} /** * This class enumerates all supported types of the Table API. */ -object Types { +object Types extends JTypes { - val STRING = BasicTypeInfo.STRING_TYPE_INFO - val BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO + val STRING = JTypes.STRING + val BOOLEAN = JTypes.BOOLEAN - val BYTE = BasicTypeInfo.BYTE_TYPE_INFO - val SHORT = BasicTypeInfo.SHORT_TYPE_INFO - val INT = BasicTypeInfo.INT_TYPE_INFO - val LONG = BasicTypeInfo.LONG_TYPE_INFO - val FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO - val DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO - val DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO + val BYTE = JTypes.BYTE + val SHORT = JTypes.SHORT + val INT = JTypes.INT + val LONG = JTypes.LONG + val FLOAT = JTypes.FLOAT + val DOUBLE = JTypes.DOUBLE + val DECIMAL = JTypes.DECIMAL - val DATE = SqlTimeTypeInfo.DATE - val TIME = SqlTimeTypeInfo.TIME - val TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP + val SQL_DATE = JTypes.SQL_DATE + val SQL_TIME = JTypes.SQL_TIME + val SQL_TIMESTAMP = JTypes.SQL_TIMESTAMP val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS + /** +* Generates RowTypeInfo with default names (f1, f2 ..). +* same as ``new RowTypeInfo(types)`` +* +* @param types of Row fields. e.g. 
ROW(Types.STRING, Types.INT) +*/ + def ROW[T](types: TypeInformation[_]*)(implicit m: Manifest[T]) = { --- End diff -- How about calling it `ROW` and `ROW_NAMED` or so? I think just adding another parameter is hacky... > Simplify Row creation > - > > Key: FLINK-5481 > URL: https://issues.apache.org/jira/browse/FLINK-5481 > Project: Flink > Issue Type: Bug > Components: DataSet API, Table API & SQL > Affects Versions: 1.2.0 > Reporter: Anton Solovev > Assignee: Anton Solovev > Priority: Trivial > > When we use {{ExecutionEnvironment#fromElements(X... data)}}, it takes the first element of {{data}} to define the type. If the first Row in the collection has the wrong number of fields (because it contains nulls), the TypeExtractor returns GenericType instead of RowTypeInfo.
[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3524 One more question: Can the StateRegistry not directly drop states that have no reference any more when states are unregistered? Is there a special reason for first collecting these states in a list, then getting them and then dropping them?
[jira] [Commented] (FLINK-6014) Allow the registration of state objects in checkpoints
[ https://issues.apache.org/jira/browse/FLINK-6014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930426#comment-15930426 ] ASF GitHub Bot commented on FLINK-6014: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3524 One more question: Can the StateRegistry not directly drop states that have no reference any more when states are unregistered? Is there a special reason for first collecting these states in a list, then getting them and then dropping them? > Allow the registration of state objects in checkpoints > -- > > Key: FLINK-6014 > URL: https://issues.apache.org/jira/browse/FLINK-6014 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing > Reporter: Xiaogang Shi > Assignee: Xiaogang Shi > > This issue is the very first step towards incremental checkpointing. We introduce a new state handle named {{CompositeStateHandle}} to be the base of the snapshots taken by task components. Known implementations may include {{KeyedStateHandle}} (for {{KeyedStateBackend}}s), {{SubtaskState}} (for subtasks, splits of {{JobVertex}}) and {{TaskState}} (for {{JobVertex}}s). > Each {{CompositeStateHandle}} is composed of a collection of {{StateObject}}s. It should register all its state objects in {{StateRegistry}} when its checkpoint is added into {{CompletedCheckpointStore}} (i.e., a pending checkpoint completes or a completed checkpoint is reloaded in the recovery). > When a completed checkpoint is moved out of the {{CompletedCheckpointStore}}, we should not simply discard all state objects in the checkpoint. With the introduction of incremental checkpointing, a {{StateObject}} may be referenced by different checkpoints. We should unregister all the state objects contained in the {{StateRegistry}} first. Only those state objects that are not referenced by any checkpoint can be deleted.
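Reference counting is the natural mechanism behind the registry being discussed: a state object is only safe to delete once no retained checkpoint references it, and — as Stephan's question suggests — the registry can report that the moment the last reference is dropped. A minimal sketch (a hypothetical registry, simplified to string handles; the real design has to be thread-safe and deal with actual state handles):

```java
import java.util.HashMap;
import java.util.Map;

public class SharedStateRegistry {

    private final Map<String, Integer> refCounts = new HashMap<>();

    // Called when a completed checkpoint referencing this state is retained.
    public void register(String stateHandle) {
        refCounts.merge(stateHandle, 1, Integer::sum);
    }

    // Called when a checkpoint is subsumed. Returns true when the state is no
    // longer referenced by any checkpoint and can be discarded immediately,
    // instead of being collected into a list and dropped later.
    public boolean unregister(String stateHandle) {
        Integer count = refCounts.get(stateHandle);
        if (count == null) {
            throw new IllegalStateException("unknown state handle: " + stateHandle);
        }
        if (count == 1) {
            refCounts.remove(stateHandle);
            return true;
        }
        refCounts.put(stateHandle, count - 1);
        return false;
    }
}
```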
[jira] [Commented] (FLINK-6014) Allow the registration of state objects in checkpoints
[ https://issues.apache.org/jira/browse/FLINK-6014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930423#comment-15930423 ] ASF GitHub Bot commented on FLINK-6014: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3524 Thanks for opening this pull request. Adding a `CompositeStateHandle` and a `StateRegistry` is a good idea. Some thoughts: - What do you think about making the `StateRegistry` into a `SharedStateRegistry` which only contains the handles to state that is shared across checkpoints? State that is exclusive to a checkpoint is not handled by that registry, but remains only in the checkpoint. That way we "isolate" the existing behavior against the coming changes and do not risk regressions in the state cleanup code (which is very critical for current users). - Another reason for the above suggestion is to also bring some other code into place that has some "fast paths" and "safety nets" for checkpoint cleanups (currently only with non-shared state), for example dropping a checkpoint simply by a `rm -r` (see https://github.com/apache/flink/pull/3522 ). We have seen that for various users the state cleanup problems are among the biggest problems they have, which we can address very well with the work started in the above linked pull request. These things would work together seamlessly if the registry deals only with shared state handles. - I am wondering if it is easier to put the registry into the checkpoint coordinator rather than the checkpoint stores. That way we need the code that deals with adding / failure handling / etc only once. > Allow the registration of state objects in checkpoints > -- > > Key: FLINK-6014 > URL: https://issues.apache.org/jira/browse/FLINK-6014 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > This issue is the very first step towards incremental checkpointing. 
We introduce a new state handle named {{CompositeStateHandle}} to be the base of the snapshots taken by task components. Known implementations may include {{KeyedStateHandle}} (for {{KeyedStateBackend}}s), {{SubtaskState}} (for subtasks, splits of {{JobVertex}}) and {{TaskState}} (for {{JobVertex}}s). > Each {{CompositeStateHandle}} is composed of a collection of {{StateObject}}s. It should register all its state objects in {{StateRegistry}} when its checkpoint is added into {{CompletedCheckpointStore}} (i.e., a pending checkpoint completes or a completed checkpoint is reloaded in the recovery). > When a completed checkpoint is moved out of the {{CompletedCheckpointStore}}, we should not simply discard all state objects in the checkpoint. With the introduction of incremental checkpointing, a {{StateObject}} may be referenced by different checkpoints. We should unregister all the state objects contained in the {{StateRegistry}} first. Only those state objects that are not referenced by any checkpoint can be deleted.
[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3524 Thanks for opening this pull request. Adding a `CompositeStateHandle` and a `StateRegistry` is a good idea. Some thoughts: - What do you think about making the `StateRegistry` into a `SharedStateRegistry` which only contains the handles to state that is shared across checkpoints? State that is exclusive to a checkpoint is not handled by that registry, but remains only in the checkpoint. That way we "isolate" the existing behavior against the coming changes and do not risk regressions in the state cleanup code (which is very critical for current users). - Another reason for the above suggestion is to also bring some other code into place that has some "fast paths" and "safety nets" for checkpoint cleanups (currently only with non-shared state), for example dropping a checkpoint simply by a `rm -r` (see https://github.com/apache/flink/pull/3522 ). We have seen that for various users the state cleanup problems are among the biggest problems they have, which we can address very well with the work started in the above linked pull request. These things would work together seamlessly if the registry deals only with shared state handles. - I am wondering if it is easier to put the registry into the checkpoint coordinator rather than the checkpoint stores. That way we need the code that deals with adding / failure handling / etc only once.
[GitHub] flink issue #2029: [FLINK-2814] Fix for DualInputPlanNode cannot be cast to ...
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/2029 I created #3563 which combines this PR and my suggestion.
[GitHub] flink issue #3550: [FLINK-5654] - Add processing time OVER RANGE BETWEEN x P...
Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3550 @fhueske - Thank you very much for your review. It was very useful. I integrated and addressed most of the remarks you made. What remains to be discussed and is not yet addressed: - using the processingFunction vs keeping the Window-based implementation (I prefer the latter and believe it is more appropriate here, as mentioned) - removing the merge commit... as per my previous comment I do not know exactly what to do about that
[jira] [Commented] (FLINK-6050) Improve failure reporting when using Future.thenAccept
[ https://issues.apache.org/jira/browse/FLINK-6050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930381#comment-15930381 ] ASF GitHub Bot commented on FLINK-6050: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3537 I think this is good, +1 Do we have a test that validates that completing a `Future` exceptionally also completes all result Futures of `thenApply` (or `thenApplyAsync`) functions with an exception? > Improve failure reporting when using Future.thenAccept > -- > > Key: FLINK-6050 > URL: https://issues.apache.org/jira/browse/FLINK-6050 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination > Affects Versions: 1.3.0 > Reporter: Till Rohrmann > Assignee: Till Rohrmann > Priority: Minor > > When applying {{Future.thenAccept(Async)}} onto a {{Future}}, we should register the exception handler on the returned {{Future}} and not on the original future. This has the advantage that we also catch exceptions which are thrown in the {{AcceptFunction}}, not only those originating from the original {{Future}}. This improves Flink's behaviour, because exceptions are not swallowed in the returned {{Future}}.
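The improvement can be illustrated with `java.util.concurrent.CompletableFuture` (an analogy only — Flink at this time had its own `Future` abstraction): a handler registered on the future *returned* by `thenAccept` observes failures of the accept function itself, which a handler on the original future would miss.

```java
import java.util.concurrent.CompletableFuture;

public class FutureFailureReporting {

    // Returns "ok" when both the source future and the accept function succeed,
    // and "failed" when either of them completes exceptionally.
    public static String outcome(CompletableFuture<Integer> source) {
        CompletableFuture<Void> accepted = source.thenAccept(value -> {
            if (value < 0) {
                throw new IllegalArgumentException("negative value: " + value);
            }
        });
        // Handler on the RESULT future: it sees exceptions from the source
        // future AND exceptions thrown inside the accept function above.
        return accepted.handle((ignored, error) -> error == null ? "ok" : "failed").join();
    }
}
```

Had the `handle` been attached to `source` instead, an exception thrown by the accept function would propagate only into `accepted` and be silently swallowed — exactly the behaviour the issue fixes.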
[GitHub] flink issue #3537: [FLINK-6050] [robustness] Register exception handler on t...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3537 I think this is good, +1 Do we have a test that validates that completing a `Future` exceptionally also completes all result Futures of `thenApply` (or `thenApplyAsync`) functions with an exception?
[jira] [Commented] (FLINK-2814) DeltaIteration: DualInputPlanNode cannot be cast to SingleInputPlanNode
[ https://issues.apache.org/jira/browse/FLINK-2814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930377#comment-15930377 ] ASF GitHub Bot commented on FLINK-2814: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/2029 I created #3563 which combines this PR and my suggestion. > DeltaIteration: DualInputPlanNode cannot be cast to SingleInputPlanNode > --- > > Key: FLINK-2814 > URL: https://issues.apache.org/jira/browse/FLINK-2814 > Project: Flink > Issue Type: Bug > Components: Optimizer >Affects Versions: 0.10.0 >Reporter: Greg Hogan >Assignee: Rekha Joshi > > A delta iteration that closes with a solution set which is a {{JoinOperator}} > throws the following exception: > {noformat} > org.apache.flink.client.program.ProgramInvocationException: The main method > caused an error. > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:444) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:345) > at org.apache.flink.client.program.Client.runBlocking(Client.java:289) > at > org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:669) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:324) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:969) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1019) > Caused by: java.lang.ClassCastException: > org.apache.flink.optimizer.plan.DualInputPlanNode cannot be cast to > org.apache.flink.optimizer.plan.SingleInputPlanNode > at > org.apache.flink.optimizer.dag.WorksetIterationNode.instantiate(WorksetIterationNode.java:432) > at > org.apache.flink.optimizer.dag.TwoInputNode.addLocalCandidates(TwoInputNode.java:557) > at > org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:478) > at > org.apache.flink.optimizer.dag.DataSinkNode.getAlternativePlans(DataSinkNode.java:204) > at > 
org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:309) > at > org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308) > at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:500) > at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402) > at > org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:271) > at > org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:543) > at org.apache.flink.client.program.Client.runBlocking(Client.java:350) > at > org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:64) > at > org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:796) > at org.apache.flink.api.java.DataSet.collect(DataSet.java:424) > at org.apache.flink.api.java.DataSet.print(DataSet.java:1365) > at Driver.main(Driver.java:366) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:429) > ... 6 more > {noformat} > Temporary fix is to attach an identity mapper.
[GitHub] flink issue #3521: [FLINK-6027][checkpoint] Ignore the exception thrown by t...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3521 Actually, do you think you could add a test for this? Would be good to guard that for the future...
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930372#comment-15930372 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3550 @fhueske - Thank you very much for your review. It was very useful. I integrated and addressed most of the remarks you made. What remains to be discussed and it is not addressed is: -using the processingFunction vs keeping the Window-based implementation (i prefer and believe is more appropriate here later as mentioned) -doing the removal of the merge commit...as per my previous comment i do not know exactly what to do about that > Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL > - > > Key: FLINK-5654 > URL: https://issues.apache.org/jira/browse/FLINK-5654 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: radu > > The goal of this issue is to add support for OVER RANGE aggregations on > processing time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' > HOUR PRECEDING AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' > HOUR PRECEDING AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have procTime() as parameter. procTime() is a > parameterless scalar function that just indicates processing time mode. > - UNBOUNDED PRECEDING is not supported (see FLINK-5657) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. 
If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression).
[jira] [Commented] (FLINK-2814) DeltaIteration: DualInputPlanNode cannot be cast to SingleInputPlanNode
[ https://issues.apache.org/jira/browse/FLINK-2814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930371#comment-15930371 ] ASF GitHub Bot commented on FLINK-2814: --- GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/3563 [FLINK-2814] [optimizer] DualInputPlanNode cannot be cast to SingleInputPlanNode WorksetIterationNode#instantiate loops over all solution and work set candidates. Since the solution set reference is modified in place when the predecessor node can be used in its place, switch this variable to the inner loop. @StephanEwen this is similar to #2029 but resets the reference in the loop. I believe my prior suggestion to immediately return upon adding a node was incorrect as the `instantiate` methods look to be compiling all valid combinations. IntelliJ code coverage on `flink-optimizer` shows 105 hits through `WorksetIterationNode#instantiate` and it does fix this issue with my Katz Centrality algorithm (which should not be using delta iterations, but I was young and naive when I wrote it). You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 2814_deltaiteration_dualinputplannode_cannot_be_cast_to_singleinputplannode Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3563.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3563 commit 34f017834e17fa69e2b7c72bd95e1a819e4e6aa3 Author: Greg Hogan Date: 2017-03-17T16:09:34Z [FLINK-2814] [optimizer] DualInputPlanNode cannot be cast to SingleInputPlanNode WorksetIterationNode#instantiate loops over all solution and work set candidates. Since the solution set reference is modified in place when the predecessor node can be used in its place, switch this variable to the inner loop.
> DeltaIteration: DualInputPlanNode cannot be cast to SingleInputPlanNode > --- > > Key: FLINK-2814 > URL: https://issues.apache.org/jira/browse/FLINK-2814 > Project: Flink > Issue Type: Bug > Components: Optimizer >Affects Versions: 0.10.0 >Reporter: Greg Hogan >Assignee: Rekha Joshi > > A delta iteration that closes with a solution set which is a {{JoinOperator}} > throws the following exception: > {noformat} > org.apache.flink.client.program.ProgramInvocationException: The main method > caused an error. > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:444) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:345) > at org.apache.flink.client.program.Client.runBlocking(Client.java:289) > at > org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:669) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:324) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:969) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1019) > Caused by: java.lang.ClassCastException: > org.apache.flink.optimizer.plan.DualInputPlanNode cannot be cast to > org.apache.flink.optimizer.plan.SingleInputPlanNode > at > org.apache.flink.optimizer.dag.WorksetIterationNode.instantiate(WorksetIterationNode.java:432) > at > org.apache.flink.optimizer.dag.TwoInputNode.addLocalCandidates(TwoInputNode.java:557) > at > org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:478) > at > org.apache.flink.optimizer.dag.DataSinkNode.getAlternativePlans(DataSinkNode.java:204) > at > org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:309) > at > org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308) > at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:500) > at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402) > at > 
org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:271) > at > org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:543) > at org.apache.flink.client.program.Client.runBlocking(Client.java:350) > at > org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:64) > at > org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:796) > at org.apache.flink.api.java.DataSet.collect(DataSet.java:424) > at org.apache.flink.api.java.DataSet.print(DataSet.java:1365) > at Driver.main(Driver.java:366) > at
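The fix in the pull request above moves the solution-set candidate reference from the outer loop into the inner loop, so that an in-place substitution made for one workset candidate does not leak into the next one. A minimal, self-contained Java sketch of that loop pattern (names and the "substitution" are illustrative, not the actual optimizer code):

```java
import java.util.ArrayList;
import java.util.List;

public class CandidateLoops {
    // Enumerates all (workset, solution) candidate combinations. The key point
    // mirrored from the PR: the solution reference is re-read from the original
    // candidate on every inner iteration. A single variable hoisted outside the
    // loops would keep the substitution made for the previous workset candidate.
    static List<String> enumerate(List<String> solutionCandidates, List<String> worksetCandidates) {
        List<String> plans = new ArrayList<>();
        for (String workset : worksetCandidates) {
            for (String original : solutionCandidates) {
                String solution = original; // reset per inner iteration
                if (solution.startsWith("reusable:")) {
                    // stand-in for "use the predecessor node in its place"
                    solution = solution.substring("reusable:".length());
                }
                plans.add(workset + "+" + solution);
            }
        }
        return plans;
    }

    public static void main(String[] args) {
        System.out.println(enumerate(List.of("reusable:join"), List.of("w1", "w2")));
    }
}
```

With the variable inside the inner loop, every workset candidate sees the unmodified original, which is why the `instantiate` methods can keep compiling all valid combinations instead of returning early.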
[GitHub] flink pull request #3563: [FLINK-2814] [optimizer] DualInputPlanNode cannot ...
GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/3563 [FLINK-2814] [optimizer] DualInputPlanNode cannot be cast to SingleInputPlanNode WorksetIterationNode#instantiate loops over all solution and work set candidates. Since the solution set reference is modified in place when the predecessor node can be used in its place, switch this variable to the inner loop. @StephanEwen this is similar to #2029 but resets the reference in the loop. I believe my prior suggestion to immediately return upon adding a node was incorrect as the `instantiate` methods look to be compiling all valid combinations. IntelliJ code coverage on `flink-optimizer` shows 105 hits through `WorksetIterationNode#instantiate` and it does fix this issue with my Katz Centrality algorithm (which should not be using delta iterations, but I was young and naive when I wrote it). You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 2814_deltaiteration_dualinputplannode_cannot_be_cast_to_singleinputplannode Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3563.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3563 commit 34f017834e17fa69e2b7c72bd95e1a819e4e6aa3 Author: Greg Hogan Date: 2017-03-17T16:09:34Z [FLINK-2814] [optimizer] DualInputPlanNode cannot be cast to SingleInputPlanNode WorksetIterationNode#instantiate loops over all solution and work set candidates. Since the solution set reference is modified in place when the predecessor node can be used in its place, switch this variable to the inner loop. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #3526: [FLINK-5999] [resMgnr] Move JobLeaderIdService shut down ...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3526 Looks good to me, +1 to merge ---
[jira] [Commented] (FLINK-5999) MiniClusterITCase.runJobWithMultipleRpcServices fails
[ https://issues.apache.org/jira/browse/FLINK-5999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930363#comment-15930363 ] ASF GitHub Bot commented on FLINK-5999: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3526 Looks good to me, +1 to merge > MiniClusterITCase.runJobWithMultipleRpcServices fails > - > > Key: FLINK-5999 > URL: https://issues.apache.org/jira/browse/FLINK-5999 > Project: Flink > Issue Type: Test > Components: Distributed Coordination, Tests >Reporter: Ufuk Celebi >Assignee: Till Rohrmann >Priority: Critical > Labels: test-stability > > In a branch with unrelated changes to the web frontend I've seen the > following test fail: > {code} > runJobWithMultipleRpcServices(org.apache.flink.runtime.minicluster.MiniClusterITCase) > Time elapsed: 1.145 sec <<< ERROR! > java.util.ConcurrentModificationException: null > at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429) > at java.util.HashMap$ValueIterator.next(HashMap.java:1458) > at > org.apache.flink.runtime.resourcemanager.JobLeaderIdService.clear(JobLeaderIdService.java:114) > at > org.apache.flink.runtime.resourcemanager.JobLeaderIdService.stop(JobLeaderIdService.java:92) > at > org.apache.flink.runtime.resourcemanager.ResourceManager.shutDown(ResourceManager.java:182) > at > org.apache.flink.runtime.resourcemanager.ResourceManagerRunner.shutDownInternally(ResourceManagerRunner.java:83) > at > org.apache.flink.runtime.resourcemanager.ResourceManagerRunner.shutDown(ResourceManagerRunner.java:78) > at > org.apache.flink.runtime.minicluster.MiniCluster.shutdownInternally(MiniCluster.java:313) > at > org.apache.flink.runtime.minicluster.MiniCluster.shutdown(MiniCluster.java:281) > at > org.apache.flink.runtime.minicluster.MiniClusterITCase.runJobWithMultipleRpcServices(MiniClusterITCase.java:72) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
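The `ConcurrentModificationException` in the stack trace above is the classic fail-fast iterator failure: `JobLeaderIdService.clear()` removes entries from a `HashMap` while iterating over its value view. A standalone Java sketch of the failure mode and the safe pattern (the map contents are illustrative, not the actual service state):

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class ClearDemo {
    // Buggy pattern: structural modification of the map through the map itself
    // while a value-view iteration is in flight throws CME on the next next().
    static boolean buggyClearThrows() {
        Map<String, Integer> services = new HashMap<>();
        services.put("job-1", 1);
        services.put("job-2", 2);
        try {
            for (Integer v : services.values()) {
                services.remove("job-" + v); // removes the current entry via the map
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Safe pattern: remove through the iterator itself (or collect the per-entry
    // cleanup work first and call map.clear() after the loop).
    static boolean safeClearEmpties() {
        Map<String, Integer> services = new HashMap<>();
        services.put("job-1", 1);
        services.put("job-2", 2);
        Iterator<Integer> it = services.values().iterator();
        while (it.hasNext()) {
            it.next();
            it.remove(); // keeps the iterator's modCount in sync
        }
        return services.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(buggyClearThrows() + " " + safeClearEmpties());
    }
}
```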
[jira] [Commented] (FLINK-6027) Ignore the exception thrown by the subsuming of old completed checkpoints
[ https://issues.apache.org/jira/browse/FLINK-6027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930360#comment-15930360 ] ASF GitHub Bot commented on FLINK-6027: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3521 Actually, do you think you could add a test for this? Would be good to guard that for the future... > Ignore the exception thrown by the subsuming of old completed checkpoints > - > > Key: FLINK-6027 > URL: https://issues.apache.org/jira/browse/FLINK-6027 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > When a checkpoint is added into the {{CompletedCheckpointStore}} via the > method {{addCheckpoint()}}, the oldest checkpoints will be removed from the > store if the number of stored checkpoints exceeds the given limit. The > subsuming of old checkpoints may fail and make {{addCheckpoint()}} throw > exceptions which are caught by {{CheckpointCoordinator}}. Finally, the states > in the new checkpoint will be deleted by {{CheckpointCoordinator}}. Because > the new checkpoint is still in the store, we may recover the job from the new > checkpoint. But the recovery will fail as the states of the checkpoint are > all deleted. > We should ignore the exceptions thrown by the subsuming of old checkpoints > because we can always recover from the new checkpoint when successfully > adding it into the store. The ignorance may produce some dirty data, but it's > acceptable because they can be cleaned with the cleanup hook introduced in > the near future. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
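The proposed behavior, swallowing failures while subsuming *old* checkpoints because the newly added checkpoint is the one recovery depends on, can be sketched in a few lines. This is a hedged stand-in, not the real `CompletedCheckpointStore` API; a checkpoint is modeled as just its discard callback:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class CheckpointStoreSketch {
    final Deque<Runnable> checkpoints = new ArrayDeque<>();
    final int maxRetained;

    CheckpointStoreSketch(int maxRetained) {
        this.maxRetained = maxRetained;
    }

    void addCheckpoint(Runnable discardCallback) {
        // the new checkpoint is stored first, so it is usable no matter what
        checkpoints.addLast(discardCallback);
        while (checkpoints.size() > maxRetained) {
            Runnable oldest = checkpoints.removeFirst();
            try {
                oldest.run(); // discard the subsumed checkpoint's state
            } catch (Exception e) {
                // ignore: recovery uses the new checkpoint; any state leaked
                // here can be removed by a later cleanup hook
            }
        }
    }
}
```

The point of the ticket is exactly this ordering: a failed discard of an old checkpoint must not propagate out of `addCheckpoint()` and cause the coordinator to delete the new checkpoint's state.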
[jira] [Updated] (FLINK-5376) Misleading log statements in UnorderedStreamElementQueue
[ https://issues.apache.org/jira/browse/FLINK-5376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-5376: -- Description: The following are two examples where ordered stream element queue is mentioned: {code} LOG.debug("Put element into ordered stream element queue. New filling degree " + "({}/{}).", numberEntries, capacity); return true; } else { LOG.debug("Failed to put element into ordered stream element queue because it " + {code} I guess OrderedStreamElementQueue was coded first. was: The following are two examples where ordered stream element queue is mentioned: {code} LOG.debug("Put element into ordered stream element queue. New filling degree " + "({}/{}).", numberEntries, capacity); return true; } else { LOG.debug("Failed to put element into ordered stream element queue because it " + {code} I guess OrderedStreamElementQueue was coded first. > Misleading log statements in UnorderedStreamElementQueue > > > Key: FLINK-5376 > URL: https://issues.apache.org/jira/browse/FLINK-5376 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Ted Yu >Priority: Minor > > The following are two examples where ordered stream element queue is > mentioned: > {code} > LOG.debug("Put element into ordered stream element queue. New filling > degree " + > "({}/{}).", numberEntries, capacity); > return true; > } else { > LOG.debug("Failed to put element into ordered stream element queue > because it " + > {code} > I guess OrderedStreamElementQueue was coded first. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()
[ https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15822346#comment-15822346 ] Ted Yu edited comment on FLINK-5486 at 3/17/17 5:19 PM: Lock on State.bucketStates should be held in the following method: {code} private void handleRestoredBucketState(State restoredState) { Preconditions.checkNotNull(restoredState); for (BucketState bucketState : restoredState.bucketStates.values()) { {code} was (Author: yuzhih...@gmail.com): Lock on State.bucketStates should be held in the following method: {code} private void handleRestoredBucketState(State restoredState) { Preconditions.checkNotNull(restoredState); for (BucketState bucketState : restoredState.bucketStates.values()) { {code} > Lack of synchronization in BucketingSink#handleRestoredBucketState() > > > Key: FLINK-5486 > URL: https://issues.apache.org/jira/browse/FLINK-5486 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu > > Here is related code: > {code} > > handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint); > synchronized (bucketState.pendingFilesPerCheckpoint) { > bucketState.pendingFilesPerCheckpoint.clear(); > } > {code} > The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside > the synchronization block. Otherwise during the processing of > handlePendingFilesForPreviousCheckpoints(), some entries of the map may be > cleared. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
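The fix suggested in the comment above is to run the handler and the `clear()` under the same lock, so no other thread can mutate the map while its entries are being processed. A minimal Java sketch of the corrected shape (field and method names loosely follow the ticket; this is not the actual `BucketingSink` code):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RestoreSketch {
    final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
    final List<String> moved = new ArrayList<>();

    // stand-in for handlePendingFilesForPreviousCheckpoints(): iterates the map
    void handlePendingFiles(Map<Long, List<String>> pending) {
        for (List<String> files : pending.values()) {
            moved.addAll(files); // "finalize" each pending file
        }
    }

    void handleRestoredState() {
        // corrected version: iteration and clear() happen under one lock, so
        // entries cannot be cleared while they are still being handled
        synchronized (pendingFilesPerCheckpoint) {
            handlePendingFiles(pendingFilesPerCheckpoint);
            pendingFilesPerCheckpoint.clear();
        }
    }

    public static void main(String[] args) {
        RestoreSketch r = new RestoreSketch();
        r.pendingFilesPerCheckpoint.put(1L, List.of("part-0", "part-1"));
        r.handleRestoredState();
        System.out.println(r.moved + " " + r.pendingFilesPerCheckpoint.isEmpty());
    }
}
```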
[jira] [Assigned] (FLINK-5985) Flink treats every task as stateful (making topology changes impossible)
[ https://issues.apache.org/jira/browse/FLINK-5985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter reassigned FLINK-5985: - Assignee: Stefan Richter > Flink treats every task as stateful (making topology changes impossible) > > > Key: FLINK-5985 > URL: https://issues.apache.org/jira/browse/FLINK-5985 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Gyula Fora >Assignee: Stefan Richter >Priority: Critical > Fix For: 1.3.0, 1.2.1 > > > It seems that Flink treats every Task as stateful so changing the topology > is not possible without setting uid on every single operator. > If the topology has an iteration this is virtually impossible (or at least > gets super hacky) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Closed] (FLINK-5985) Flink treats every task as stateful (making topology changes impossible)
[ https://issues.apache.org/jira/browse/FLINK-5985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter closed FLINK-5985. - Resolution: Fixed Fix Version/s: 1.2.1 1.3.0 fixed in 20fff32. > Flink treats every task as stateful (making topology changes impossible) > > > Key: FLINK-5985 > URL: https://issues.apache.org/jira/browse/FLINK-5985 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Gyula Fora >Assignee: Stefan Richter >Priority: Critical > Fix For: 1.3.0, 1.2.1 > > > It seems that Flink treats every Task as stateful so changing the topology > is not possible without setting uid on every single operator. > If the topology has an iteration this is virtually impossible (or at least > gets super hacky) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5985) Flink treats every task as stateful (making topology changes impossible)
[ https://issues.apache.org/jira/browse/FLINK-5985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930324#comment-15930324 ] ASF GitHub Bot commented on FLINK-5985: --- Github user StefanRRichter closed the pull request at: https://github.com/apache/flink/pull/3523 > Flink treats every task as stateful (making topology changes impossible) > > > Key: FLINK-5985 > URL: https://issues.apache.org/jira/browse/FLINK-5985 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Gyula Fora >Priority: Critical > Fix For: 1.3.0, 1.2.1 > > > It seems that Flink treats every Task as stateful so changing the topology > is not possible without setting uid on every single operator. > If the topology has an iteration this is virtually impossible (or at least > gets super hacky) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3523: [FLINK-5985] Report no task states for stateless t...
Github user StefanRRichter closed the pull request at: https://github.com/apache/flink/pull/3523 ---
[jira] [Commented] (FLINK-5985) Flink treats every task as stateful (making topology changes impossible)
[ https://issues.apache.org/jira/browse/FLINK-5985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930320#comment-15930320 ] ASF GitHub Bot commented on FLINK-5985: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/3523 Thanks for the review @StephanEwen. I updated the test as suggested. Merging this now. > Flink treats every task as stateful (making topology changes impossible) > > > Key: FLINK-5985 > URL: https://issues.apache.org/jira/browse/FLINK-5985 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Gyula Fora >Priority: Critical > > It seems that Flink treats every Task as stateful so changing the topology > is not possible without setting uid on every single operator. > If the topology has an iteration this is virtually impossible (or at least > gets super hacky) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3523: [FLINK-5985] Report no task states for stateless tasks on...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/3523 Thanks for the review @StephanEwen. I updated the test as suggested. Merging this now. ---
[jira] [Commented] (FLINK-6020) Blob Server cannot handle multiple job submits (with same content) in parallel
[ https://issues.apache.org/jira/browse/FLINK-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930311#comment-15930311 ] ASF GitHub Bot commented on FLINK-6020: --- Github user WangTaoTheTonic commented on the issue: https://github.com/apache/flink/pull/3525 The second rename will not fail, but it corrupts the file written by the first, which makes the first job fail if a task is loading that jar. By the way, the jar file is also uploaded to HDFS for recovery, and that upload fails too when two or more clients write a file with the same name. It is easy to reproduce: first launch a session with enough slots, then run a script containing many identical job submissions, say 20 lines of "flink run ../examples/steaming/WindowJoin.jar &". Make sure there is a "&" at the end of each line so they run in parallel. > Blob Server cannot handle multiple job submits (with same content) in parallel > --- > > Key: FLINK-6020 > URL: https://issues.apache.org/jira/browse/FLINK-6020 > Project: Flink > Issue Type: Bug >Reporter: Tao Wang >Assignee: Tao Wang >Priority: Critical > > In yarn-cluster mode, if we submit the same job multiple times in parallel, > the tasks will encounter class loading problems and lease occupation. > Because the blob server stores user jars under names generated from the sha1sum of their > contents, it first writes a temp file and then moves it to the final name. For recovery it > also puts them on HDFS with the same file name. > At the same time, when multiple clients submit the same job with the same jar, the local > jar files on the blob server and those files on HDFS are handled by multiple > threads (BlobServerConnection) and impact each other. > It's better to have a way to handle this; two ideas come to mind: > 1. lock the write operation, or > 2. use some unique identifier as the file name instead of (or in addition to) > the sha1sum of the file contents. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3525: [FLINK-6020]add a random integer suffix to blob key to av...
Github user WangTaoTheTonic commented on the issue: https://github.com/apache/flink/pull/3525 The second rename will not fail, but it corrupts the file written by the first, which makes the first job fail if a task is loading that jar. By the way, the jar file is also uploaded to HDFS for recovery, and that upload fails too when two or more clients write a file with the same name. It is easy to reproduce: first launch a session with enough slots, then run a script containing many identical job submissions, say 20 lines of "flink run ../examples/steaming/WindowJoin.jar &". Make sure there is a "&" at the end of each line so they run in parallel. ---
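The race described in the ticket, two writers renaming temp files onto the same content-addressed target, can be avoided with a per-writer unique temp name plus an atomic move. This is a sketch of idea #2 from the ticket using `java.nio.file`; the storage layout is illustrative, not Flink's actual BlobServer layout:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.UUID;

public class BlobWriteSketch {
    static Path write(Path storageDir, String sha1, byte[] content) throws IOException {
        Path target = storageDir.resolve(sha1);
        // a unique temp name per writer means two clients uploading the same
        // jar never rename over each other's half-written file
        Path tmp = storageDir.resolve(sha1 + ".tmp-" + UUID.randomUUID());
        Files.write(tmp, content);
        try {
            Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            // another client may have finished first; the store is
            // content-addressed, so the existing file is identical
            if (!Files.exists(target)) {
                throw e;
            }
            Files.deleteIfExists(tmp);
        }
        return target;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("blobs");
        Path p = write(dir, "abc123", "jar-bytes".getBytes());
        System.out.println(Files.readAllBytes(p).length);
    }
}
```

The atomic move guarantees readers only ever observe either no file or a complete file, never the corrupted intermediate state the reporter describes.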
[jira] [Updated] (FLINK-5541) Missing null check for localJar in FlinkSubmitter#submitTopology()
[ https://issues.apache.org/jira/browse/FLINK-5541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-5541: -- Description: {code} if (localJar == null) { try { for (final URL url : ((ContextEnvironment) ExecutionEnvironment.getExecutionEnvironment()) .getJars()) { // TODO verify that there is only one jar localJar = new File(url.toURI()).getAbsolutePath(); } } catch (final URISyntaxException e) { // ignore } catch (final ClassCastException e) { // ignore } } logger.info("Submitting topology " + name + " in distributed mode with conf " + serConf); client.submitTopologyWithOpts(name, localJar, topology); {code} Since the try block may encounter URISyntaxException / ClassCastException, we should check that localJar is not null before calling submitTopologyWithOpts(). was: {code} if (localJar == null) { try { for (final URL url : ((ContextEnvironment) ExecutionEnvironment.getExecutionEnvironment()) .getJars()) { // TODO verify that there is only one jar localJar = new File(url.toURI()).getAbsolutePath(); } } catch (final URISyntaxException e) { // ignore } catch (final ClassCastException e) { // ignore } } logger.info("Submitting topology " + name + " in distributed mode with conf " + serConf); client.submitTopologyWithOpts(name, localJar, topology); {code} Since the try block may encounter URISyntaxException / ClassCastException, we should check that localJar is not null before calling submitTopologyWithOpts(). 
> Missing null check for localJar in FlinkSubmitter#submitTopology() > -- > > Key: FLINK-5541 > URL: https://issues.apache.org/jira/browse/FLINK-5541 > Project: Flink > Issue Type: Bug > Components: Storm Compatibility >Reporter: Ted Yu >Priority: Minor > > {code} > if (localJar == null) { > try { > for (final URL url : ((ContextEnvironment) > ExecutionEnvironment.getExecutionEnvironment()) > .getJars()) { > // TODO verify that there is only one jar > localJar = new File(url.toURI()).getAbsolutePath(); > } > } catch (final URISyntaxException e) { > // ignore > } catch (final ClassCastException e) { > // ignore > } > } > logger.info("Submitting topology " + name + " in distributed mode with > conf " + serConf); > client.submitTopologyWithOpts(name, localJar, topology); > {code} > Since the try block may encounter URISyntaxException / ClassCastException, we > should check that localJar is not null before calling > submitTopologyWithOpts(). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
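The guard the ticket asks for amounts to failing fast before the submit call when the jar could not be located. A small hypothetical sketch (the method name and message are stand-ins, not the real storm-compatibility API):

```java
public class SubmitGuard {
    // Returns localJar if present, otherwise throws with a clear message
    // instead of letting submitTopologyWithOpts() trip over a null path later.
    static String requireLocalJar(String localJar) {
        if (localJar == null) {
            throw new IllegalStateException(
                "Could not determine the jar file to submit; no jar URL found in the context environment");
        }
        return localJar;
    }

    public static void main(String[] args) {
        System.out.println(requireLocalJar("topology.jar"));
    }
}
```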
[jira] [Comment Edited] (FLINK-5629) Unclosed RandomAccessFile in StaticFileServerHandler#respondAsLeader()
[ https://issues.apache.org/jira/browse/FLINK-5629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854366#comment-15854366 ] Ted Yu edited comment on FLINK-5629 at 3/17/17 5:01 PM: RandomAccessFile#length() may throw IOE. raf is used in the following code path where DefaultFileRegion is not involved: {code} } else { lastContentFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)), {code} It is good practice to close RandomAccessFile in all code paths. was (Author: yuzhih...@gmail.com): RandomAccessFile#length() may throw IOE. raf is used in the following code path where DefaultFileRegion is not involved: {code} } else { lastContentFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)), {code} It is good practice to close RandomAccessFile in all code paths. > Unclosed RandomAccessFile in StaticFileServerHandler#respondAsLeader() > -- > > Key: FLINK-5629 > URL: https://issues.apache.org/jira/browse/FLINK-5629 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Reporter: Ted Yu >Priority: Minor > > {code} > final RandomAccessFile raf; > try { > raf = new RandomAccessFile(file, "r"); > ... > long fileLength = raf.length(); > {code} > The RandomAccessFile should be closed upon return from method. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
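For the code path in the comment above, where the file's length is read before any ownership transfer, try-with-resources closes the `RandomAccessFile` on every path, including when `length()` throws. A sketch of that pattern (in the real handler, ownership of the open file may transfer to the response pipeline, which would need a different close strategy):

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

public class RafSketch {
    // The file is closed whether length() returns normally or throws IOE.
    static long safeLength(File file) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file, "r")) {
            return raf.length();
        }
    }

    public static void main(String[] args) throws IOException {
        File f = File.createTempFile("demo", ".bin");
        System.out.println(safeLength(f)); // 0 for an empty temp file
        f.delete();
    }
}
```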
[jira] [Updated] (FLINK-5855) Unprotected access to pendingFilesPerCheckpoint in BucketingSink
[ https://issues.apache.org/jira/browse/FLINK-5855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-5855: -- Description: {code} handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint); synchronized (restoredState.pendingFilesPerCheckpoint) { restoredState.pendingFilesPerCheckpoint.clear(); {code} Lock on pendingFilesPerCheckpoint should be obtained prior to the call to handlePendingFilesForPreviousCheckpoints(). was: {code} handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint); synchronized (restoredState.pendingFilesPerCheckpoint) { restoredState.pendingFilesPerCheckpoint.clear(); {code} Lock on pendingFilesPerCheckpoint should be obtained prior to the call to handlePendingFilesForPreviousCheckpoints(). > Unprotected access to pendingFilesPerCheckpoint in BucketingSink > > > Key: FLINK-5855 > URL: https://issues.apache.org/jira/browse/FLINK-5855 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > {code} > handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint); > synchronized (restoredState.pendingFilesPerCheckpoint) { > restoredState.pendingFilesPerCheckpoint.clear(); > {code} > Lock on pendingFilesPerCheckpoint should be obtained prior to the call to > handlePendingFilesForPreviousCheckpoints(). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r106685158 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java --- @@ -48,15 +49,25 @@ * output selection). */ private final List selectedNames; + + /** +* The side-output tag (if any) of this {@link StreamEdge}. +*/ + private final OutputTag outputTag; + + /** +* The {@link StreamPartitioner} on this {@link StreamEdge}. +*/ private StreamPartitioner outputPartitioner; public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber, - List selectedNames, StreamPartitioner outputPartitioner) { + List selectedNames, StreamPartitioner outputPartitioner, OutputTag outputTag) { this.sourceVertex = sourceVertex; this.targetVertex = targetVertex; this.typeNumber = typeNumber; this.selectedNames = selectedNames; this.outputPartitioner = outputPartitioner; + this.outputTag = outputTag; --- End diff -- Not sure what the edge id exactly does and who uses it so I prefer to not touch it, for now. ---
[GitHub] flink pull request #3512: [FLINK-6008] collection of BlobServer improvements
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3512#discussion_r106674262 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java --- @@ -180,91 +180,159 @@ public URL getURL(final BlobKey requiredBlob) throws IOException { // fallback: download from the BlobServer final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE]; + LOG.info("Downloading {} from {}", requiredBlob, serverAddress); // loop over retries int attempt = 0; while (true) { + try ( + final BlobClient bc = new BlobClient(serverAddress, blobClientConfig); + final InputStream is = bc.get(requiredBlob); + final OutputStream os = new FileOutputStream(localJarFile) + ) { + getURLTransferFile(buf, is, os); + + // success, we finished + return localJarFile.toURI().toURL(); + } + catch (Throwable t) { + getURLOnException(requiredBlob.toString(), localJarFile, attempt, t); - if (attempt == 0) { - LOG.info("Downloading {} from {}", requiredBlob, serverAddress); - } else { + // retry + ++attempt; LOG.info("Downloading {} from {} (retry {})", requiredBlob, serverAddress, attempt); } + } // end loop over retries + } - try { - BlobClient bc = null; - InputStream is = null; - OutputStream os = null; + /** +* Returns the URL for the BLOB with the given parameters. The method will first attempt to +* serve the BLOB from its local cache. If the BLOB is not in the cache, the method will try +* to download it from this cache's BLOB server. +* +* @param jobId JobID of the file in the blob store +* @param key String key of the file in the blob store +* @return URL referring to the local storage location of the BLOB. +* @throws java.io.FileNotFoundException if the path does not exist; +* @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server. 
+*/ + public URL getURL(final JobID jobId, final String key) throws IOException { + checkArgument(jobId != null, "Job id cannot be null."); + checkArgument(key != null, "BLOB name cannot be null."); - try { - bc = new BlobClient(serverAddress, blobClientConfig); - is = bc.get(requiredBlob); - os = new FileOutputStream(localJarFile); - - while (true) { - final int read = is.read(buf); - if (read < 0) { - break; - } - os.write(buf, 0, read); - } - - // we do explicitly not use a finally block, because we want the closing - // in the regular case to throw exceptions and cause the writing to fail. - // But, the closing on exception should not throw further exceptions and - // let us keep the root exception - os.close(); - os = null; - is.close(); - is = null; - bc.close(); - bc = null; - - // success, we finished - return localJarFile.toURI().toURL(); - } - catch (Throwable t) { - // we use "catch (Throwable)" to keep the root exception. Otherwise that exception - // it would be replaced by any exception thrown in the finally block - IOUtils.closeQuietly(os); - IOUtils.closeQuietly(is); -
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930241#comment-15930241 ] ASF GitHub Bot commented on FLINK-4460: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3484 Thanks for reviewing again, @kl0u! I incorporated all your suggestions. I'm now waiting for travis to give the green light and then I'll merge. @chenqin A lot of thanks also to you for working on this and pushing it with me! > Side Outputs in Flink > - > > Key: FLINK-4460 > URL: https://issues.apache.org/jira/browse/FLINK-4460 > Project: Flink > Issue Type: New Feature > Components: Core, DataStream API >Affects Versions: 1.2.0, 1.1.3 >Reporter: Chen Qin >Assignee: Chen Qin > Labels: latearrivingevents, sideoutput > > https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3484: [FLINK-4460] Side Outputs in Flink
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3484 Thanks for reviewing again, @kl0u! I incorporated all your suggestions. I'm now waiting for travis to give the green light and then I'll merge. @chenqin A lot of thanks also to you for working on this and pushing it with me! ---
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930235#comment-15930235 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106688499 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -191,3 +287,31 @@ class DataStreamOverAggregate( } +object DataStreamProcTimeCase { + class ProcTimeTimestampExtractor --- End diff -- @fhueske Given also your previous comment i would assume this goes down ...which i did > Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL > - > > Key: FLINK-5654 > URL: https://issues.apache.org/jira/browse/FLINK-5654 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: radu > > The goal of this issue is to add support for OVER RANGE aggregations on > processing time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' > HOUR PRECEDING AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' > HOUR PRECEDING AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have procTime() as parameter. procTime() is a > parameterless scalar function that just indicates processing time mode. > - UNBOUNDED PRECEDING is not supported (see FLINK-5657) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. 
If we find that some
> of the restrictions are trivial to address, we can add the functionality in
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with
> RexOver expression).

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
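The RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW semantics described in the issue can be sketched, independently of Flink, as a per-partition buffer that evicts rows older than the range bound before each aggregation. This is a minimal illustration, not the Flink operator; all names are illustrative.

```java
import java.util.ArrayDeque;

/**
 * Minimal sketch of bounded processing-time OVER RANGE aggregation:
 * for each incoming row we evict buffered rows that fall outside the
 * range bound, add the new row, and emit SUM/MIN over what remains.
 * Illustrative only; the real operator keys this state by the
 * PARTITION BY field and keeps it in managed state.
 */
class BoundedRangeAggregator {
    private final long rangeMillis;                               // e.g. 1 hour = 3_600_000
    private final ArrayDeque<long[]> buffer = new ArrayDeque<>(); // {timestamp, value}

    BoundedRangeAggregator(long rangeMillis) {
        this.rangeMillis = rangeMillis;
    }

    /** Processes one row and returns {sum, min} over the retained range. */
    long[] process(long timestamp, long value) {
        // Evict rows strictly older than (timestamp - range), i.e. outside
        // "INTERVAL ... PRECEDING AND CURRENT ROW".
        while (!buffer.isEmpty() && buffer.peekFirst()[0] < timestamp - rangeMillis) {
            buffer.pollFirst();
        }
        buffer.addLast(new long[] {timestamp, value});

        long sum = 0;
        long min = Long.MAX_VALUE;
        for (long[] row : buffer) {
            sum += row[1];
            min = Math.min(min, row[1]);
        }
        return new long[] {sum, min};
    }
}
```

Each arriving row produces exactly one output row, matching the per-row emission of the OVER clause.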
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930234#comment-15930234 ]

ASF GitHub Bot commented on FLINK-5654:
---------------------------------------

Github user rtudoran commented on a diff in the pull request:
https://github.com/apache/flink/pull/3550#discussion_r106688280

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
@@ -785,7 +785,7 @@ object AggregateUtil {
     (propPos._1, propPos._2)
   }
-  private def transformToAggregateFunctions(
--- End diff --

It is not too much logic, and the overall "createTimeBoundedProcessingTimeOverWindow" is not bigger than the other method for the unbounded case. As I said, it is the same, so as you prefer.
[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/3359#discussion_r106673680

--- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java ---
@@ -0,0 +1,797 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.DBOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.StringAppendOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * {@link InternalTimerService} that stores timers in RocksDB.
+ */
+public class RocksDBInternalTimerService<K, N> extends InternalTimerService<K, N> {
+
+	private static Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class);
+
+	/** The database where all timers are stored */
+	private final RocksDB db;
+
+	/** The path where the RocksDB instance is located */
+	private final Path dbPath;
+
+	/**
+	 * The in-memory heaps backed by RocksDB to retrieve the next timer to trigger. Each
+	 * partition's leader is stored in the heap. When the timers in a partition are changed, we
+	 * will change the partition's leader and update the heap accordingly.
+	 */
+	private final int numPartitions;
+	private final PersistentTimerHeap eventTimeHeap;
+	private final PersistentTimerHeap processingTimeHeap;
+
+	private static int MAX_PARTITIONS = (1 << 16);
+
+	public RocksDBInternalTimerService(
+			int totalKeyGroups,
+			KeyGroupRange keyGroupRange,
+			KeyContext keyContext,
+			ProcessingTimeService processingTimeService,
+			Path dbPath) {
+
+		super(totalKeyGroups, keyGroupRange, keyContext, processingTimeService);
+
+		this.dbPath = dbPath;
+
+		try {
+			FileSystem fileSystem = this.dbPath.getFileSystem();
+			if (fileSystem.exists(this.dbPath)) {
+				fileSystem.delete(this.dbPath, true);
+			}
+
+			fileSystem.mkdirs(dbPath);
+		} catch (IOException e) {
+			throw new RuntimeException("Error while creating directory for rocksdb timer service.", e);
+		}
+
+		ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions()
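The "partition leader" idea from the javadoc above — timers live in a per-partition sorted store, and an in-memory structure holds only each partition's earliest timer so the globally next timer is the minimum over leaders — can be sketched with standard collections. This is an illustrative stand-in (a TreeSet per partition in place of RocksDB), not the actual PersistentTimerHeap implementation.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

/**
 * Sketch of per-partition "leader" timers: each partition's timers sit in
 * a sorted store (stand-in for RocksDB), and an ordered set holds only the
 * earliest timer of each partition. The globally next timer to trigger is
 * then simply leaders.first(). Names are illustrative, not Flink's.
 */
class PartitionLeaderTimers {
    private final Map<Integer, TreeSet<Long>> store = new HashMap<>(); // partition -> timers
    // Leaders ordered by (time, partition); arrays compare via the comparator.
    private final TreeSet<long[]> leaders =
        new TreeSet<>(Comparator.<long[]>comparingLong(e -> e[0]).thenComparingLong(e -> e[1]));

    void register(int partition, long time) {
        TreeSet<Long> timers = store.computeIfAbsent(partition, p -> new TreeSet<>());
        Long oldLeader = timers.isEmpty() ? null : timers.first();
        timers.add(time);
        // Only update the heap of leaders if the new timer becomes the
        // partition's earliest — this is the "change the leader" step.
        if (oldLeader == null || time < oldLeader) {
            if (oldLeader != null) {
                leaders.remove(new long[] {oldLeader, partition});
            }
            leaders.add(new long[] {time, partition});
        }
    }

    /** Earliest timer across all partitions, or Long.MAX_VALUE if none. */
    long nextTimer() {
        return leaders.isEmpty() ? Long.MAX_VALUE : leaders.first()[0];
    }
}
```

The point of the design is that registering or deleting a timer touches the in-memory structure only when a partition's minimum changes, while the bulk of the timers stay in the persistent store.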
[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/3359#discussion_r106677330

--- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java ---
[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/3359#discussion_r106670661

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java ---
@@ -18,43 +18,306 @@
 package org.apache.flink.streaming.api.operators;

 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.streaming.runtime.tasks.EventTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;

 /**
  * Interface for working with time and timers.
  *
  * This is the internal version of {@link org.apache.flink.streaming.api.TimerService}
  * that allows to specify a key and a namespace to which timers should be scoped.
  *
+ * All d
+ *
+ * @param <K> Type of the keys in the stream
  * @param <N> Type of the namespace to which timers are scoped.
  */
 @Internal
-public interface InternalTimerService<N> {
+public abstract class InternalTimerService<K, N> implements ProcessingTimeCallback, EventTimeCallback {
+
+	protected final ProcessingTimeService processingTimeService;
+
+	protected final KeyContext keyContext;
+
+	protected final int totalKeyGroups;
+
+	protected final KeyGroupRange keyGroupRange;
+
+	/**
+	 * The one and only Future (if any) registered to execute the
+	 * next {@link Triggerable} action, when its (processing) time arrives.
+	 */
+	protected ScheduledFuture<?> nextTimer;
+
+	/**
+	 * The local event time, as denoted by the last received
+	 * {@link org.apache.flink.streaming.api.watermark.Watermark Watermark}.
+	 */
+	private long currentWatermark = Long.MIN_VALUE;
+
+	// Variables to be set when the service is started.
+
+	protected TypeSerializer<K> keySerializer;
+
+	protected TypeSerializer<N> namespaceSerializer;
+
+	private InternalTimer.TimerSerializer<K, N> timerSerializer;
+
+	protected Triggerable<K, N> triggerTarget;
+
+	private volatile boolean isInitialized;
+
+	public InternalTimerService(
+			int totalKeyGroups,
+			KeyGroupRange keyGroupRange,
+			KeyContext keyContext,
+			ProcessingTimeService processingTimeService) {
+
+		this.totalKeyGroups = totalKeyGroups;
+		this.keyGroupRange = checkNotNull(keyGroupRange);
+		this.keyContext = checkNotNull(keyContext);
+		this.processingTimeService = checkNotNull(processingTimeService);
+	}

 	/** Returns the current processing time. */
-	long currentProcessingTime();
+	public long currentProcessingTime() {
+		return processingTimeService.getCurrentProcessingTime();
+	}

 	/** Returns the current event-time watermark. */
-	long currentWatermark();
+	public long currentWatermark() {
+		return currentWatermark;
+	}

 	/**
 	 * Registers a timer to be fired when processing time passes the given time. The namespace
 	 * you pass here will be provided when the timer fires.
 	 */
-	void registerProcessingTimeTimer(N namespace, long time);
+	abstract public void registerProcessingTimeTimer(N namespace, long time);

 	/**
 	 * Deletes the timer for the given key and namespace.
 	 */
-	void deleteProcessingTimeTimer(N namespace, long time);
+	abstract public void deleteProcessingTimeTimer(N namespace, long time);

 	/**
 	 * Registers a timer to be fired when event time passes the given time. The namespace
 	 * you pass here will be provided when the timer fires.
 	 */
-	void registerEventTimeTimer(N namespace, long time);
+	abstract public void registerEventTimeTimer(N namespace, long time);

 	/**
 	 * Deletes the timer for the given key and namespace.
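The event-time half of the service quoted above tracks a `currentWatermark` and fires registered timers once the watermark passes their timestamps. Stripped of keys and namespaces, that mechanism can be sketched as a priority queue drained on each watermark advance. This is an illustrative simplification, not the Flink class itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Sketch of event-time timer firing: registered timers wait in a
 * priority queue, and advancing the watermark fires (removes and
 * returns) every timer whose timestamp is <= the new watermark.
 * Illustrative only; the real service also scopes timers by key
 * and namespace and dispatches them to a Triggerable target.
 */
class EventTimeTimers {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;

    void registerEventTimeTimer(long time) {
        timers.add(time);
    }

    /** Advances the watermark and returns the timers it fires, in timestamp order. */
    List<Long> advanceWatermark(long watermark) {
        currentWatermark = watermark;
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            fired.add(timers.poll());
        }
        return fired;
    }

    long currentWatermark() {
        return currentWatermark;
    }
}
```

Processing-time timers work analogously, except the trigger comes from a scheduled future (`nextTimer` in the quoted code) rather than from watermark advancement.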
[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/3359#discussion_r106670841

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java ---
[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/3359#discussion_r106675685

--- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java ---
[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/3359#discussion_r106671203

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java ---
[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3359#discussion_r106674528 --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java --- @@ -0,0 +1,797 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.api.operators; + + +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.KeyContext; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.CompactionStyle; +import org.rocksdb.DBOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.StringAppendOperator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * {@link InternalTimerService} that stores timers in RocksDB. 
+ */ +public class RocksDBInternalTimerService<K, N> extends InternalTimerService<K, N> { + + private static Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class); + + /** The database where all timers are stored */ + private final RocksDB db; + + /** The path where the RocksDB instance is located */ + private final Path dbPath; + + /** +* The in-memory heaps backed by RocksDB to retrieve the next timer to trigger. Each +* partition's leader is stored in the heap. When the timers in a partition are changed, we +* will change the partition's leader and update the heap accordingly. +*/ + private final int numPartitions; + private final PersistentTimerHeap eventTimeHeap; + private final PersistentTimerHeap processingTimeHeap; + + private static int MAX_PARTITIONS = (1 << 16); + + public RocksDBInternalTimerService( + int totalKeyGroups, + KeyGroupRange keyGroupRange, + KeyContext keyContext, + ProcessingTimeService processingTimeService, + Path dbPath) { + + super(totalKeyGroups, keyGroupRange, keyContext, processingTimeService); + + this.dbPath = dbPath; + + try { + FileSystem fileSystem = this.dbPath.getFileSystem(); + if (fileSystem.exists(this.dbPath)) { + fileSystem.delete(this.dbPath, true); + } + + fileSystem.mkdirs(dbPath); + } catch (IOException e) { + throw new RuntimeException("Error while creating directory for rocksdb timer service.", e); + } + + ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions() +
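The javadoc quoted above describes keeping only each partition's earliest timer (its "leader") in an in-memory heap, with the full timer set living in RocksDB. A minimal sketch of that leader-heap idea follows; all names here are hypothetical, and the actual `PersistentTimerHeap` implementation is not part of this excerpt:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

/** Sketch: tracks only the earliest timer per partition in memory. */
class PartitionLeaderHeap {
    /** earliest timestamp per partition (the partition's "leader") */
    private final Map<Integer, Long> leaders = new HashMap<>();

    /** heap of {timestamp, partition} entries ordered by timestamp; may hold stale entries */
    private final PriorityQueue<long[]> heap =
        new PriorityQueue<>((a, b) -> Long.compare(a[0], b[0]));

    /** Called when a timer is added; updates the partition's leader if the new timer is earlier. */
    void onTimerAdded(int partition, long timestamp) {
        Long leader = leaders.get(partition);
        if (leader == null || timestamp < leader) {
            leaders.put(partition, timestamp);
            heap.add(new long[]{timestamp, partition});
        }
    }

    /** Returns the smallest live timestamp across all partitions, or Long.MAX_VALUE if none. */
    long peekNextTimer() {
        while (!heap.isEmpty()) {
            long[] top = heap.peek();
            Long leader = leaders.get((int) top[1]);
            if (leader != null && leader == top[0]) {
                return top[0];
            }
            heap.poll(); // stale entry: this partition's leader has since changed
        }
        return Long.MAX_VALUE;
    }
}
```

The lazy deletion of stale heap entries is one common way to keep leader updates cheap; the PR may resolve this differently.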
[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3359#discussion_r106673233 --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java --- [...] + private static int MAX_PARTITIONS = (1 << 16); --- End diff -- I think this should be initialized as `KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3359#discussion_r106675210 --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java --- [...]
[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3359#discussion_r106680293 --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java --- [...]
[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3359#discussion_r106669225 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java --- @@ -18,43 +18,306 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.streaming.runtime.tasks.EventTimeCallback; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ScheduledFuture; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; /** * Interface for working with time and timers. * * This is the internal version of {@link org.apache.flink.streaming.api.TimerService} * that allows to specify a key and a namespace to which timers should be scoped. * + * All d --- End diff -- Unfinished comment
[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3359#discussion_r106674160 --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java --- [...]
[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3359#discussion_r106674867 --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java --- [...]
[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3359#discussion_r106670283 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java --- @@ -18,43 +18,306 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.streaming.runtime.tasks.EventTimeCallback; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ScheduledFuture; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; /** * Interface for working with time and timers. * * This is the internal version of {@link org.apache.flink.streaming.api.TimerService} * that allows to specify a key and a namespace to which timers should be scoped. * + * All d + * + * @param Type of the keys in the stream * @param Type of the namespace to which timers are scoped. 
*/ @Internal -public interface InternalTimerService<N> { +public abstract class InternalTimerService<K, N> implements ProcessingTimeCallback, EventTimeCallback { + + protected final ProcessingTimeService processingTimeService; + + protected final KeyContext keyContext; + + protected final int totalKeyGroups; + + protected final KeyGroupRange keyGroupRange; + + /** +* The one and only Future (if any) registered to execute the +* next {@link Triggerable} action, when its (processing) time arrives. +*/ + protected ScheduledFuture<?> nextTimer; + + /** +* The local event time, as denoted by the last received +* {@link org.apache.flink.streaming.api.watermark.Watermark Watermark}. +*/ + private long currentWatermark = Long.MIN_VALUE; + + // Variables to be set when the service is started. + + protected TypeSerializer<K> keySerializer; + + protected TypeSerializer<N> namespaceSerializer; + + private InternalTimer.TimerSerializer<K, N> timerSerializer; + + protected Triggerable<K, N> triggerTarget; + + private volatile boolean isInitialized; + + public InternalTimerService( + int totalKeyGroups, + KeyGroupRange keyGroupRange, + KeyContext keyContext, + ProcessingTimeService processingTimeService) { + + this.totalKeyGroups = totalKeyGroups; + this.keyGroupRange = checkNotNull(keyGroupRange); + this.keyContext = checkNotNull(keyContext); + this.processingTimeService = checkNotNull(processingTimeService); + } /** Returns the current processing time. */ - long currentProcessingTime(); + public long currentProcessingTime() { + return processingTimeService.getCurrentProcessingTime(); + } /** Returns the current event-time watermark. */ - long currentWatermark(); + public long currentWatermark() { + return currentWatermark; + } /** * Registers a timer to be fired when processing time passes the given time. The namespace * you pass here will be provided when the timer fires. 
*/ - void registerProcessingTimeTimer(N namespace, long time); + abstract public void registerProcessingTimeTimer(N namespace, long time); /** * Deletes the timer for the given key and namespace. */ - void deleteProcessingTimeTimer(N namespace, long time); + abstract public void deleteProcessingTimeTimer(N namespace, long time); /** * Registers a timer to be fired when processing time passes the given time. The namespace * you pass here will be provided when the timer fires. */ - void registerEventTimeTimer(N namespace, long time); + abstract public void registerEventTimeTimer(N namespace, long time); /** * Deletes the timer for the given key and namespace.
[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3359#discussion_r106669974 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java --- [...] */ @Internal -public interface InternalTimerService<N> { +public abstract class InternalTimerService<K, N> implements ProcessingTimeCallback, EventTimeCallback { --- End diff -- I would suggest keeping the old interface and renaming this class to `AbstractInternalTimerService implements InternalTimerService`.
That way, we don't need to introduce the generic parameter K everywhere, which gives away an implementation detail (K is used only for a member, not in the interface methods). I also prefer to keep the interface slim: probably not every piece of code that deals with `InternalTimerService` needs to see all of its methods, e.g. the ones for snapshots.
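The split the reviewer proposes could look roughly as follows. This is a sketch of the suggested shape only, with the method set trimmed for brevity; it is not the code that was eventually merged:

```java
/** Slim public-facing interface: the key type K appears nowhere in its methods. */
interface InternalTimerService<N> {
    long currentProcessingTime();
    long currentWatermark();
    void registerProcessingTimeTimer(N namespace, long time);
    void registerEventTimeTimer(N namespace, long time);
}

/** Abstract base class carries the key-typed state; most callers only see the interface. */
abstract class AbstractInternalTimerService<K, N> implements InternalTimerService<N> {

    private long currentWatermark = Long.MIN_VALUE;

    @Override
    public long currentWatermark() {
        return currentWatermark;
    }

    /** Advances the local watermark; an implementation detail the interface does not expose. */
    public void advanceWatermark(long watermark) {
        currentWatermark = watermark;
    }
}
```

The design payoff is that call sites depend on `InternalTimerService<N>` alone, so K stays an implementation detail of the base class and its subclasses.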
[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3359#discussion_r106677685 --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java --- @@ -0,0 +1,797 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.api.operators; + + +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.KeyContext; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.CompactionStyle; +import org.rocksdb.DBOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.StringAppendOperator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * {@link InternalTimerService} that stores timers in RocksDB. 
+ */ +public class RocksDBInternalTimerService<K, N> extends InternalTimerService<K, N> { + + private static Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class); + + /** The data base where stores all timers */ + private final RocksDB db; + + /** The path where the rocksdb locates */ + private final Path dbPath; + + /** +* The in-memory heaps backed by rocksdb to retrieve the next timer to trigger. Each +* partition's leader is stored in the heap. When the timers in a partition is changed, we +* will change the partition's leader and update the heap accordingly. +*/ + private final int numPartitions; + private final PersistentTimerHeap eventTimeHeap; + private final PersistentTimerHeap processingTimeHeap; + + private static int MAX_PARTITIONS = (1 << 16); + + public RocksDBInternalTimerService( + int totalKeyGroups, + KeyGroupRange keyGroupRange, + KeyContext keyContext, + ProcessingTimeService processingTimeService, + Path dbPath) { + + super(totalKeyGroups, keyGroupRange, keyContext, processingTimeService); + + this.dbPath = dbPath; + + try { + FileSystem fileSystem = this.dbPath.getFileSystem(); + if (fileSystem.exists(this.dbPath)) { + fileSystem.delete(this.dbPath, true); + } + + fileSystem.mkdirs(dbPath); + } catch (IOException e) { + throw new RuntimeException("Error while creating directory for rocksdb timer service.", e); + } + + ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions() +
[GitHub] flink pull request #3512: [FLINK-6008] collection of BlobServer improvements
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3512#discussion_r106677990 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java --- @@ -400,6 +418,47 @@ public void delete(BlobKey key) throws IOException { } /** +* Deletes the file associated with the given job and key if it exists in the local +* storage of the blob server. +* +* @param jobId JobID of the file in the blob store +* @param key String key of the file in the blob store +*/ + @Override + public void delete(JobID jobId, String key) { + checkArgument(jobId != null, "Job id must not be null."); + checkArgument(key != null, "BLOB name must not be null."); + + final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, key); + + if (localFile.exists()) { + if (!localFile.delete()) { + LOG.warn("Failed to delete locally BLOB " + key + " at " + localFile.getAbsolutePath()); + } + } + + blobStore.delete(jobId, key); + } + + /** +* Deletes all files associated with the given job id from the storage. +* +* @param jobId JobID of the files in the blob store +*/ + @Override + public void deleteAll(final JobID jobId) { + checkArgument(jobId != null, "Job id must not be null."); + + try { + BlobUtils.deleteJobDirectory(storageDir, jobId); + } catch (IOException e) { --- End diff -- If we want to make sure we cleanup in any case, we can actually catch `Exception` here.
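The widened catch Stephan proposes — treating cleanup as best-effort by catching `Exception` rather than only `IOException`, so a failed delete never propagates to the caller — looks roughly like this. The class and method names are invented for the sketch; this is not the actual `BlobServer` code.

```java
import java.io.File;
import java.io.IOException;

public class BestEffortCleanup {

    // Deletes a directory's immediate children and then the directory itself;
    // returns whether everything was removed.
    public static boolean deleteJobDirectory(File dir) {
        int failures = 0;
        File[] children = dir.listFiles();
        if (children != null) {
            for (File f : children) {
                if (!f.delete()) {
                    failures++;
                }
            }
        }
        return dir.delete() && failures == 0;
    }

    /** Best-effort cleanup: never throws, merely reports success. */
    public static boolean deleteAllQuietly(File storageDir) {
        try {
            return deleteJobDirectory(storageDir);
        } catch (Exception e) {
            // Catching Exception (not just IOException) keeps unexpected
            // runtime failures from aborting the caller, as the review suggests.
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        File dir = java.nio.file.Files.createTempDirectory("blobs").toFile();
        new File(dir, "blob-1").createNewFile();
        System.out.println(deleteAllQuietly(dir));
    }
}
```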
[GitHub] flink pull request #3512: [FLINK-6008] collection of BlobServer improvements
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3512#discussion_r106674837 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java --- @@ -180,91 +180,159 @@ public URL getURL(final BlobKey requiredBlob) throws IOException { // fallback: download from the BlobServer final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE]; + LOG.info("Downloading {} from {}", requiredBlob, serverAddress); // loop over retries int attempt = 0; while (true) { + try ( + final BlobClient bc = new BlobClient(serverAddress, blobClientConfig); + final InputStream is = bc.get(requiredBlob); + final OutputStream os = new FileOutputStream(localJarFile) + ) { + getURLTransferFile(buf, is, os); + + // success, we finished + return localJarFile.toURI().toURL(); + } + catch (Throwable t) { + getURLOnException(requiredBlob.toString(), localJarFile, attempt, t); - if (attempt == 0) { - LOG.info("Downloading {} from {}", requiredBlob, serverAddress); - } else { + // retry + ++attempt; LOG.info("Downloading {} from {} (retry {})", requiredBlob, serverAddress, attempt); } + } // end loop over retries + } - try { - BlobClient bc = null; - InputStream is = null; - OutputStream os = null; + /** +* Returns the URL for the BLOB with the given parameters. The method will first attempt to +* serve the BLOB from its local cache. If the BLOB is not in the cache, the method will try +* to download it from this cache's BLOB server. +* +* @param jobId JobID of the file in the blob store +* @param key String key of the file in the blob store +* @return URL referring to the local storage location of the BLOB. +* @throws java.io.FileNotFoundException if the path does not exist; +* @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server. 
+*/ + public URL getURL(final JobID jobId, final String key) throws IOException { + checkArgument(jobId != null, "Job id cannot be null."); + checkArgument(key != null, "BLOB name cannot be null."); - try { - bc = new BlobClient(serverAddress, blobClientConfig); - is = bc.get(requiredBlob); - os = new FileOutputStream(localJarFile); - - while (true) { - final int read = is.read(buf); - if (read < 0) { - break; - } - os.write(buf, 0, read); - } - - // we do explicitly not use a finally block, because we want the closing - // in the regular case to throw exceptions and cause the writing to fail. - // But, the closing on exception should not throw further exceptions and - // let us keep the root exception - os.close(); - os = null; - is.close(); - is = null; - bc.close(); - bc = null; - - // success, we finished - return localJarFile.toURI().toURL(); - } - catch (Throwable t) { - // we use "catch (Throwable)" to keep the root exception. Otherwise that exception - // it would be replaced by any exception thrown in the finally block - IOUtils.closeQuietly(os); - IOUtils.closeQuietly(is); -
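The refactored download path above replaces the manual close-and-null bookkeeping with try-with-resources. A minimal standalone sketch of the same copy pattern (the stream setup is invented for the example):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class CopyWithResources {

    // Copies all bytes from is to os, exactly like the read/write loop in the diff.
    public static void copy(InputStream is, OutputStream os) throws IOException {
        byte[] buf = new byte[4096];
        int read;
        while ((read = is.read(buf)) >= 0) {
            os.write(buf, 0, read);
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = "hello blob".getBytes();
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // Resources are closed automatically in reverse declaration order.
        try (InputStream is = new ByteArrayInputStream(payload);
             OutputStream os = sink) {
            copy(is, os);
        }
        System.out.println(sink.toString()); // hello blob
    }
}
```

Design note: the old code deliberately avoided a `finally` block so that a `close()` failure on the success path still failed the write, while a `close()` failure after an earlier exception did not mask the root cause. Try-with-resources gives exactly this behavior, attaching secondary `close()` failures to the primary exception as suppressed exceptions.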
[GitHub] flink pull request #3512: [FLINK-6008] collection of BlobServer improvements
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3512#discussion_r106675058 --- Diff: docs/setup/config.md --- @@ -494,13 +494,13 @@ Previously this key was named `recovery.mode` and the default value was `standal - `high-availability.zookeeper.path.root`: (Default `/flink`) Defines the root dir under which the ZooKeeper HA mode will create namespace directories. Previously this ket was named `recovery.zookeeper.path.root`. -- `high-availability.zookeeper.path.namespace`: (Default `/default_ns` in standalone cluster mode, or the under YARN) Defines the subdirectory under the root dir where the ZooKeeper HA mode will create znodes. This allows to isolate multiple applications on the same ZooKeeper. Previously this key was named `recovery.zookeeper.path.namespace`. +- `high-availability.cluster-id`: (Default `/default_ns` in standalone cluster mode, or the under YARN) Defines the subdirectory under the root dir where the ZooKeeper HA mode will create znodes. This allows to isolate multiple applications on the same ZooKeeper. Previously this key was named `recovery.zookeeper.path.namespace` and `high-availability.zookeeper.path.namespace`. --- End diff -- I would move these into ` ### High Availability (HA)` section, because they are independent of ZooKeeper
[GitHub] flink pull request #3512: [FLINK-6008] collection of BlobServer improvements
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3512#discussion_r106680476 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala --- @@ -1305,6 +1305,9 @@ class TaskManager( s"${task.getExecutionState} to JobManager for task ${task.getTaskInfo.getTaskName} " + s"(${task.getExecutionId})") + // delete all NAME_ADDRESSABLE BLOBs + libraryCacheManager.get.getBlobService.deleteAll(task.getJobID) --- End diff -- Multiple tasks of the same job run in a TaskManager. This means that tasks delete each other's blobs.
[GitHub] flink pull request #3512: [FLINK-6008] collection of BlobServer improvements
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3512#discussion_r106677799 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java --- @@ -400,6 +418,47 @@ public void delete(BlobKey key) throws IOException { } /** +* Deletes the file associated with the given job and key if it exists in the local +* storage of the blob server. +* +* @param jobId JobID of the file in the blob store +* @param key String key of the file in the blob store +*/ + @Override + public void delete(JobID jobId, String key) { + checkArgument(jobId != null, "Job id must not be null."); + checkArgument(key != null, "BLOB name must not be null."); + + final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, key); + + if (localFile.exists()) { --- End diff -- For concurrency safety, it is better to do `if (!delete && exists)`
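Stephan's `if (!delete && exists)` ordering can be sketched as a small helper: attempt the delete unconditionally, and only consult `exists()` when the delete fails, so a concurrent deleter winning the race is not reported as an error. The helper name is invented for the illustration.

```java
import java.io.File;

public class SafeDelete {

    // Race-tolerant delete: only a failed delete() combined with the file
    // still existing counts as a genuine failure. If another thread deleted
    // the file first, delete() fails but exists() is false, so we succeed.
    public static boolean deleteIfPresent(File f) {
        if (!f.delete() && f.exists()) {
            return false; // genuine failure: the file is still there
        }
        return true; // deleted by us, or already gone
    }

    public static void main(String[] args) throws Exception {
        File f = File.createTempFile("blob", null);
        System.out.println(deleteIfPresent(f)); // true: we deleted it
        System.out.println(deleteIfPresent(f)); // true: already gone
    }
}
```

The exists-then-delete order in the diff, by contrast, can log a spurious warning when another thread deletes the file between the two calls.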
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930220#comment-15930220 ] ASF GitHub Bot commented on FLINK-4460: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r106685158 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java --- @@ -48,15 +49,25 @@ * output selection). */ private final List selectedNames; + + /** +* The side-output tag (if any) of this {@link StreamEdge}. +*/ + private final OutputTag outputTag; + + /** +* The {@link StreamPartitioner} on this {@link StreamEdge}. +*/ private StreamPartitioner outputPartitioner; public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber, - List selectedNames, StreamPartitioner outputPartitioner) { + List selectedNames, StreamPartitioner outputPartitioner, OutputTag outputTag) { this.sourceVertex = sourceVertex; this.targetVertex = targetVertex; this.typeNumber = typeNumber; this.selectedNames = selectedNames; this.outputPartitioner = outputPartitioner; + this.outputTag = outputTag; --- End diff -- Not sure what the edge id exactly does and who uses it so I prefer to not touch it, for now. > Side Outputs in Flink > - > > Key: FLINK-4460 > URL: https://issues.apache.org/jira/browse/FLINK-4460 > Project: Flink > Issue Type: New Feature > Components: Core, DataStream API >Affects Versions: 1.2.0, 1.1.3 >Reporter: Chen Qin >Assignee: Chen Qin > Labels: latearrivingevents, sideoutput > > https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6027) Ignore the exception thrown by the subsuming of old completed checkpoints
[ https://issues.apache.org/jira/browse/FLINK-6027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930221#comment-15930221 ] ASF GitHub Bot commented on FLINK-6027: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3521 +1, merging this... > Ignore the exception thrown by the subsuming of old completed checkpoints > - > > Key: FLINK-6027 > URL: https://issues.apache.org/jira/browse/FLINK-6027 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > When a checkpoint is added into the {{CompletedCheckpointStore}} via the > method {{addCheckpoint()}}, the oldest checkpoints will be removed from the > store if the number of stored checkpoints exceeds the given limit. The > subsuming of old checkpoints may fail and make {{addCheckpoint()}} throw > exceptions which are caught by {{CheckpointCoordinator}}. Finally, the states > in the new checkpoint will be deleted by {{CheckpointCoordinator}}. Because > the new checkpoint is still in the store, we may recover the job from the new > checkpoint. But the recovery will fail as the states of the checkpoint are > all deleted. > We should ignore the exceptions thrown by the subsuming of old checkpoints > because we can always recover from the new checkpoint when successfully > adding it into the store. The ignorance may produce some dirty data, but it's > acceptable because they can be cleaned with the cleanup hook introduced in > the near future.
[GitHub] flink issue #3521: [FLINK-6027][checkpoint] Ignore the exception thrown by t...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3521 +1, merging this...
[jira] [Commented] (FLINK-4364) Implement heartbeat logic between TaskManager and JobManager
[ https://issues.apache.org/jira/browse/FLINK-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930219#comment-15930219 ] ASF GitHub Bot commented on FLINK-4364: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3151 > Implement heartbeat logic between TaskManager and JobManager > > > Key: FLINK-4364 > URL: https://issues.apache.org/jira/browse/FLINK-4364 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: zhijiang >Assignee: zhijiang > > It is part of work for FLIP-6. > The {{HeartbeatManager}} is mainly used for monitoring heartbeat target and > reporting payloads. > For {{JobManager}} side, it would trigger monitoring the {{HeartbeatTarget}} > when receive registration from {{TaskManager}}, and schedule a task to > {{requestHeartbeat}} at interval time. If not receive heartbeat response > within duration time, the {{HeartbeatListener}} will notify heartbeat > timeout, then the {{JobManager}} should remove the internal registered > {{TaskManager}}. > For {{TaskManger}} side, it would trigger monitoring the {{HeartbeatTarget}} > when receive registration acknowledgement from {{JobManager}}. An it will > also be notified heartbeat timeout if not receive heartbeat request from > {{JobManager}} within duration time. > The current implementation will not interact payloads via heartbeat, and it > can be added if needed future. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (FLINK-4364) Implement heartbeat logic between TaskManager and JobManager
[ https://issues.apache.org/jira/browse/FLINK-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-4364. -- Resolution: Fixed Added via 0b3d5c27f4ab7b2dffb37160a1f01cb822bb696e > Implement heartbeat logic between TaskManager and JobManager > > > Key: FLINK-4364 > URL: https://issues.apache.org/jira/browse/FLINK-4364 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: zhijiang >Assignee: zhijiang > > It is part of work for FLIP-6. > The {{HeartbeatManager}} is mainly used for monitoring heartbeat target and > reporting payloads. > For {{JobManager}} side, it would trigger monitoring the {{HeartbeatTarget}} > when receive registration from {{TaskManager}}, and schedule a task to > {{requestHeartbeat}} at interval time. If not receive heartbeat response > within duration time, the {{HeartbeatListener}} will notify heartbeat > timeout, then the {{JobManager}} should remove the internal registered > {{TaskManager}}. > For {{TaskManger}} side, it would trigger monitoring the {{HeartbeatTarget}} > when receive registration acknowledgement from {{JobManager}}. An it will > also be notified heartbeat timeout if not receive heartbeat request from > {{JobManager}} within duration time. > The current implementation will not interact payloads via heartbeat, and it > can be added if needed future. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3151: [FLINK-4364] [runtime] [FLIP-6] Implement TaskMana...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3151
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930214#comment-15930214 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r106684413 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -567,6 +600,17 @@ protected boolean isLate(W window) { } /** +* Decide if a record is currently late, based on current watermark and allowed lateness. +* +* @param element The element to check +* @return The element for which should be considered when sideoutputs +*/ + protected boolean isLate(StreamRecord element){ --- End diff -- That is what I remember as well. > Side Outputs in Flink > - > > Key: FLINK-4460 > URL: https://issues.apache.org/jira/browse/FLINK-4460 > Project: Flink > Issue Type: New Feature > Components: Core, DataStream API >Affects Versions: 1.2.0, 1.1.3 >Reporter: Chen Qin >Assignee: Chen Qin > Labels: latearrivingevents, sideoutput > > https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930207#comment-15930207 ] ASF GitHub Bot commented on FLINK-6008: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3512#discussion_r106675058 --- Diff: docs/setup/config.md --- @@ -494,13 +494,13 @@ Previously this key was named `recovery.mode` and the default value was `standal - `high-availability.zookeeper.path.root`: (Default `/flink`) Defines the root dir under which the ZooKeeper HA mode will create namespace directories. Previously this ket was named `recovery.zookeeper.path.root`. -- `high-availability.zookeeper.path.namespace`: (Default `/default_ns` in standalone cluster mode, or the under YARN) Defines the subdirectory under the root dir where the ZooKeeper HA mode will create znodes. This allows to isolate multiple applications on the same ZooKeeper. Previously this key was named `recovery.zookeeper.path.namespace`. +- `high-availability.cluster-id`: (Default `/default_ns` in standalone cluster mode, or the under YARN) Defines the subdirectory under the root dir where the ZooKeeper HA mode will create znodes. This allows to isolate multiple applications on the same ZooKeeper. Previously this key was named `recovery.zookeeper.path.namespace` and `high-availability.zookeeper.path.namespace`. --- End diff -- I would move these into ` ### High Availability (HA)` section, because they are independent of ZooKeeper > collection of BlobServer improvements > - > > Key: FLINK-6008 > URL: https://issues.apache.org/jira/browse/FLINK-6008 > Project: Flink > Issue Type: Improvement > Components: Network >Affects Versions: 1.3.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > The following things should be improved around the BlobServer/BlobCache: > * update config uptions with non-deprecated ones, e.g. 
> {{high-availability.cluster-id}} and {{high-availability.storageDir}} > * promote {{BlobStore#deleteAll(JobID)}} to the {{BlobService}} > * extend the {{BlobService}} to work with {{NAME_ADDRESSABLE}} blobs > (prepares FLINK-4399] > * remove {{NAME_ADDRESSABLE}} blobs after job/task termination > * do not fail the {{BlobServer}} when a delete operation fails > * code style, like using {{Preconditions.checkArgument}}
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930211#comment-15930211 ] ASF GitHub Bot commented on FLINK-6008: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3512#discussion_r106680476 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala --- @@ -1305,6 +1305,9 @@ class TaskManager( s"${task.getExecutionState} to JobManager for task ${task.getTaskInfo.getTaskName} " + s"(${task.getExecutionId})") + // delete all NAME_ADDRESSABLE BLOBs + libraryCacheManager.get.getBlobService.deleteAll(task.getJobID) --- End diff -- Multiple tasks of the same job run in a TaskManager. This means that tasks delete each others blobs. > collection of BlobServer improvements > - > > Key: FLINK-6008 > URL: https://issues.apache.org/jira/browse/FLINK-6008 > Project: Flink > Issue Type: Improvement > Components: Network >Affects Versions: 1.3.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > The following things should be improved around the BlobServer/BlobCache: > * update config uptions with non-deprecated ones, e.g. > {{high-availability.cluster-id}} and {{high-availability.storageDir}} > * promote {{BlobStore#deleteAll(JobID)}} to the {{BlobService}} > * extend the {{BlobService}} to work with {{NAME_ADDRESSABLE}} blobs > (prepares FLINK-4399] > * remove {{NAME_ADDRESSABLE}} blobs after job/task termination > * do not fail the {{BlobServer}} when a delete operation fails > * code style, like using {{Preconditions.checkArgument}} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930209#comment-15930209 ] ASF GitHub Bot commented on FLINK-6008: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3512#discussion_r106674262 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java --- @@ -180,91 +180,159 @@ public URL getURL(final BlobKey requiredBlob) throws IOException { // fallback: download from the BlobServer final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE]; + LOG.info("Downloading {} from {}", requiredBlob, serverAddress); // loop over retries int attempt = 0; while (true) { + try ( + final BlobClient bc = new BlobClient(serverAddress, blobClientConfig); + final InputStream is = bc.get(requiredBlob); + final OutputStream os = new FileOutputStream(localJarFile) + ) { + getURLTransferFile(buf, is, os); + + // success, we finished + return localJarFile.toURI().toURL(); + } + catch (Throwable t) { + getURLOnException(requiredBlob.toString(), localJarFile, attempt, t); - if (attempt == 0) { - LOG.info("Downloading {} from {}", requiredBlob, serverAddress); - } else { + // retry + ++attempt; LOG.info("Downloading {} from {} (retry {})", requiredBlob, serverAddress, attempt); } + } // end loop over retries + } - try { - BlobClient bc = null; - InputStream is = null; - OutputStream os = null; + /** +* Returns the URL for the BLOB with the given parameters. The method will first attempt to +* serve the BLOB from its local cache. If the BLOB is not in the cache, the method will try +* to download it from this cache's BLOB server. +* +* @param jobId JobID of the file in the blob store +* @param key String key of the file in the blob store +* @return URL referring to the local storage location of the BLOB. 
+* @throws java.io.FileNotFoundException if the path does not exist; +* @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server. +*/ + public URL getURL(final JobID jobId, final String key) throws IOException { + checkArgument(jobId != null, "Job id cannot be null."); + checkArgument(key != null, "BLOB name cannot be null."); - try { - bc = new BlobClient(serverAddress, blobClientConfig); - is = bc.get(requiredBlob); - os = new FileOutputStream(localJarFile); - - while (true) { - final int read = is.read(buf); - if (read < 0) { - break; - } - os.write(buf, 0, read); - } - - // we do explicitly not use a finally block, because we want the closing - // in the regular case to throw exceptions and cause the writing to fail. - // But, the closing on exception should not throw further exceptions and - // let us keep the root exception - os.close(); - os = null; - is.close(); - is = null; - bc.close(); - bc = null; - - // success, we finished - return localJarFile.toURI().toURL(); - } - catch (Throwable t) { - // we use "catch (Throwable)" to keep the root exception. Otherwise that exception -
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r106684413 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -567,6 +600,17 @@ protected boolean isLate(W window) { } /** +* Decide if a record is currently late, based on current watermark and allowed lateness. +* +* @param element The element to check +* @return The element for which should be considered when sideoutputs +*/ + protected boolean isLate(StreamRecord element){ --- End diff -- That is what I remember as well.
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930210#comment-15930210 ] ASF GitHub Bot commented on FLINK-6008: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3512#discussion_r106674837 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java --- @@ -180,91 +180,159 @@ public URL getURL(final BlobKey requiredBlob) throws IOException { // fallback: download from the BlobServer final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE]; + LOG.info("Downloading {} from {}", requiredBlob, serverAddress); // loop over retries int attempt = 0; while (true) { + try ( + final BlobClient bc = new BlobClient(serverAddress, blobClientConfig); + final InputStream is = bc.get(requiredBlob); + final OutputStream os = new FileOutputStream(localJarFile) + ) { + getURLTransferFile(buf, is, os); + + // success, we finished + return localJarFile.toURI().toURL(); + } + catch (Throwable t) { + getURLOnException(requiredBlob.toString(), localJarFile, attempt, t); - if (attempt == 0) { - LOG.info("Downloading {} from {}", requiredBlob, serverAddress); - } else { + // retry + ++attempt; LOG.info("Downloading {} from {} (retry {})", requiredBlob, serverAddress, attempt); } + } // end loop over retries + } - try { - BlobClient bc = null; - InputStream is = null; - OutputStream os = null; + /** +* Returns the URL for the BLOB with the given parameters. The method will first attempt to +* serve the BLOB from its local cache. If the BLOB is not in the cache, the method will try +* to download it from this cache's BLOB server. +* +* @param jobId JobID of the file in the blob store +* @param key String key of the file in the blob store +* @return URL referring to the local storage location of the BLOB. 
+* @throws java.io.FileNotFoundException if the path does not exist; +* @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server. +*/ + public URL getURL(final JobID jobId, final String key) throws IOException { + checkArgument(jobId != null, "Job id cannot be null."); + checkArgument(key != null, "BLOB name cannot be null."); - try { - bc = new BlobClient(serverAddress, blobClientConfig); - is = bc.get(requiredBlob); - os = new FileOutputStream(localJarFile); - - while (true) { - final int read = is.read(buf); - if (read < 0) { - break; - } - os.write(buf, 0, read); - } - - // we do explicitly not use a finally block, because we want the closing - // in the regular case to throw exceptions and cause the writing to fail. - // But, the closing on exception should not throw further exceptions and - // let us keep the root exception - os.close(); - os = null; - is.close(); - is = null; - bc.close(); - bc = null; - - // success, we finished - return localJarFile.toURI().toURL(); - } - catch (Throwable t) { - // we use "catch (Throwable)" to keep the root exception. Otherwise that exception -
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930206#comment-15930206 ] ASF GitHub Bot commented on FLINK-6008: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3512#discussion_r106677990 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java --- @@ -400,6 +418,47 @@ public void delete(BlobKey key) throws IOException { } /** +* Deletes the file associated with the given job and key if it exists in the local +* storage of the blob server. +* +* @param jobId JobID of the file in the blob store +* @param key String key of the file in the blob store +*/ + @Override + public void delete(JobID jobId, String key) { + checkArgument(jobId != null, "Job id must not be null."); + checkArgument(key != null, "BLOB name must not be null."); + + final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, key); + + if (localFile.exists()) { + if (!localFile.delete()) { + LOG.warn("Failed to delete locally BLOB " + key + " at " + localFile.getAbsolutePath()); + } + } + + blobStore.delete(jobId, key); + } + + /** +* Deletes all files associated with the given job id from the storage. +* +* @param jobId JobID of the files in the blob store +*/ + @Override + public void deleteAll(final JobID jobId) { + checkArgument(jobId != null, "Job id must not be null."); + + try { + BlobUtils.deleteJobDirectory(storageDir, jobId); + } catch (IOException e) { --- End diff -- If we want to make sure we cleanup in any case, we can actually catch `Exception` here. 

> collection of BlobServer improvements > - > > Key: FLINK-6008 > URL: https://issues.apache.org/jira/browse/FLINK-6008 > Project: Flink > Issue Type: Improvement > Components: Network >Affects Versions: 1.3.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > The following things should be improved around the BlobServer/BlobCache: > * update config options with non-deprecated ones, e.g. > {{high-availability.cluster-id}} and {{high-availability.storageDir}} > * promote {{BlobStore#deleteAll(JobID)}} to the {{BlobService}} > * extend the {{BlobService}} to work with {{NAME_ADDRESSABLE}} blobs > (prepares FLINK-4399) > * remove {{NAME_ADDRESSABLE}} blobs after job/task termination > * do not fail the {{BlobServer}} when a delete operation fails > * code style, like using {{Preconditions.checkArgument}} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
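The "do not fail the BlobServer when a delete operation fails" point, together with the review suggestion to catch `Exception` in `deleteAll`, can be sketched as a best-effort cleanup helper. This is a hedged illustration, not the actual `BlobUtils.deleteJobDirectory`; catching `Exception` rather than only `IOException` keeps unexpected runtime failures during cleanup from propagating to the server:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;

public class SafeCleanup {

    // Hypothetical stand-in for a job-directory cleanup: recursively deletes a
    // directory, catching Exception (not just IOException) so that a failed
    // cleanup is reported as a boolean instead of failing the caller.
    static boolean deleteDirectoryQuietly(Path dir) {
        try {
            if (Files.exists(dir)) {
                Files.walk(dir)
                        .sorted(Comparator.reverseOrder()) // children before parents
                        .forEach(p -> p.toFile().delete());
            }
            return !Files.exists(dir);
        } catch (Exception e) {
            // swallow: a failed delete must not bring the BlobServer down
            return false;
        }
    }

    static boolean demo() {
        try {
            Path dir = Files.createTempDirectory("blobs");
            Files.write(dir.resolve("blob1"), new byte[]{1, 2, 3});
            return deleteDirectoryQuietly(dir);
        } catch (IOException e) {
            return false;
        }
    }
}
```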
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930208#comment-15930208 ] ASF GitHub Bot commented on FLINK-6008: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3512#discussion_r106677799 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java --- @@ -400,6 +418,47 @@ public void delete(BlobKey key) throws IOException { } /** +* Deletes the file associated with the given job and key if it exists in the local +* storage of the blob server. +* +* @param jobId JobID of the file in the blob store +* @param key String key of the file in the blob store +*/ + @Override + public void delete(JobID jobId, String key) { + checkArgument(jobId != null, "Job id must not be null."); + checkArgument(key != null, "BLOB name must not be null."); + + final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, key); + + if (localFile.exists()) { --- End diff -- For concurrency safety, it is better to do `if (!delete && exists)`
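The reviewer's `if (!delete && exists)` ordering avoids a race: with `exists()` checked first, a concurrent deleter can remove the file between the check and the `delete()` call, producing a spurious warning. A small sketch of the safer ordering (the `deleteLocalFile` helper is hypothetical, not the Flink method):

```java
import java.io.File;
import java.io.IOException;

public class ConcurrentSafeDelete {

    // Delete first, and treat the operation as failed only when the file still
    // exists afterwards. A file removed concurrently makes delete() return
    // false, but exists() is then also false, so no warning is warranted.
    static boolean deleteLocalFile(File localFile) {
        return localFile.delete() || !localFile.exists();
    }

    static boolean demo() {
        try {
            File f = File.createTempFile("blob", ".bin");
            boolean first = deleteLocalFile(f);  // file present: real delete
            boolean second = deleteLocalFile(f); // already gone: delete() fails, but so does exists()
            return first && second;
        } catch (IOException e) {
            return false;
        }
    }
}
```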
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930204#comment-15930204 ] ASF GitHub Bot commented on FLINK-4460: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r106684096 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -567,6 +600,17 @@ protected boolean isLate(W window) { } /** +* Decide if a record is currently late, based on current watermark and allowed lateness. +* +* @param element The element to check +* @return The element for which should be considered when sideoutputs +*/ + protected boolean isLate(StreamRecord element){ --- End diff -- I must have removed the check by accident. I think we agreed to rename this to something more meaningful and keep it, right? > Side Outputs in Flink > - > > Key: FLINK-4460 > URL: https://issues.apache.org/jira/browse/FLINK-4460 > Project: Flink > Issue Type: New Feature > Components: Core, DataStream API >Affects Versions: 1.2.0, 1.1.3 >Reporter: Chen Qin >Assignee: Chen Qin > Labels: latearrivingevents, sideoutput > > https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r106684096 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -567,6 +600,17 @@ protected boolean isLate(W window) { } /** +* Decide if a record is currently late, based on current watermark and allowed lateness. +* +* @param element The element to check +* @return The element for which should be considered when sideoutputs +*/ + protected boolean isLate(StreamRecord element){ --- End diff -- I must have removed the check by accident. I think we agreed to rename this to something more meaningful and keep it, right? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
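The `isLate(StreamRecord)` check under discussion decides element-level (as opposed to window-level) lateness from the current watermark and the allowed lateness. A hedged sketch of the predicate (names and the exact comparison are illustrative assumptions, not the final Flink implementation):

```java
public class LatenessCheck {

    // An element is considered late when its timestamp plus the allowed
    // lateness is at or behind the current event-time watermark, i.e. no
    // window that could still accept it remains open.
    static boolean isElementLate(long elementTimestamp, long allowedLateness, long currentWatermark) {
        return elementTimestamp + allowedLateness <= currentWatermark;
    }
}
```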
[jira] [Commented] (FLINK-5544) Implement Internal Timer Service in RocksDB
[ https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930168#comment-15930168 ] ASF GitHub Bot commented on FLINK-5544: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3359#discussion_r106680293 --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java --- @@ -0,0 +1,797 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.api.operators; + + +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.KeyContext; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.CompactionStyle; +import org.rocksdb.DBOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.StringAppendOperator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * {@link InternalTimerService} that stores timers in RocksDB. 
+ */ +public class RocksDBInternalTimerService<K, N> extends InternalTimerService<K, N> { + + private static Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class); + + /** The data base where stores all timers */ + private final RocksDB db; + + /** The path where the rocksdb locates */ + private final Path dbPath; + + /** +* The in-memory heaps backed by rocksdb to retrieve the next timer to trigger. Each +* partition's leader is stored in the heap. When the timers in a partition is changed, we +* will change the partition's leader and update the heap accordingly. +*/ + private final int numPartitions; + private final PersistentTimerHeap eventTimeHeap; + private final PersistentTimerHeap processingTimeHeap; + + private static int MAX_PARTITIONS = (1 << 16); + + public RocksDBInternalTimerService( + int totalKeyGroups, + KeyGroupRange keyGroupRange, + KeyContext keyContext, + ProcessingTimeService processingTimeService, + Path dbPath) { + + super(totalKeyGroups, keyGroupRange, keyContext, processingTimeService); + + this.dbPath = dbPath; + + try { + FileSystem fileSystem = this.dbPath.getFileSystem(); + if (fileSystem.exists(this.dbPath)) { + fileSystem.delete(this.dbPath, true); + } + + fileSystem.mkdirs(dbPath); + } catch (IOException e) { +
[jira] [Commented] (FLINK-5544) Implement Internal Timer Service in RocksDB
[ https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930169#comment-15930169 ] ASF GitHub Bot commented on FLINK-5544: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3359#discussion_r106672829 --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java --- @@ -0,0 +1,797 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.api.operators; + + +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.KeyContext; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.CompactionStyle; +import org.rocksdb.DBOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.StringAppendOperator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * {@link InternalTimerService} that stores timers in RocksDB. + */ +public class RocksDBInternalTimerService<K, N> extends InternalTimerService<K, N> { + + private static Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class); + + /** The data base where stores all timers */ + private final RocksDB db; --- End diff -- I think that we should avoid creating more instances of RocksDB if we can.
This makes native memory consumption more unpredictable and creates more files on snapshots. My suggestion is to do a refactoring so that timer services must be requested through a keyed state backend. The RocksDB backend could then re-use the same database instance as in the keyed backend for the timer service, to reduce all the overheads. I think this request should still allow asking for a RocksDB-based timer service, even when using a `HeapKeyedStateBackend`, and vice versa. > Implement Internal Timer Service in RocksDB > --- > > Key: FLINK-5544 > URL: https://issues.apache.org/jira/browse/FLINK-5544 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Now the only implementation of internal timer service is > HeapInternalTimerService which stores all timers in memory. In the cases > where the number of keys is very large, the timer service will cost too much > memory. An implementation which stores timers in RocksDB seems good to deal > with these cases. > It might be a little challenging to implement a RocksDB timer service because > the timers are accessed in different ways. When timers are triggered, we need > to access timers in
[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3359#discussion_r106668768 --- Diff: flink-contrib/flink-timerserivce-rocksdb/pom.xml --- @@ -0,0 +1,80 @@ + --- End diff -- I think we should simply integrate the RocksDB timer service in the project flink-statebackend-rocksdb.
[jira] [Commented] (FLINK-5544) Implement Internal Timer Service in RocksDB
[ https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930193#comment-15930193 ] ASF GitHub Bot commented on FLINK-5544: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/3359 One additional comment, also as reminder for @aljoscha and me: after this PR is rebased, we have access to `InternalKeyContext`, which should be somehow integrated with the already existing `KeyContext` interface. This would reduce several constructor parameters and some members, e.g. the `KeyGroupRange`s and `TypeSerializer`s. > Implement Internal Timer Service in RocksDB > --- > > Key: FLINK-5544 > URL: https://issues.apache.org/jira/browse/FLINK-5544 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Now the only implementation of internal timer service is > HeapInternalTimerService which stores all timers in memory. In the cases > where the number of keys is very large, the timer service will cost too much > memory. An implementation which stores timers in RocksDB seems good to deal > with these cases. > It might be a little challenging to implement a RocksDB timer service because > the timers are accessed in different ways. When timers are triggered, we need > to access timers in the order of timestamp. But when performing checkpoints, > we must have a method to obtain all timers of a given key group. > A good implementation, as suggested by [~StephanEwen], follows the idea of > merge sorting. We can store timers in RocksDB with the format > {{KEY_GROUP#TIMER#KEY}}. In this way, the timers under a key group are put > together and are sorted. > Then we can deploy an in-memory heap which keeps the first timer of each key > group to get the next timer to trigger. When a key group's first timer is > updated, we can efficiently update the heap.
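The merge-sort design described in the issue can be sketched in a few lines. This is a toy in-memory model, assuming a `TreeSet` per key group as a stand-in for the sorted `KEY_GROUP#TIMER#KEY` layout in RocksDB, with a heap holding only each group's earliest ("leader") timer:

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeSet;

public class KeyGroupTimerHeap {

    // Per-key-group sorted timers (stand-in for the sorted RocksDB key space).
    private final Map<Integer, TreeSet<Long>> groups = new HashMap<>();

    // Heap entries are {timestamp, keyGroup}; only each group's leader is kept.
    private final PriorityQueue<long[]> heap =
            new PriorityQueue<>(Comparator.comparingLong((long[] e) -> e[0]));

    void register(int keyGroup, long timestamp) {
        TreeSet<Long> timers = groups.computeIfAbsent(keyGroup, k -> new TreeSet<>());
        boolean newLeader = timers.isEmpty() || timestamp < timers.first();
        timers.add(timestamp);
        if (newLeader) {
            heap.removeIf(e -> e[1] == keyGroup); // leader changed: replace this group's entry
            heap.add(new long[]{timestamp, keyGroup});
        }
    }

    // Pops the globally earliest timer and promotes that group's next timer.
    Long pollNext() {
        long[] top = heap.poll();
        if (top == null) {
            return null;
        }
        TreeSet<Long> timers = groups.get((int) top[1]);
        timers.pollFirst();
        if (!timers.isEmpty()) {
            heap.add(new long[]{timers.first(), (int) top[1]});
        }
        return top[0];
    }

    static String demo() {
        KeyGroupTimerHeap h = new KeyGroupTimerHeap();
        h.register(0, 50);
        h.register(1, 10);
        h.register(0, 20);
        h.register(1, 60);
        StringBuilder sb = new StringBuilder();
        Long t;
        while ((t = h.pollNext()) != null) {
            sb.append(t).append(" ");
        }
        return sb.toString().trim();
    }
}
```

Triggering stays cheap because only one heap entry per key group exists, and a checkpoint can still enumerate one group's timers contiguously from its sorted set.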
[GitHub] flink issue #3359: [FLINK-5544][streaming] Add InternalTimerService implemen...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/3359 One additional comment, also as reminder for @aljoscha and me: after this PR is rebased, we have access to `InternalKeyContext`, which should be somehow integrated with the already existing `KeyContext` interface. This would reduce several constructor parameters and some members, e.g. the `KeyGroupRange`s and `TypeSerializer`s.
[jira] [Commented] (FLINK-5544) Implement Internal Timer Service in RocksDB
[ https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930179#comment-15930179 ] ASF GitHub Bot commented on FLINK-5544: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3359#discussion_r106670490 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java --- @@ -18,43 +18,306 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.streaming.runtime.tasks.EventTimeCallback; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ScheduledFuture; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; /** * Interface for working with time and timers. * * This is the internal version of {@link org.apache.flink.streaming.api.TimerService} * that allows to specify a key and a namespace to which timers should be scoped. * + * All d + * + * @param Type of the keys in the stream * @param Type of the namespace to which timers are scoped. 
*/ @Internal -public interface InternalTimerService { +public abstract class InternalTimerService<K, N> implements ProcessingTimeCallback, EventTimeCallback { + + protected final ProcessingTimeService processingTimeService; + + protected final KeyContext keyContext; + + protected final int totalKeyGroups; + + protected final KeyGroupRange keyGroupRange; + + /** +* The one and only Future (if any) registered to execute the +* next {@link Triggerable} action, when its (processing) time arrives. +*/ + protected ScheduledFuture nextTimer; + + /** +* The local event time, as denoted by the last received +* {@link org.apache.flink.streaming.api.watermark.Watermark Watermark}. +*/ + private long currentWatermark = Long.MIN_VALUE; + + // Variables to be set when the service is started. + + protected TypeSerializer keySerializer; + + protected TypeSerializer namespaceSerializer; + + private InternalTimer.TimerSerializer timerSerializer; + + protected Triggerable triggerTarget; + + private volatile boolean isInitialized; + + public InternalTimerService( + int totalKeyGroups, + KeyGroupRange keyGroupRange, + KeyContext keyContext, + ProcessingTimeService processingTimeService) { + + this.totalKeyGroups = totalKeyGroups; + this.keyGroupRange = checkNotNull(keyGroupRange); + this.keyContext = checkNotNull(keyContext); + this.processingTimeService = checkNotNull(processingTimeService); + } /** Returns the current processing time. */ - long currentProcessingTime(); + public long currentProcessingTime() { + return processingTimeService.getCurrentProcessingTime(); + } /** Returns the current event-time watermark. */ - long currentWatermark(); + public long currentWatermark() { + return currentWatermark; + } /** * Registers a timer to be fired when processing time passes the given time. The namespace * you pass here will be provided when the timer fires.
*/ - void registerProcessingTimeTimer(N namespace, long time); + abstract public void registerProcessingTimeTimer(N namespace, long time); /** * Deletes the timer for the given key and namespace. */ - void deleteProcessingTimeTimer(N namespace, long time); + abstract public void deleteProcessingTimeTimer(N namespace, long time); /** * Registers a timer to be fired when processing time passes the given time. The namespace * you pass here will be provided when the timer
[jira] [Commented] (FLINK-5544) Implement Internal Timer Service in RocksDB
[ https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930174#comment-15930174 ] ASF GitHub Bot commented on FLINK-5544: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3359#discussion_r106671203 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java --- @@ -18,43 +18,306 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.streaming.runtime.tasks.EventTimeCallback; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ScheduledFuture; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; /** * Interface for working with time and timers. * * This is the internal version of {@link org.apache.flink.streaming.api.TimerService} * that allows to specify a key and a namespace to which timers should be scoped. * + * All d + * + * @param Type of the keys in the stream * @param Type of the namespace to which timers are scoped. 
*/ @Internal -public interface InternalTimerService { +public abstract class InternalTimerService<K, N> implements ProcessingTimeCallback, EventTimeCallback { + + protected final ProcessingTimeService processingTimeService; + + protected final KeyContext keyContext; + + protected final int totalKeyGroups; + + protected final KeyGroupRange keyGroupRange; + + /** +* The one and only Future (if any) registered to execute the +* next {@link Triggerable} action, when its (processing) time arrives. +*/ + protected ScheduledFuture nextTimer; + + /** +* The local event time, as denoted by the last received +* {@link org.apache.flink.streaming.api.watermark.Watermark Watermark}. +*/ + private long currentWatermark = Long.MIN_VALUE; + + // Variables to be set when the service is started. + + protected TypeSerializer keySerializer; + + protected TypeSerializer namespaceSerializer; + + private InternalTimer.TimerSerializer timerSerializer; + + protected Triggerable triggerTarget; + + private volatile boolean isInitialized; + + public InternalTimerService( + int totalKeyGroups, + KeyGroupRange keyGroupRange, + KeyContext keyContext, + ProcessingTimeService processingTimeService) { + + this.totalKeyGroups = totalKeyGroups; + this.keyGroupRange = checkNotNull(keyGroupRange); + this.keyContext = checkNotNull(keyContext); + this.processingTimeService = checkNotNull(processingTimeService); + } /** Returns the current processing time. */ - long currentProcessingTime(); + public long currentProcessingTime() { + return processingTimeService.getCurrentProcessingTime(); + } /** Returns the current event-time watermark. */ - long currentWatermark(); + public long currentWatermark() { + return currentWatermark; + } /** * Registers a timer to be fired when processing time passes the given time. The namespace * you pass here will be provided when the timer fires.
*/ - void registerProcessingTimeTimer(N namespace, long time); + abstract public void registerProcessingTimeTimer(N namespace, long time); /** * Deletes the timer for the given key and namespace. */ - void deleteProcessingTimeTimer(N namespace, long time); + abstract public void deleteProcessingTimeTimer(N namespace, long time); /** * Registers a timer to be fired when processing time passes the given time. The namespace * you pass here will be provided when the timer