[GitHub] [flink] flinkbot edited a comment on pull request #13697: [FLINK-19357][FLINK-19357][fs-connector] Introduce createBucketWriter to BucketsBuilder & Introduce FileLifeCycleListener to Buckets
flinkbot edited a comment on pull request #13697: URL: https://github.com/apache/flink/pull/13697#issuecomment-712672697

## CI report:
* bd25177243a782fad2e7f9d9ff508e6ba3758303 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8490)
* 48c716cf4d460cd40d7f87eed79386a09d55ed5f UNKNOWN

Bot commands: The @flinkbot bot supports the following commands:
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-19870) Fix special case when the reuse of exchange causes the deadlock
[ https://issues.apache.org/jira/browse/FLINK-19870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Caizhi Weng updated FLINK-19870: Description: Currently the reuse of an exchange is not considered a deadlock risk because, although the exec node of the exchange is reused, its underlying transformation is not. However, if this behavior changes, a deadlock may occur. For example, consider the following SQL and its plan:
{code:sql}
WITH T1 AS (SELECT a FROM x)
SELECT * FROM T1
INNER JOIN T1 AS T2 ON T1.a = T2.a
{code}
{code}
HashJoin(joinType=[InnerJoin], where=[=(a, a0)], select=[a, a0], build=[right])
:- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH], reuse_id=[1])
:  +- Calc(select=[a])
:     +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Reused(reference_id=[1])
{code}
The reuse of the exchange may cause a deadlock on the hash join.
> Fix special case when the reuse of exchange causes the deadlock
> ---
>
> Key: FLINK-19870
> URL: https://issues.apache.org/jira/browse/FLINK-19870
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: Caizhi Weng
> Priority: Major
> Fix For: 1.12.0
>
> Currently the reuse of an exchange is not considered a deadlock risk because, although the exec node of the exchange is reused, its underlying transformation is not. However, if this behavior changes, a deadlock may occur.
> For example, consider the following SQL and its plan:
> {code:sql}
> WITH T1 AS (SELECT a FROM x)
> SELECT * FROM T1
> INNER JOIN T1 AS T2 ON T1.a = T2.a
> {code}
> {code}
> HashJoin(joinType=[InnerJoin], where=[=(a, a0)], select=[a, a0], build=[right])
> :- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH], reuse_id=[1])
> :  +- Calc(select=[a])
> :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> +- Reused(reference_id=[1])
> {code}
> The reuse of the exchange may cause a deadlock on the hash join.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19870) Fix special case when the reuse of exchange causes the deadlock
Caizhi Weng created FLINK-19870: Summary: Fix special case when the reuse of exchange causes the deadlock Key: FLINK-19870 URL: https://issues.apache.org/jira/browse/FLINK-19870 Project: Flink Issue Type: Bug Components: Table SQL / Planner Reporter: Caizhi Weng Fix For: 1.12.0

Currently the reuse of an exchange is not considered a deadlock risk because, although the exec node of the exchange is reused, its underlying transformation is not. However, if this behavior changes, a deadlock may occur. For example, consider the following SQL and its plan:
{code:sql}
WITH T1 AS (SELECT a FROM x)
SELECT * FROM T1
INNER JOIN T1 AS T2 ON T1.a = T2.a
{code}
{code}
HashJoin(joinType=[InnerJoin], where=[=(a, a0)], select=[a, a0], build=[right])
:- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH], reuse_id=[1])
:  +- Calc(select=[a])
:     +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Reused(reference_id=[1])
{code}
The reuse of the exchange may cause a deadlock on the hash join.
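To make the deadlock intuition concrete: a hash join drains its build side completely before reading its probe side, so if both join inputs were fed by one shared bounded channel, the producer would block once the buffer fills and the probe reader would never start. The toy Java sketch below is not Flink code; the class name, buffer sizes, and timeouts are made up purely for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Toy model (NOT Flink internals) of the hazard: one bounded channel shared
// by both join inputs. The join would drain the build side completely before
// touching the probe side, so nobody consumes while the producer is blocked.
public class SharedExchangeDeadlock {

    // Returns true if the producer manages to emit all records on its own,
    // i.e. without any consumer draining the channel.
    static boolean producerFinishes(int bufferSize, int records, long waitMillis)
            throws InterruptedException {
        BlockingQueue<Integer> sharedChannel = new ArrayBlockingQueue<>(bufferSize);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < records; i++) {
                    sharedChannel.put(i); // blocks when the buffer is full
                }
            } catch (InterruptedException ignored) {
                // interrupted by the caller when it gives up waiting
            }
        });
        producer.start();
        producer.join(waitMillis);
        boolean finished = !producer.isAlive();
        producer.interrupt();
        producer.join();
        return finished;
    }

    public static void main(String[] args) throws InterruptedException {
        // Buffer smaller than the record count and no consumer: stuck forever.
        System.out.println(producerFinishes(2, 10, 300));
        // Buffer large enough to hold everything: finishes on its own.
        System.out.println(producerFinishes(16, 10, 1000));
    }
}
```

In the real planner the fix is about which plans are generated, not about buffer sizes; the sketch only shows why a shared pipelined exchange feeding both sides of a blocking consumer cannot make progress.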
[jira] [Created] (FLINK-19869) Support java.util.UUID as RAW type in PostgresCatalog and handle it in PostgresRowConverter
Jingwei Zhang created FLINK-19869: Summary: Support java.util.UUID as RAW type in PostgresCatalog and handle it in PostgresRowConverter Key: FLINK-19869 URL: https://issues.apache.org/jira/browse/FLINK-19869 Project: Flink Issue Type: Improvement Components: Connectors / JDBC Affects Versions: 1.11.2 Reporter: Jingwei Zhang

Problem: UUID is not a SQL-standard type, but Postgres supports it as an internal type that the Postgres client converts to java.util.UUID when queried. However, Flink does not support it as a SQL extension in PostgresCatalog, and no UUID serializer is provided for it either. So if I filter a result set and write the filtered result back to a table with the same schema, including a UUID column, I run into a problem.

Proposal: Handle UUID in Postgres as a SQL extension. Use DataTypes.RAW to wrap its original class info and its serializer so as to expose it as a RAW JDBCType (only in Postgres) in PostgresCatalog. Meanwhile, in PostgresRowConverter, we could use BinaryRawValueData to handle these SQL extensions for byte serialization/deserialization.
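The byte-level round trip the proposal implies can be sketched with plain JDK code. This is not the proposed PostgresRowConverter change itself; the class and method names below are illustrative, and they only show the 16-byte representation a RAW-typed UUID column would need to serialize and deserialize.

```java
import java.nio.ByteBuffer;
import java.util.UUID;

// Hypothetical helpers sketching the byte-level conversion a RAW-typed
// UUID column would need in a row converter; names are illustrative only.
public class UuidBytes {

    // Serialize a UUID into its 16-byte big-endian representation.
    static byte[] toBytes(UUID uuid) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(uuid.getMostSignificantBits());
        buf.putLong(uuid.getLeastSignificantBits());
        return buf.array();
    }

    // Deserialize the 16 bytes back into a UUID.
    static UUID fromBytes(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        return new UUID(buf.getLong(), buf.getLong());
    }

    public static void main(String[] args) {
        UUID original = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");
        UUID roundTripped = fromBytes(toBytes(original));
        System.out.println(original.equals(roundTripped)); // true
    }
}
```

A real implementation would plug such logic into the serializer wrapped by DataTypes.RAW rather than hand-rolling it at call sites.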
[jira] [Updated] (FLINK-19867) Validation fails for UDF that accepts var-args
[ https://issues.apache.org/jira/browse/FLINK-19867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Li updated FLINK-19867: --- Fix Version/s: 1.12.0 > Validation fails for UDF that accepts var-args > -- > > Key: FLINK-19867 > URL: https://issues.apache.org/jira/browse/FLINK-19867 > Project: Flink > Issue Type: Bug > Components: Table SQL / API > Reporter: Rui Li > Priority: Major > Labels: pull-request-available > Fix For: 1.12.0 >
[GitHub] [flink] flinkbot edited a comment on pull request #13641: [WIP][FLINK-17760][tests] Rework tests to not rely on legacy scheduling logics in ExecutionGraph anymore
flinkbot edited a comment on pull request #13641: URL: https://github.com/apache/flink/pull/13641#issuecomment-708569491 ## CI report: * 09d8deb89416f53dfe8b5c16fb9d723cbd98612c UNKNOWN * 9ea4cf7454f9b8e0c237ba8f540b224abdf9f7b0 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8279) * 659fb7eddb0acfa0ef49f76c5fafca21c389f3c0 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8562)
[GitHub] [flink] flinkbot edited a comment on pull request #13833: [FLINK-19867][table-common] Validation fails for UDF that accepts var…
flinkbot edited a comment on pull request #13833: URL: https://github.com/apache/flink/pull/13833#issuecomment-718355139 ## CI report: * 4bec3f4f3950b243ef9f87f24ef481c5e73ceb17 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8560)
[GitHub] [flink] flinkbot edited a comment on pull request #13641: [WIP][FLINK-17760][tests] Rework tests to not rely on legacy scheduling logics in ExecutionGraph anymore
flinkbot edited a comment on pull request #13641: URL: https://github.com/apache/flink/pull/13641#issuecomment-708569491 ## CI report: * 09d8deb89416f53dfe8b5c16fb9d723cbd98612c UNKNOWN * 9ea4cf7454f9b8e0c237ba8f540b224abdf9f7b0 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8279) * 659fb7eddb0acfa0ef49f76c5fafca21c389f3c0 UNKNOWN
[GitHub] [flink] flinkbot edited a comment on pull request #13073: [FLINK-18820] SourceOperator should send MAX_WATERMARK to downstream operator when closed
flinkbot edited a comment on pull request #13073: URL: https://github.com/apache/flink/pull/13073#issuecomment-669650373 ## CI report: * 406f3764484bb5b004988002f620bd20c0c34fd6 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8494) * c7735c9b50701590dad4c52734ccae445419e4d6 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8559)
[GitHub] [flink] wuchong merged pull request #13809: [Hotfix][json] Add serialVersionUID to JsonInputFormat class
wuchong merged pull request #13809: URL: https://github.com/apache/flink/pull/13809
[GitHub] [flink] flinkbot commented on pull request #13833: [FLINK-19867][table-common] Validation fails for UDF that accepts var…
flinkbot commented on pull request #13833: URL: https://github.com/apache/flink/pull/13833#issuecomment-718355139 ## CI report: * 4bec3f4f3950b243ef9f87f24ef481c5e73ceb17 UNKNOWN
[GitHub] [flink] curcur edited a comment on pull request #13648: [FLINK-19632] Introduce a new ResultPartitionType for Approximate Local Recovery
curcur edited a comment on pull request #13648: URL: https://github.com/apache/flink/pull/13648#issuecomment-718354004

Hey @tillrohrmann, thanks for bringing this up. We've never explicitly explained why we introduce a new ResultPartitionType, so I think this is a good place to discuss the reason. I will summarize the thoughts we've discussed offline, plus some other considerations from my perspective after our discussion. cc @pnowojski

> I wanted to ask why we need a special ResultPartitionType for the approximate local recovery? Shouldn't it be conceptually possible that we support the normal and approximative recovery behaviour with the same pipelined partitions?

Conceptually speaking, yes, it is possible to unify normal and approximate pipelined partitions; practically speaking, I am not 100% sure they can be. There are mainly two considerations that led me to introduce a different type.

**1) The first and most important reason is isolating changes to avoid affecting the normal execution path.** Since I am not sure whether the two can be unified, I go with the safe step first. This also helps identify the differences between these two types (there might be more differences for downstream reconnection as well); there could be corner cases that I am not aware of until the real implementation. Even if unification is conceptually possible, having **different implementation subclasses for different connection behavior** does seem reasonable: it simplifies the logic for each behavior. **So personally, I am leaning towards not unifying them.** But certainly, if it turns out to be cleaner and simpler to unify the two types, I have no objection to doing so. For safety and ease of development, though, starting with a different type seems to be the better choice.

**2) Differences between these two types.** For upstream reconnection, there are mainly two differences between the types: **read** and **release**.
- In normal pipelined mode, for each subpartition, its view is created once and released when the downstream disconnects. Releasing the view releases the subpartition, and eventually the partition.
- In approximate mode, for each subpartition, a view can be created and released multiple times, as long as at most one view is active at any instant for a subpartition.
- For reading: upon reconnection, the reader should clean up the partial record caused by the downstream failure (this could easily be unified).
- For release: a partition is released only if the partition finishes consumption (all data read) or its producer failed. The partition should not be released when all its views are released, because new views can be created. (This is a bit difficult under the current setting; let's discuss it in the lifecycle part later.)

> If we say that we can reconnect to every pipelined result partition (including dropping partially consumed results), then it can be the responsibility of the scheduler to make sure that producers are restarted as well in order to ensure exactly/at-least once processing guarantees. If not, then we would simply consume from where we have left off.

This seems true for now, since we can achieve exactly-once/at-least-once through region pipelined failover, and approximate recovery through single-task failover. But I am not sure about the future. Later, if we want to support single-task failover with at-least-once/exactly-once guarantees, where channel data may be persisted somewhere, I cannot say for sure this is purely a scheduler decision. We may well end up introducing more connection types for single-task failover to support at-least-once/exactly-once.

> As far as I understand the existing ResultPartitionType.PIPELINED(_BOUNDED) cannot be used because we release the result partition if the downstream consumer disconnects. I believe that this is not a strict contract of pipelined result partitions but more of an implementation artefact.
> Couldn't we solve the problem of disappearing pipelined result partitions by binding the lifecycle of a pipelined result partition to the lifecycle of a Task? We could say that a Task can only terminate once the pipelined result partition has been consumed. Moreover, a Task will clean up the result partition if it fails or gets canceled. That way, we have a clearly defined lifecycle and make sure that these results get cleaned up (iff the Task reaches a terminal state).

I totally agree. Right now, the life cycle of `ResultPartitionType.PIPELINED(_BOUNDED)` is "bound" to the consumer task: not very intuitive, but reasonable, because `PIPELINED(_BOUNDED)` is consumed only once, and whenever the downstream restarts, the upstream restarts correspondingly. Is it reasonable to bind the partition to the producer instead? Yes, I think that follows the best intuition, as long as we make the task terminate only after its produced result partition is consumed.
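The re-creatable view contract described in the comment above can be sketched outside Flink as a tiny class. The names are illustrative, not Flink's actual ResultSubpartition API: at most one view is active at a time, releasing a view does not release the partition, and a new view can be created after a downstream failure.

```java
import java.util.concurrent.atomic.AtomicLong;

// Minimal sketch (NOT Flink code) of the approximate-mode contract:
// a subpartition whose view can be created and released repeatedly,
// while the partition itself stays alive until the producer finishes.
public class ApproximateSubpartition {
    private Object currentView;                        // stands in for ResultSubpartitionView
    private final AtomicLong viewsCreated = new AtomicLong();

    // A reconnecting consumer gets a fresh view; any previous view is
    // implicitly invalidated rather than failing the partition.
    public synchronized Object createView() {
        viewsCreated.incrementAndGet();
        currentView = new Object();
        return currentView;
    }

    // Releasing the view only clears the view, never the partition.
    public synchronized void releaseView(Object view) {
        if (view == currentView) {
            currentView = null;
        }
    }

    public synchronized boolean hasActiveView() {
        return currentView != null;
    }

    public long viewsCreatedSoFar() {
        return viewsCreated.get();
    }

    public static void main(String[] args) {
        ApproximateSubpartition sp = new ApproximateSubpartition();
        Object v1 = sp.createView();
        sp.releaseView(v1);          // downstream task fails and disconnects
        sp.createView();             // recovered task reconnects with a new view
        System.out.println(sp.hasActiveView() + " " + sp.viewsCreatedSoFar());
    }
}
```

In normal pipelined mode, by contrast, `releaseView` would tear down the whole subpartition, which is exactly the behavioral difference the new ResultPartitionType isolates.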
[GitHub] [flink] flinkbot edited a comment on pull request #13784: [FLINK-19698][connectors/common] API improvements to the Sources.
flinkbot edited a comment on pull request #13784: URL: https://github.com/apache/flink/pull/13784#issuecomment-716153889 ## CI report: * f0dec4fa5d8e69761377a45df57106a4fbfe8152 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8496) * 83636dcd8f6dc5d16671e18c9cebe39cfc822056 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8557)
[GitHub] [flink] flinkbot edited a comment on pull request #13763: [FLINK-19779][avro] Remove the record_ field name prefix for Confluen…
flinkbot edited a comment on pull request #13763: URL: https://github.com/apache/flink/pull/13763#issuecomment-715195599 ## CI report: * 4c5a06b1ca2833fe7f63c25503660ed7acf9b77d Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8503) * 13712c13da124c38a430d4cc50ad28aec4318764 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8556)
[GitHub] [flink] flinkbot edited a comment on pull request #13685: [FLINK-19702][hive] Avoid using HiveConf::hiveSiteURL
flinkbot edited a comment on pull request #13685: URL: https://github.com/apache/flink/pull/13685#issuecomment-711940021 ## CI report: * b49b477343603ef7568ec9c1de8c59807d85d232 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8516) * 5c973d952e126afd81b88646af666713e30835f0 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8555)
[GitHub] [flink] flinkbot edited a comment on pull request #13073: [FLINK-18820] SourceOperator should send MAX_WATERMARK to downstream operator when closed
flinkbot edited a comment on pull request #13073: URL: https://github.com/apache/flink/pull/13073#issuecomment-669650373 ## CI report: * 406f3764484bb5b004988002f620bd20c0c34fd6 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8494) * c7735c9b50701590dad4c52734ccae445419e4d6 UNKNOWN
[GitHub] [flink] JingsongLi commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…
JingsongLi commented on a change in pull request #13789: URL: https://github.com/apache/flink/pull/13789#discussion_r513954139

## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/OutputFormatProvider.java
## @@ -20,13 +20,15 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.table.connector.ParallelismProvider;
 import org.apache.flink.table.data.RowData;

 /**
  * Provider of an {@link OutputFormat} instance as a runtime implementation for {@link DynamicTableSink}.
  */
 @PublicEvolving
-public interface OutputFormatProvider extends DynamicTableSink.SinkRuntimeProvider {
+public interface OutputFormatProvider

Review comment: Code format: it is OK to include this in one line.

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
## @@ -79,6 +87,10 @@
 val enforcer = new SinkNotNullEnforcer(notNullEnforcer, notNullFieldIndices, fieldNames)
 runtimeProvider match {
+  case _: DataStreamSinkProvider with ParallelismProvider => throw new TableException(

Review comment: Code format: better to break the line before `throw new TableException(...)`.

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
## @@ -99,11 +111,63 @@
 val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)

+assert(runtimeProvider.isInstanceOf[ParallelismProvider],
+  "runtimeProvider with `ParallelismProvider` implementation is required")
+
+val inputParallelism = inputTransformation.getParallelism
+val parallelism = {
+  val parallelismOptional = runtimeProvider
+    .asInstanceOf[ParallelismProvider].getParallelism
+  if (parallelismOptional.isPresent) {
+    val parallelismPassedIn = parallelismOptional.get().intValue()
+    if (parallelismPassedIn <= 0) {
+      throw new TableException(
+        s"Table: $tableIdentifier configured sink parallelism: $parallelismPassedIn " +
+          "should not be less than zero or equal to zero")
+    }
+    parallelismPassedIn
+  } else inputParallelism
+}
+
+val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema)
+val theFinalInputTransformation =
+  (inputParallelism == parallelism, changelogMode, primaryKeys.toList) match {
+    // if the inputParallelism equals parallelism, do nothing.
+    case (true, _, _) => inputTransformation
+    case (_, _, _) if (changelogMode.containsOnly(RowKind.INSERT)) => inputTransformation
+    case (_, _, Nil) =>
+      throw new TableException(
+        s"Table: $tableIdentifier configured sink parallelism is: $parallelism, " +
+        s"while the input parallelism is: $inputParallelism. " +
+        s"Since the changelog mode " +
+        s"contains [${changelogMode.getContainedKinds.toList.mkString(",")}], " +
+        s"which is not INSERT_ONLY mode, " +
+        s"primary key is required but no primary key is found")
+    case (_, _, pks) =>
+      // key by before sink
+      // according to [[StreamExecExchange]]
+      val selector = KeySelectorUtil.getRowDataSelector(pks.toArray, inputTypeInfo)
+      // in case of maxParallelism is negative
+      val keyGroupNum = env.getMaxParallelism match {

Review comment: Why do we need to check `env.getMaxParallelism`?
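For illustration, the parallelism-resolution logic under review can be sketched in isolation. This is a standalone sketch in plain Java, not Flink's actual classes; `ParallelismResolver` and its method are hypothetical names standing in for the logic in `CommonPhysicalSink` that reads a `ParallelismProvider`.

```java
import java.util.Optional;

// Standalone sketch of sink-parallelism resolution (hypothetical class, not
// Flink's real code): prefer the parallelism configured on the provider,
// otherwise fall back to the input transformation's parallelism.
class ParallelismResolver {

    /** A configured value must be strictly positive. */
    static int resolve(Optional<Integer> configured, int inputParallelism) {
        if (configured.isPresent()) {
            int p = configured.get();
            if (p <= 0) {
                throw new IllegalArgumentException(
                        "Configured sink parallelism should be positive, but was: " + p);
            }
            return p;
        }
        return inputParallelism;
    }
}
```

If the resolved parallelism differs from the input's and the changelog is not insert-only, the real code additionally requires a primary key so it can key-by before the sink.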
[jira] [Closed] (FLINK-19587) Error result when casting binary type as varchar
[ https://issues.apache.org/jira/browse/FLINK-19587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu closed FLINK-19587.
---
Resolution: Fixed

> Error result when casting binary type as varchar
>
> Key: FLINK-19587
> URL: https://issues.apache.org/jira/browse/FLINK-19587
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.11.0
> Reporter: hailong wang
> Assignee: hailong wang
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.12.0, 1.11.3
>
> The result is wrong when casting a binary type as a varchar type.
> For example,
> {code:java}
> @Test
> def testCast1(): Unit = {
>   testSqlApi(
>     "CAST(X'68656c6c6f' as varchar)",
>     "hello")
> }
> {code}
> The result is
> {code:java}
> Expected :hello
> Actual   :[B@57fae983
> {code}
> It works correctly as follows:
> {code:java}
> @Test
> def testCast(): Unit = {
>   testSqlApi(
>     "CAST(CAST(X'68656c6c6f' as varbinary) as varchar)",
>     "hello")
> }
> {code}
> We just need to change
> {code:java}
> case (VARBINARY, VARCHAR | CHAR)
> {code}
> to
> {code:java}
> case (BINARY | VARBINARY, VARCHAR | CHAR)
> {code}
> in ScalarOperatorGens#generateCast.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
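The symptom in the ticket can be reproduced in plain Java. This is a standalone illustration of why the unfixed cast prints a JVM reference string rather than the decoded text; it is not Flink's generated code, and the class name is made up for the example.

```java
import java.nio.charset.StandardCharsets;

// Illustration of the FLINK-19587 symptom (hypothetical class name).
class BinaryToVarchar {

    // Object#toString on a byte[] yields the JVM reference string,
    // e.g. "[B@57fae983" -- the erroneous CAST result from the ticket.
    static String wrongCast(byte[] bytes) {
        return bytes.toString();
    }

    // Decoding the bytes into characters is what a fixed
    // BINARY/VARBINARY -> VARCHAR/CHAR cast branch effectively does.
    static String correctCast(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```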
[jira] [Comment Edited] (FLINK-19587) Error result when casting binary type as varchar
[ https://issues.apache.org/jira/browse/FLINK-19587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17221965#comment-17221965 ]

Jark Wu edited comment on FLINK-19587 at 10/29/20, 4:22 AM:
Fixed in:
- master: d9b0ac97ee4675aebdab1592af663b95fdc5051b
- 1.11: 053f03c8ea1c7e1e93d41e87606799f4f844c719

was (Author: jark):
Fixed in:
- master: d9b0ac97ee4675aebdab1592af663b95fdc5051b
- 1.11: TODO

> Error result when casting binary type as varchar
>
> Key: FLINK-19587
> URL: https://issues.apache.org/jira/browse/FLINK-19587
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.11.0
> Reporter: hailong wang
> Assignee: hailong wang
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.12.0, 1.11.3
>
> The result is wrong when casting a binary type as a varchar type.
> For example,
> {code:java}
> @Test
> def testCast1(): Unit = {
>   testSqlApi(
>     "CAST(X'68656c6c6f' as varchar)",
>     "hello")
> }
> {code}
> The result is
> {code:java}
> Expected :hello
> Actual   :[B@57fae983
> {code}
> It works correctly as follows:
> {code:java}
> @Test
> def testCast(): Unit = {
>   testSqlApi(
>     "CAST(CAST(X'68656c6c6f' as varbinary) as varchar)",
>     "hello")
> }
> {code}
> We just need to change
> {code:java}
> case (VARBINARY, VARCHAR | CHAR)
> {code}
> to
> {code:java}
> case (BINARY | VARBINARY, VARCHAR | CHAR)
> {code}
> in ScalarOperatorGens#generateCast.
[GitHub] [flink] wuchong merged pull request #13826: [BP-1.11][FLINK-19587][table-planner-blink] Fix error result when casting binary as varchar
wuchong merged pull request #13826: URL: https://github.com/apache/flink/pull/13826 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #13832: [FLINK-19822][table-planner] Remove redundant shuffle for streaming
flinkbot edited a comment on pull request #13832: URL: https://github.com/apache/flink/pull/13832#issuecomment-718338774 ## CI report: * d32b02c2668093ff2745c4d124b11807f0b3f11c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8554) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-19868) Csv Serialization schema contains line delimiter
[ https://issues.apache.org/jira/browse/FLINK-19868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-19868: Fix Version/s: 1.12.0 > Csv Serialization schema contains line delimiter > > > Key: FLINK-19868 > URL: https://issues.apache.org/jira/browse/FLINK-19868 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Reporter: Jingsong Lee >Priority: Major > Fix For: 1.12.0 > > > CsvRowSerializationSchema.serialize(Row.of("f0", "f1")) => f0,f1\n > Csv Serialization schema is for one line, why contains line delimiter? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19868) Csv Serialization schema contains line delimiter
[ https://issues.apache.org/jira/browse/FLINK-19868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17222700#comment-17222700 ] Jark Wu commented on FLINK-19868: - I think {{csv.line-delimiter}} is not a good option for the csv format; the {{line-delimiter}} should be an option of the filesystem connector, because it is never used for message queues. A row-wise format should consume and produce the bytes of a single row; the line splitting should be handled by connectors. I also searched the mailing list and didn't find any user asking questions about {{csv.line-delimiter}}; only some people ask about {{csv.field-delimiter}}. Therefore, I think we can remove the {{csv.line-delimiter}} option (mentioning this in the release notes) and not append a newline in {{CsvRowDataSerializationSchema}}. What do you think? > Csv Serialization schema contains line delimiter > > > Key: FLINK-19868 > URL: https://issues.apache.org/jira/browse/FLINK-19868 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Reporter: Jingsong Lee >Priority: Major > > CsvRowSerializationSchema.serialize(Row.of("f0", "f1")) => f0,f1\n > Csv Serialization schema is for one line, why contains line delimiter?
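The proposed split of responsibilities can be sketched as follows. These are hypothetical classes, not Flink's API: the format produces the bytes of one row with no delimiter, and the filesystem connector, not the format, appends the newline.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

// Sketch: a row-wise serializer returns the bytes of one row WITHOUT any
// line delimiter; how rows are framed is the connector's decision.
class CsvRowSerializer {
    byte[] serialize(List<String> fields) {
        return String.join(",", fields).getBytes(StandardCharsets.UTF_8);
    }
}

// Sketch of a filesystem-style sink that frames rows with '\n'.
// A message-queue sink would instead ship each byte[] as one record, unframed.
class FileSystemSink {
    private final StringBuilder out = new StringBuilder();

    void write(byte[] row) {
        out.append(new String(row, StandardCharsets.UTF_8)).append('\n');
    }

    String contents() {
        return out.toString();
    }
}
```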
[GitHub] [flink] flinkbot edited a comment on pull request #13784: [FLINK-19698][connectors/common] API improvements to the Sources.
flinkbot edited a comment on pull request #13784: URL: https://github.com/apache/flink/pull/13784#issuecomment-716153889 ## CI report: * f0dec4fa5d8e69761377a45df57106a4fbfe8152 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8496) * 83636dcd8f6dc5d16671e18c9cebe39cfc822056 UNKNOWN
[GitHub] [flink] flinkbot edited a comment on pull request #13763: [FLINK-19779][avro] Remove the record_ field name prefix for Confluen…
flinkbot edited a comment on pull request #13763: URL: https://github.com/apache/flink/pull/13763#issuecomment-715195599 ## CI report: * 4c5a06b1ca2833fe7f63c25503660ed7acf9b77d Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8503) * 13712c13da124c38a430d4cc50ad28aec4318764 UNKNOWN
[GitHub] [flink] flinkbot edited a comment on pull request #13685: [FLINK-19702][hive] Avoid using HiveConf::hiveSiteURL
flinkbot edited a comment on pull request #13685: URL: https://github.com/apache/flink/pull/13685#issuecomment-711940021 ## CI report: * b49b477343603ef7568ec9c1de8c59807d85d232 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8516) * 5c973d952e126afd81b88646af666713e30835f0 UNKNOWN
[jira] [Commented] (FLINK-19868) Csv Serialization schema contains line delimiter
[ https://issues.apache.org/jira/browse/FLINK-19868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17222697#comment-17222697 ] Jingsong Lee commented on FLINK-19868: -- The filesystem connector wants to reuse the serialization schemas, but the behavior of the formats is inconsistent. For example, CSV writes a line separator, but JSON does not. > Csv Serialization schema contains line delimiter > > > Key: FLINK-19868 > URL: https://issues.apache.org/jira/browse/FLINK-19868 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Reporter: Jingsong Lee >Priority: Major > > CsvRowSerializationSchema.serialize(Row.of("f0", "f1")) => f0,f1\n > Csv Serialization schema is for one line, why contains line delimiter?
[GitHub] [flink] flinkbot commented on pull request #13833: [FLINK-19867][table-common] Validation fails for UDF that accepts var…
flinkbot commented on pull request #13833: URL: https://github.com/apache/flink/pull/13833#issuecomment-718346736 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 4bec3f4f3950b243ef9f87f24ef481c5e73ceb17 (Thu Oct 29 04:08:05 UTC 2020) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-19867).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work. Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. 
For consensus, approval by a Flink committer or PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-19868) Csv Serialization schema contains line delimiter
[ https://issues.apache.org/jira/browse/FLINK-19868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee updated FLINK-19868: - Description: CsvRowSerializationSchema.serialize(Row.of("f0", "f1")) => f0,f1\n Csv Serialization schema is for one line, why contains line delimiter? was:Csv Serialization schema is for one line, why contains line delimiter? > Csv Serialization schema contains line delimiter > > > Key: FLINK-19868 > URL: https://issues.apache.org/jira/browse/FLINK-19868 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Reporter: Jingsong Lee >Priority: Major > > CsvRowSerializationSchema.serialize(Row.of("f0", "f1")) => f0,f1\n > Csv Serialization schema is for one line, why contains line delimiter? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19867) Validation fails for UDF that accepts var-args
[ https://issues.apache.org/jira/browse/FLINK-19867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-19867: --- Labels: pull-request-available (was: ) > Validation fails for UDF that accepts var-args > -- > > Key: FLINK-19867 > URL: https://issues.apache.org/jira/browse/FLINK-19867 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Reporter: Rui Li >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19868) Csv Serialization schema contains line delimiter
Jingsong Lee created FLINK-19868: Summary: Csv Serialization schema contains line delimiter Key: FLINK-19868 URL: https://issues.apache.org/jira/browse/FLINK-19868 Project: Flink Issue Type: Bug Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Reporter: Jingsong Lee Csv Serialization schema is for one line, why contains line delimiter? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] lirui-apache opened a new pull request #13833: [FLINK-19867][table-common] Validation fails for UDF that accepts var…
lirui-apache opened a new pull request #13833: URL: https://github.com/apache/flink/pull/13833 …-args ## What is the purpose of the change Fix the failure to extract eval method for functions with var-arg parameter. ## Brief change log - Check var-arg parameter first in `ExtractionUtils.isInvokable` - Add test case ## Verifying this change Added test case ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? no This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
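The essence of the fix can be illustrated with plain reflection. This is a sketch in the spirit of `ExtractionUtils.isInvokable`, not the actual patch: a var-arg parameter must be considered first, because it matches any argument count at or above the number of fixed parameters.

```java
import java.lang.reflect.Method;

// Sketch of var-arg-aware invokability checking (names and logic are
// illustrative, not Flink's real implementation).
class InvokabilityCheck {

    static class VarArgUdf {
        public String eval(String first, String... rest) {
            return first + String.join("-", rest);
        }
    }

    /** Can a call with {@code argCount} arguments bind to method {@code m}? */
    static boolean isInvokable(Method m, int argCount) {
        int formal = m.getParameterCount();
        if (m.isVarArgs()) {
            // the trailing var-arg part may bind zero or more arguments
            return argCount >= formal - 1;
        }
        return argCount == formal;
    }
}
```

Checking only `argCount == formal` would wrongly reject `eval("a", "b", "c")` against `eval(String, String...)`, which is the class of failure the PR addresses.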
[jira] [Created] (FLINK-19867) Validation fails for UDF that accepts var-args
Rui Li created FLINK-19867: -- Summary: Validation fails for UDF that accepts var-args Key: FLINK-19867 URL: https://issues.apache.org/jira/browse/FLINK-19867 Project: Flink Issue Type: Bug Components: Table SQL / API Reporter: Rui Li -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] Sxnan commented on pull request #13073: [FLINK-18820] SourceOperator should send MAX_WATERMARK to downstream operator when closed
Sxnan commented on pull request #13073: URL: https://github.com/apache/flink/pull/13073#issuecomment-718343253 I have made the change accordingly. @dawidwys Please have another look.
[jira] [Created] (FLINK-19866) FunctionsStateBootstrapOperator.createStateAccessor fails due to uninitialized runtimeContext
wang created FLINK-19866:
Summary: FunctionsStateBootstrapOperator.createStateAccessor fails due to uninitialized runtimeContext
Key: FLINK-19866
URL: https://issues.apache.org/jira/browse/FLINK-19866
Project: Flink
Issue Type: Bug
Components: Stateful Functions
Affects Versions: 1.11.2, statefun-2.2.0
Reporter: wang

This is a bug similar to [FLINK-19330|https://issues.apache.org/jira/browse/FLINK-19330].

In Flink 1.11.2 with statefun-flink-state-processor 2.2.0, the AbstractStreamOperator's runtimeContext is not fully initialized when AbstractStreamOperator#initializeState() executes; in particular, the KeyedStateStore is set only after initializeState() has finished. See: [https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L258,L259]

This behaviour changed between Flink 1.10 and Flink 1.11. StateFun's FunctionsStateBootstrapOperator performs its initialization logic in initializeState(), and it requires an already-initialized runtimeContext to create the state accessor. This causes the following failure:

{code:java}
Caused by: java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.
	at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
	at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:223)
	at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:188)
	at org.apache.flink.statefun.flink.core.state.FlinkState.createFlinkStateAccessor(FlinkState.java:69)
	at org.apache.flink.statefun.flink.core.state.FlinkStateBinder.bindValue(FlinkStateBinder.java:48)
	at org.apache.flink.statefun.sdk.state.StateBinder.bind(StateBinder.java:30)
	at org.apache.flink.statefun.flink.core.state.PersistedStates.findReflectivelyAndBind(PersistedStates.java:46)
	at org.apache.flink.statefun.flink.state.processor.operator.StateBootstrapFunctionRegistry.bindState(StateBootstrapFunctionRegistry.java:120)
	at org.apache.flink.statefun.flink.state.processor.operator.StateBootstrapFunctionRegistry.initialize(StateBootstrapFunctionRegistry.java:103)
	at org.apache.flink.statefun.flink.state.processor.operator.StateBootstrapper.(StateBootstrapper.java:39)
	at org.apache.flink.statefun.flink.state.processor.operator.FunctionsStateBootstrapOperator.initializeState(FunctionsStateBootstrapOperator.java:67)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
	at org.apache.flink.state.api.output.BoundedStreamTask.init(BoundedStreamTask.java:85)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:457)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
	at org.apache.flink.state.api.output.BoundedOneInputStreamTaskRunner.mapPartition(BoundedOneInputStreamTaskRunner.java:76)
	at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504)
	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.lang.Thread.run(Thread.java:748)
{code}
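The ordering problem can be modeled with two tiny classes. These are illustrative stand-ins, not Flink's real operators: state access inside initializeState() fails because the keyed state store is installed on the runtime context only afterwards.

```java
// Minimal model of the Flink 1.11 initialization order (hypothetical classes).
// The keyed state store is installed on the runtime context only AFTER
// initializeState() has returned, so any state access inside it hits the
// null check.
class RuntimeContextModel {
    Object keyedStateStore; // set after initializeState() finishes

    Object getState() {
        if (keyedStateStore == null) {
            throw new NullPointerException(
                    "Keyed state can only be used on a 'keyed stream', "
                            + "i.e., after a 'keyBy()' operation.");
        }
        return keyedStateStore;
    }
}

class BootstrapOperatorModel {
    final RuntimeContextModel ctx = new RuntimeContextModel();
    boolean failedDuringInitializeState;

    void initialize() {
        try {
            // what FunctionsStateBootstrapOperator effectively does in
            // initializeState(): bind state through the runtime context
            ctx.getState();
        } catch (NullPointerException e) {
            failedDuringInitializeState = true;
        }
        // the framework installs the store only afterwards
        ctx.keyedStateStore = new Object();
    }
}
```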
[jira] [Commented] (FLINK-19768) The shell "./yarn-session.sh " not use log4j-session.properties , it use log4j.properties
[ https://issues.apache.org/jira/browse/FLINK-19768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17222687#comment-17222687 ]

YUJIANBO commented on FLINK-19768:
--

Thanks a lot for your answer! I changed the log4j.properties parameter to "appender.main.fileName = ${sys:log.file}-a", but failed to make yarn-session mode distinguish the logs of different tasks. I've tried the steps above, but I ran into two serious problems:

1. In per-job mode, on the Flink web UI the taskmanager side shows "taskmanager.log-a", but taskmanager.err does not show "taskmanager.err-a", and the jobmanager side's Log List is empty. I'm puzzled why it is like this.

2. In yarn-session mode, I did three steps:
(1) With "appender.main.fileName = ${sys:log.file}-a", I ran "FLINK_CONF_DIR=<> ./yarn-session.sh ..." to start the yarn session.
(2) With "appender.main.fileName = ${sys:log.file}-b", I ran "FLINK_CONF_DIR=<> ./flink run -yid ..." to submit task one.
(3) With "appender.main.fileName = ${sys:log.file}-c", I ran "FLINK_CONF_DIR=<> ./flink run -yid ..." to submit task two.

On the Flink web UI I also see that the jobmanager side's Log List is empty. But I found something more important: whether it's task one or task two, the taskmanager side shows "taskmanager.log-a", not "taskmanager.log-b" or "taskmanager.log-c".

> The shell "./yarn-session.sh " not use log4j-session.properties , it use log4j.properties
> --
>
> Key: FLINK-19768
> URL: https://issues.apache.org/jira/browse/FLINK-19768
> Project: Flink
> Issue Type: Bug
> Components: Client / Job Submission, Deployment / YARN
> Affects Versions: 1.11.2
> Reporter: YUJIANBO
> Priority: Major
>
> The shell "./yarn-session.sh " not use log4j-session.properties , it use log4j.properties
> My Flink Job UI shows the $internal.yarn.log-config-file is "/usr/local/flink-1.11.2/conf/log4j.properties", is it a bug?
[GitHub] [flink] danny0405 commented on a change in pull request #13763: [FLINK-19779][avro] Remove the record_ field name prefix for Confluen…
danny0405 commented on a change in pull request #13763: URL: https://github.com/apache/flink/pull/13763#discussion_r513927391 ## File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java ## @@ -297,17 +299,22 @@ private static DataType convertToDataType(Schema schema) { * @return Avro's {@link Schema} matching this logical type. */ public static Schema convertToSchema(LogicalType logicalType) { - return convertToSchema(logicalType, "record"); + return convertToSchema(logicalType, "record", true); } /** * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema. * * @param logicalType logical type * @param rowName the record name +* @param top whether it is parsing the root record, +*if it is, the logical type nullability would be ignored * @return Avro's {@link Schema} matching this logical type. */ - public static Schema convertToSchema(LogicalType logicalType, String rowName) { + public static Schema convertToSchema( + LogicalType logicalType, + String rowName, + boolean top) { Review comment: @dawidwys I have changed the schema row type to be always nullable false, please take a look again if you have time, thanks so much. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] danny0405 commented on a change in pull request #13763: [FLINK-19779][avro] Remove the record_ field name prefix for Confluen…
danny0405 commented on a change in pull request #13763: URL: https://github.com/apache/flink/pull/13763#discussion_r513370899

## File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
## @@ -297,17 +299,22 @@ private static DataType convertToDataType(Schema schema)
  * @return Avro's {@link Schema} matching this logical type.
  */
 public static Schema convertToSchema(LogicalType logicalType) {
-	return convertToSchema(logicalType, "record");
+	return convertToSchema(logicalType, "record", true);
 }

 /**
  * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema.
  *
  * @param logicalType logical type
  * @param rowName the record name
+ * @param top whether it is parsing the root record,
+ *        if it is, the logical type nullability would be ignored
  * @return Avro's {@link Schema} matching this logical type.
  */
- public static Schema convertToSchema(LogicalType logicalType, String rowName) {
+ public static Schema convertToSchema(
+		LogicalType logicalType,
+		String rowName,
+		boolean top) {

Review comment: I had an offline discussion with @wuchong and @dawidwys, and after some research we found that a non-nullable row type is more reasonable. But because the change is huge (much existing code that converts a type info to a data type assumed nullable=true before), @wuchong and I decided to change the method signature to `convertToSchema(RowType schema)` and add a note to the method doc that the passed-in `schema` must be the top-level record type.
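The agreed behavior can be sketched abstractly. This is a toy model, not the real `AvroSchemaConverter` or Avro's API: the top-level row always maps to a non-nullable record, while nested nullable types are wrapped in a union with null.

```java
// Toy model of the nullability decision discussed above (hypothetical class,
// returning schema descriptions as strings instead of real Avro schemas).
class SchemaSketch {

    static String convert(boolean nullable, boolean topLevel) {
        String record = "record";
        // the root record ignores the logical type's nullability flag;
        // nested nullable types become a union with null
        if (!topLevel && nullable) {
            return "union(null, " + record + ")";
        }
        return record;
    }
}
```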
[GitHub] [flink] shuiqiangchen commented on a change in pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.
shuiqiangchen commented on a change in pull request #13803: URL: https://github.com/apache/flink/pull/13803#discussion_r513925523 ## File path: flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_timer_job.py ## @@ -0,0 +1,89 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from pyflink.common.serialization import SimpleStringSchema +from pyflink.common.typeinfo import Types +from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic +from pyflink.datastream.connectors import FlinkKafkaProducer +from pyflink.datastream.functions import ProcessFunction, Collector +from pyflink.table import StreamTableEnvironment + + +def test_ds_timer(): +env = StreamExecutionEnvironment.get_execution_environment() +env.set_parallelism(1) Review comment: Set the parallelism to be one to make sure that all data including fired timer and normal data are processed by the same worker and the collected result would be in order which is good for assertion. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
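The reasoning behind parallelism 1 can be shown independent of Flink. This plain-Java sketch uses a single-threaded executor as a stand-in for a single worker: with one worker, tasks complete in submission order, so the collected output is deterministic and easy to assert on.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// A single worker thread ("parallelism 1") processes tasks in submission
// order, so results arrive deterministically ordered.
class SingleWorkerOrder {

    static List<Integer> run() throws InterruptedException {
        ExecutorService single = Executors.newSingleThreadExecutor();
        List<Integer> results = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < 5; i++) {
            final int v = i;
            single.submit(() -> results.add(v));
        }
        single.shutdown();
        single.awaitTermination(5, TimeUnit.SECONDS);
        return results;
    }
}
```

With more than one worker, completion order would depend on scheduling and the assertion below could fail intermittently.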
[jira] [Closed] (FLINK-19692) Can't restore feedback channel from savepoint
[ https://issues.apache.org/jira/browse/FLINK-19692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai closed FLINK-19692. --- Assignee: Igal Shilman Resolution: Fixed Fixed via - statefun/master: ddeec3128524ecb41633c6a325c030c7b72d7e83 statefun/release-2.2: 8823b27defff64fab2532fb01d1b5f8ae5cfdd72 Thanks for the fix [~igal] and also the testing efforts [~Antti-Kaikkonen]! > Can't restore feedback channel from savepoint > - > > Key: FLINK-19692 > URL: https://issues.apache.org/jira/browse/FLINK-19692 > Project: Flink > Issue Type: Bug > Components: Stateful Functions >Affects Versions: statefun-2.0.0, statefun-2.1.0, statefun-2.2.0 >Reporter: Antti Kaikkonen >Assignee: Igal Shilman >Priority: Blocker > Labels: pull-request-available > Fix For: statefun-2.3.0, statefun-2.2.1 > > > When using the new statefun-flink-datastream integration the following error > is thrown by the *feedback -> union* task when trying to restore from a > savepoint: > {code:java} > java.lang.Exception: Exception while creating StreamOperatorStateContext. 
> at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.io.IOException: java.io.IOException: position out of bounds > at > org.apache.flink.runtime.state.StatePartitionStreamProvider.getStream(StatePartitionStreamProvider.java:58) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:235) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:167) > ... 
9 more > Caused by: java.io.IOException: position out of bounds > at > org.apache.flink.runtime.state.memory.ByteStreamStateHandle$ByteStateHandleInputStream.seek(ByteStreamStateHandle.java:124) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:442) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:395) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:228) > ... 10 more > {code} > The error is only thrown when the feedback channel has been used. > I have tested with the [example > application|https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java] > and the error is thrown only if it is modified to actually use the feedback > channel. I simply modified the invoke method to sometimes forward the > greeting to a random name: > {code:java} > @Override > public void invoke(Context context, Object input) { > int seen = seenCount.updateAndGet(MyFunction::increment); > context.send(GREETINGS, String.format("Hello %s at the %d-th time", input, > seen)); > String[] names = {"Stephan", "Igal", "Gordon", "Seth", "Marta"}; > ThreadLocalRandom random = ThreadLocalRandom.current(); > int index = random.nextInt(names.length); > final String name2 = names[index]; > if (random.nextDouble() < 0.5) context.send(new Address(GREET, name2), > input); > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot commented on pull request #13832: [FLINK-19822][table-planner] Remove redundant shuffle for streaming
flinkbot commented on pull request #13832: URL: https://github.com/apache/flink/pull/13832#issuecomment-718338774 ## CI report: * d32b02c2668093ff2745c4d124b11807f0b3f11c UNKNOWN
[GitHub] [flink] flinkbot edited a comment on pull request #13831: [FLINK-19624][fix] Fix special case when the reuse of exchange causes the deadlock
flinkbot edited a comment on pull request #13831: URL: https://github.com/apache/flink/pull/13831#issuecomment-718332223 ## CI report: * f7a377f9a04c5d630cb578ebc9040e936e6a60a4 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8553)
[GitHub] [flink] flinkbot edited a comment on pull request #13830: [FLINK-14482][rocksdb] Bump FRocksDB version to base on RocksDB-6.11.6
flinkbot edited a comment on pull request #13830: URL: https://github.com/apache/flink/pull/13830#issuecomment-718331944 ## CI report: * e4cd22945ab2b3e7cb44c302e9db12231f52fdbc Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8552)
[GitHub] [flink] flinkbot edited a comment on pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.
flinkbot edited a comment on pull request #13803: URL: https://github.com/apache/flink/pull/13803#issuecomment-717145782 ## CI report: * 2ab68a5e26b158ab43b128e0d0dbbc62cb8dea17 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8498) * a214b00956b850ec3aca2455478353472dd4f44c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8551)
[GitHub] [flink] flinkbot edited a comment on pull request #13797: [FLINK-19465][runtime / statebackends] Add CheckpointStorage interface and wire through runtime
flinkbot edited a comment on pull request #13797: URL: https://github.com/apache/flink/pull/13797#issuecomment-716854695 ## CI report: * c06b3dcd4eb4045f66352c8b65c14a7b60163ecd UNKNOWN * 185e46fd046c5f39a87ac3583ad559b065a275e1 UNKNOWN * 942af8415dd1a9fbd715532bf6b578a7a4b7f082 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8543)
[GitHub] [flink] gaoyunhaii commented on a change in pull request #13697: [FLINK-19357][FLINK-19357][fs-connector] Introduce createBucketWriter to BucketsBuilder & Introduce FileLifeCycleListener to B
gaoyunhaii commented on a change in pull request #13697: URL: https://github.com/apache/flink/pull/13697#discussion_r513921052 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/FileLifeCycleListener.java ## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.sink.filesystem; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.fs.Path; + +/** + * Listener about the status of file. + */ +@Internal +public interface FileLifeCycleListener { + + /** +* Notifies a new file has been opened. +* +* Note that this does not mean that the file has been created in the file system. It is +* only created logically and the actual file will be generated after it is committed. +* +* @param bucketID The bucketID of newly opened file. +* @param newPath The path of newly opened file. +*/ + void openPartFile(BucketID bucketID, Path newPath); Review comment: I recommend we change the name of this method to `partFileOpened` or `onPartFileOpened`, the current name seems to open the part file actually in this method. 
Review comment: I think we might change the name of this method to `partFileOpened` or `onPartFileOpened`; the current name suggests that the part file is actually opened in this method.
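A minimal sketch of the rename the reviewer suggests, using hypothetical stand-ins (`String` for Flink's `Path`, a plain type parameter for `BucketID`) so the example compiles without Flink on the classpath:

```java
import java.util.ArrayList;
import java.util.List;

class PartFileListenerSketch {

    // Hypothetical stand-in for Flink's FileLifeCycleListener<BucketID>; the path is a
    // plain String here instead of org.apache.flink.core.fs.Path.
    interface FileLifeCycleListener<BucketID> {
        // The past-tense name makes clear the file was already opened elsewhere:
        // the listener is only notified, it does not open the part file itself.
        void onPartFileOpened(BucketID bucketId, String newPath);
    }

    static String demo() {
        List<String> notifications = new ArrayList<>();
        FileLifeCycleListener<String> listener =
                (bucket, path) -> notifications.add(bucket + ":" + path);
        // A bucket writer would invoke this callback after logically opening a part file.
        listener.onPartFileOpened("bucket-1", "/tmp/part-0");
        return notifications.get(0);
    }
}
```

Compared with `openPartFile`, a caller reading `listener.onPartFileOpened(...)` cannot mistake the notification for the action itself, which is the reviewer's point.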
[GitHub] [flink] wangyang0918 commented on a change in pull request #13644: [FLINK-19542][k8s]Implement LeaderElectionService and LeaderRetrievalService based on Kubernetes API
wangyang0918 commented on a change in pull request #13644: URL: https://github.com/apache/flink/pull/13644#discussion_r513920680 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriver.java ## @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.leaderelection; + +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.api.UnhandledErrorListener; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCache; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCacheListener; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatchListener; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionState; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.CreateMode; +import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException; +import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.data.Stat; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.GuardedBy; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link LeaderElectionDriver} implementation for Zookeeper. The leading JobManager is elected using + * ZooKeeper. The current leader's address as well as its leader session ID is published via + * ZooKeeper. 
+ */ +public class ZooKeeperLeaderElectionDriver implements LeaderElectionDriver, LeaderLatchListener, NodeCacheListener, UnhandledErrorListener { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderElectionDriver.class); + + private final Object lock = new Object(); + + /** Client to the ZooKeeper quorum. */ + private final CuratorFramework client; + + /** Curator recipe for leader election. */ + private final LeaderLatch leaderLatch; + + /** Curator recipe to watch a given ZooKeeper node for changes. */ + private final NodeCache cache; + + /** ZooKeeper path of the node which stores the current leader information. */ + private final String leaderPath; + + private final ConnectionStateListener listener = (client, newState) -> handleStateChange(newState); + + private final LeaderElectionEventHandler leaderElectionEventHandler; + + private final FatalErrorHandler fatalErrorHandler; + + private final String leaderContenderDescription; + + @GuardedBy("lock") + private volatile boolean running; + + /** +* Creates a ZooKeeperLeaderElectionDriver object. +* +* @param client Client which is connected to the ZooKeeper quorum +* @param latchPath ZooKeeper node path for the leader election latch +* @param leaderPath ZooKeeper node path for the node which stores the current leader information +* @param leaderElectionEventHandler Event handler for processing leader change events +* @param fatalErrorHandler Fatal error handler +* @param leaderContenderDescription Leader contender description +*/ + public ZooKeeperLeaderElectionDriver( + CuratorFramework client, + String latchPath, + String leaderPath, + LeaderElectionEventHandler leaderElectionEventHandler, + FatalErrorHandler fatalErrorHandler, +
[GitHub] [flink] flinkbot edited a comment on pull request #13744: [FLINK-19766][table-runtime] Introduce File streaming compaction operators
flinkbot edited a comment on pull request #13744: URL: https://github.com/apache/flink/pull/13744#issuecomment-714369975 ## CI report: * a01ecf0bade9f8e4a56052fb5b5d25c5034fe511 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8486) * 039f49ada096d2f85fc6908618f77c8483a446fb Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8550)
[GitHub] [flink] flinkbot edited a comment on pull request #13605: [FLINK-19599][table] Introduce Filesystem format factories to integrate new FileSource to table
flinkbot edited a comment on pull request #13605: URL: https://github.com/apache/flink/pull/13605#issuecomment-707521684 ## CI report: * a9df8a1384eb6306656b4fd952edd4be5d7a857d UNKNOWN * 96468e31f7614ef314c655f97a63fcabe83505a8 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8499) * 5ef88c8f400b6c2ef5f5ca6bbd750c17f27137c7 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8549)
[GitHub] [flink] dianfu commented on a change in pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.
dianfu commented on a change in pull request #13803: URL: https://github.com/apache/flink/pull/13803#discussion_r513882412 ## File path: flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_timer_job.py ## @@ -0,0 +1,89 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from pyflink.common.serialization import SimpleStringSchema +from pyflink.common.typeinfo import Types +from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic +from pyflink.datastream.connectors import FlinkKafkaProducer +from pyflink.datastream.functions import ProcessFunction, Collector +from pyflink.table import StreamTableEnvironment + + +def test_ds_timer(): +env = StreamExecutionEnvironment.get_execution_environment() +env.set_parallelism(1) +env.set_stream_time_characteristic(TimeCharacteristic.EventTime) + +t_env = StreamTableEnvironment.create(stream_execution_environment=env) + t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", Review comment: this is not necessary any more ## File path: flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_timer_job.py ## @@ -0,0 +1,89 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from pyflink.common.serialization import SimpleStringSchema +from pyflink.common.typeinfo import Types +from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic +from pyflink.datastream.connectors import FlinkKafkaProducer +from pyflink.datastream.functions import ProcessFunction, Collector +from pyflink.table import StreamTableEnvironment + + +def test_ds_timer(): +env = StreamExecutionEnvironment.get_execution_environment() +env.set_parallelism(1) Review comment: Why set the parallelism to 1? ## File path: flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_timer_job.py ## @@ -0,0 +1,89 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from pyflink.common.serialization import SimpleStringSchema +from pyflink.common.typeinfo import Types +from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic +from
[GitHub] [flink] flinkbot commented on pull request #13831: [FLINK-19624][fix] Fix special case when the reuse of exchange causes the deadlock
flinkbot commented on pull request #13831: URL: https://github.com/apache/flink/pull/13831#issuecomment-718332223 ## CI report: * f7a377f9a04c5d630cb578ebc9040e936e6a60a4 UNKNOWN
[GitHub] [flink] flinkbot commented on pull request #13830: [FLINK-14482][rocksdb] Bump FRocksDB version to base on RocksDB-6.11.6
flinkbot commented on pull request #13830: URL: https://github.com/apache/flink/pull/13830#issuecomment-718331944 ## CI report: * e4cd22945ab2b3e7cb44c302e9db12231f52fdbc UNKNOWN
[GitHub] [flink] flinkbot edited a comment on pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.
flinkbot edited a comment on pull request #13803: URL: https://github.com/apache/flink/pull/13803#issuecomment-717145782 ## CI report: * 2ab68a5e26b158ab43b128e0d0dbbc62cb8dea17 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8498) * a214b00956b850ec3aca2455478353472dd4f44c UNKNOWN
[GitHub] [flink] flinkbot edited a comment on pull request #13822: [FLINK-19856][network] Emit EndOfChannelRecoveryEvent
flinkbot edited a comment on pull request #13822: URL: https://github.com/apache/flink/pull/13822#issuecomment-717894411 ## CI report: * 6e23ce525a7af28103e7abd8547b2ba6c9850567 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8545)
[GitHub] [flink] flinkbot edited a comment on pull request #13744: [FLINK-19766][table-runtime] Introduce File streaming compaction operators
flinkbot edited a comment on pull request #13744: URL: https://github.com/apache/flink/pull/13744#issuecomment-714369975 ## CI report: * a01ecf0bade9f8e4a56052fb5b5d25c5034fe511 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8486) * 039f49ada096d2f85fc6908618f77c8483a446fb UNKNOWN
[GitHub] [flink] flinkbot edited a comment on pull request #13605: [FLINK-19599][table] Introduce Filesystem format factories to integrate new FileSource to table
flinkbot edited a comment on pull request #13605: URL: https://github.com/apache/flink/pull/13605#issuecomment-707521684 ## CI report: * a9df8a1384eb6306656b4fd952edd4be5d7a857d UNKNOWN * 96468e31f7614ef314c655f97a63fcabe83505a8 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8499) * 5ef88c8f400b6c2ef5f5ca6bbd750c17f27137c7 UNKNOWN
[GitHub] [flink] wuchong commented on a change in pull request #13307: [FLINK-19078][table-runtime] Import rowtime join temporal operator
wuchong commented on a change in pull request #13307: URL: https://github.com/apache/flink/pull/13307#discussion_r513890085 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala ## @@ -250,18 +251,16 @@ class StreamExecTemporalJoinToCoProcessTranslator private( val minRetentionTime = tableConfig.getMinIdleStateRetentionTime val maxRetentionTime = tableConfig.getMaxIdleStateRetentionTime if (rightRowTimeAttributeInputReference.isDefined) { - if (isTemporalFunctionJoin) { -new LegacyTemporalRowTimeJoinOperator( - InternalTypeInfo.of(leftInputType), - InternalTypeInfo.of(rightInputType), - generatedJoinCondition, - leftTimeAttributeInputReference, - rightRowTimeAttributeInputReference.get, - minRetentionTime, - maxRetentionTime) - } else { -throw new TableException("Event-time temporal join operator is not implemented yet.") - } + new TemporalRowTimeJoinOperator( +InternalTypeInfo.of(leftInputType), +InternalTypeInfo.of(rightInputType), +InternalTypeInfo.of(returnType).createSerializer(exeConfig), Review comment: Use `InternalSerializers.create(returnType)` instead, then we don't need to pass in the `exeConfig`. ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java ## @@ -212,22 +221,32 @@ public void close() throws Exception { * @return a row time of the oldest unprocessed probe record or Long.MaxValue, if all records * have been processed. 
*/ - private long emitResultAndCleanUpState(long timerTimestamp) throws Exception { + private long emitResultAndCleanUpState(long currentWatermark) throws Exception { List rightRowsSorted = getRightRowSorted(rightRowtimeComparator); long lastUnprocessedTime = Long.MAX_VALUE; Iterator> leftIterator = leftState.entries().iterator(); + // keep the the output records' order same with left input records order + Map orderedOutputs = new TreeMap<>(); Review comment: We can store left row data to avoid copy joined result. ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala ## @@ -19,50 +19,100 @@ package org.apache.flink.table.planner.runtime.stream.sql import org.apache.flink.table.api.TableException -import org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow -import org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData -import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode +import org.apache.flink.table.planner.factories.TestValuesTableFactory +import org.apache.flink.table.planner.factories.TestValuesTableFactory.{getRawResults, registerData} import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase +import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode +import org.apache.flink.types.Row import org.junit._ +import org.junit.Assert.assertEquals import org.junit.runner.RunWith import org.junit.runners.Parameterized -import java.lang.{Long => JLong} +import java.time.LocalDateTime +import java.time.format.DateTimeParseException + +import scala.collection.JavaConversions._ @RunWith(classOf[Parameterized]) class TemporalJoinITCase(state: StateBackendMode) extends StreamingWithStateTestBase(state) { // test data for Processing-Time temporal table join val procTimeOrderData = List( -changelogRow("+I", toJLong(1), "Euro", "no1", 
toJLong(12)), -changelogRow("+I", toJLong(2), "US Dollar", "no1", toJLong(14)), -changelogRow("+I", toJLong(3), "US Dollar", "no2", toJLong(18)), -changelogRow("+I", toJLong(4), "RMB", "no1", toJLong(40))) +changelogRow("+I", 1L, "Euro", "no1", 12L), +changelogRow("+I", 2L, "US Dollar", "no1", 14L), +changelogRow("+I", 3L, "US Dollar", "no2", 18L), +changelogRow("+I", 4L, "RMB", "no1", 40L)) val procTimeCurrencyData = List( -changelogRow("+I","Euro", "no1", toJLong(114)), -changelogRow("+I","US Dollar", "no1", toJLong(102)), -changelogRow("+I","Yen", "no1", toJLong(1)), -changelogRow("+I","RMB", "no1", toJLong(702)), -changelogRow("+I","Euro", "no1", toJLong(118)), -changelogRow("+I","US Dollar", "no2", toJLong(106))) +changelogRow("+I", "Euro", "no1", 114L), +changelogRow("+I", "US Dollar", "no1", 102L), +changelogRow("+I", "Yen", "no1", 1L), +changelogRow("+I", "RMB", "no1", 702L), +changelogRow("+I", "Euro", "no1", 118L), +changelogRow("+I", "US Dollar", "no2",
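The ordering fix in the hunk above (buffering emitted rows in a `TreeMap` so output order matches the left input order) can be sketched independently of Flink. The `Long` keys and `String` payloads here are hypothetical stand-ins for the left records' arrival indices and the joined rows:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class OrderedEmitSketch {

    // Buffer results keyed by the left record's position, then emit in ascending key
    // order, so the output order matches the left input order even when the state
    // iterator yields entries out of order.
    static List<String> emitInLeftOrder(Map<Long, String> unorderedResults) {
        Map<Long, String> ordered = new TreeMap<>(unorderedResults); // TreeMap sorts by key
        return new ArrayList<>(ordered.values());
    }
}
```

Iterating a `TreeMap` visits entries in ascending key order, which is what gives the deterministic emission order; a `HashMap` would offer no such guarantee.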
[GitHub] [flink] flinkbot commented on pull request #13832: [FLINK-19822][table-planner] Remove redundant shuffle for streaming
flinkbot commented on pull request #13832: URL: https://github.com/apache/flink/pull/13832#issuecomment-718329744 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit d32b02c2668093ff2745c4d124b11807f0b3f11c (Thu Oct 29 03:05:22 UTC 2020) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] wangyang0918 commented on a change in pull request #13644: [FLINK-19542][k8s]Implement LeaderElectionService and LeaderRetrievalService based on Kubernetes API
wangyang0918 commented on a change in pull request #13644: URL: https://github.com/apache/flink/pull/13644#discussion_r513904491 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriver.java ## @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.leaderelection; + +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.api.UnhandledErrorListener; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCache; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCacheListener; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatchListener; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionState; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.CreateMode; +import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException; +import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.data.Stat; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.GuardedBy; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link LeaderElectionDriver} implementation for Zookeeper. The leading JobManager is elected using + * ZooKeeper. The current leader's address as well as its leader session ID is published via + * ZooKeeper. 
+ */ +public class ZooKeeperLeaderElectionDriver implements LeaderElectionDriver, LeaderLatchListener, NodeCacheListener, UnhandledErrorListener { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderElectionDriver.class); + + private final Object lock = new Object(); + + /** Client to the ZooKeeper quorum. */ + private final CuratorFramework client; + + /** Curator recipe for leader election. */ + private final LeaderLatch leaderLatch; + + /** Curator recipe to watch a given ZooKeeper node for changes. */ + private final NodeCache cache; + + /** ZooKeeper path of the node which stores the current leader information. */ + private final String leaderPath; + + private final ConnectionStateListener listener = (client, newState) -> handleStateChange(newState); + + private final LeaderElectionEventHandler leaderElectionEventHandler; + + private final FatalErrorHandler fatalErrorHandler; + + private final String leaderContenderDescription; + + @GuardedBy("lock") + private volatile boolean running; + + /** +* Creates a ZooKeeperLeaderElectionDriver object. +* +* @param client Client which is connected to the ZooKeeper quorum +* @param latchPath ZooKeeper node path for the leader election latch +* @param leaderPath ZooKeeper node path for the node which stores the current leader information +* @param leaderElectionEventHandler Event handler for processing leader change events +* @param fatalErrorHandler Fatal error handler +* @param leaderContenderDescription Leader contender description +*/ + public ZooKeeperLeaderElectionDriver( + CuratorFramework client, + String latchPath, + String leaderPath, + LeaderElectionEventHandler leaderElectionEventHandler, + FatalErrorHandler fatalErrorHandler, +
[jira] [Updated] (FLINK-19822) Remove redundant shuffle for streaming
[ https://issues.apache.org/jira/browse/FLINK-19822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-19822: --- Labels: pull-request-available (was: ) > Remove redundant shuffle for streaming > -- > > Key: FLINK-19822 > URL: https://issues.apache.org/jira/browse/FLINK-19822 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner > Reporter: godfrey he > Assignee: godfrey he > Priority: Major > Labels: pull-request-available > Fix For: 1.12.0 > > > This is similar to [FLINK-12575|https://issues.apache.org/jira/browse/FLINK-12575]: we could implement the {{satisfyTraits}} method for stream nodes to remove redundant shuffles. This opens up more possibilities for operators to be merged into a multiple-input operator. > Unlike batch, stream operators require that the shuffle keys and the state keys be exactly the same, otherwise the state may be incorrect. > We only support a few operators in this issue, such as Join and regular Aggregate. Other operators will be supported in the future. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] godfreyhe opened a new pull request #13832: [FLINK-19822][table-planner] Remove redundant shuffle for streaming
godfreyhe opened a new pull request #13832: URL: https://github.com/apache/flink/pull/13832 ## What is the purpose of the change *Shuffle is one of the heaviest operations, and many shuffles are redundant because the input operators already provide the required distribution. The Blink batch planner already removes redundant shuffles through planner rules, and the Blink streaming planner can leverage a similar approach. Unlike batch, stream operators require that the shuffle keys and the state keys be exactly the same, otherwise the state may be incorrect. In this PR, we only support Join and regular Aggregate; other operators will be supported in the future.* ## Brief change log - *Do some code cleanup for BatchExecCalcBase and BatchExecCorrelateBase* - *Introduce join condition equal transfer rules to simplify join conditions* - *Remove redundant shuffle for StreamExecJoin and StreamExecGroupAggregate* ## Verifying this change This change added tests and can be verified as follows: - *The existing tests cover the first two commits, e.g. batch.sql.RemoveShuffleTest, stream.sql.join.JoinReorderTest* - *Added new tests that validate the result of the third commit, e.g. stream.sql.RemoveShuffleTest, RemoveShuffleITCase* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (**yes** / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented?
(not applicable / docs / JavaDocs / **not documented**) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
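[Editor's illustration] The PR above states that, unlike batch, a streaming shuffle can only be removed when the required shuffle keys and the provided (state) keys are exactly the same. A minimal sketch of that rule, with hypothetical names (not the planner's actual API):

```java
import java.util.Arrays;

// Hypothetical sketch of the "keys must be exactly the same" rule described
// in the PR: in streaming, an input's existing hash distribution satisfies a
// required one only if the key fields match exactly, because the downstream
// operator's state is keyed on exactly those fields.
public class ShuffleKeys {

    // In batch, weaker relations (e.g. subset of keys) can sometimes be
    // accepted; in streaming, only exact equality makes the shuffle removable.
    static boolean satisfiesRequiredDistribution(int[] providedKeys, int[] requiredKeys) {
        return Arrays.equals(providedKeys, requiredKeys);
    }

    public static void main(String[] args) {
        // Same keys: the redundant shuffle can be removed.
        System.out.println(satisfiesRequiredDistribution(new int[]{0}, new int[]{0}));
        // Provided keys differ from required keys: must keep the shuffle.
        System.out.println(satisfiesRequiredDistribution(new int[]{0, 1}, new int[]{0}));
    }
}
```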
[GitHub] [flink] becketqin commented on a change in pull request #13784: [FLINK-19698][connectors/common] API improvements to the Sources.
becketqin commented on a change in pull request #13784: URL: https://github.com/apache/flink/pull/13784#discussion_r513902647 ## File path: flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java ## @@ -565,6 +565,20 @@ else if (t instanceof Error) { return Optional.empty(); } + /** +* Find the root cause of the given throwable chain. +* +* @param throwable the throwable chain to check. +* @return the root cause of the throwable chain. +*/ + public static Throwable findRootCause(Throwable throwable) { Review comment: Good point. I'll just reuse Apache Commons Lang. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
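[Editor's illustration] The `findRootCause` helper discussed above (which the reviewer suggests replacing with Apache Commons Lang's `ExceptionUtils.getRootCause`) amounts to walking the cause chain to its end. A self-contained sketch of that walk, as a hypothetical standalone class rather than Flink's actual code:

```java
// Sketch of a root-cause walk, equivalent in spirit to the findRootCause
// method in the diff above. RootCause is a hypothetical helper class.
public class RootCause {

    static Throwable findRootCause(Throwable throwable) {
        Throwable current = throwable;
        // Follow getCause() until there is no further cause; the self-reference
        // check guards against a throwable that is its own cause.
        while (current != null
                && current.getCause() != null
                && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        Throwable root = new IllegalStateException("root");
        Throwable wrapped = new RuntimeException("wrapper", root);
        System.out.println(findRootCause(wrapped) == root); // prints true
    }
}
```

Commons Lang's `ExceptionUtils.getRootCause` adds cycle detection over the whole chain, which is one reason reusing it beats hand-rolling the loop.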
[jira] [Updated] (FLINK-19822) Remove redundant shuffle for streaming
[ https://issues.apache.org/jira/browse/FLINK-19822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] godfrey he updated FLINK-19822: --- Description: This is similar to [FLINK-12575|https://issues.apache.org/jira/browse/FLINK-12575]: we could implement the {{satisfyTraits}} method for stream nodes to remove redundant shuffles. This opens up more possibilities for operators to be merged into a multiple-input operator. Unlike batch, stream operators require that the shuffle keys and the state keys be exactly the same, otherwise the state may be incorrect. We only support a few operators in this issue, such as Join and regular Aggregate. Other operators will be supported in the future. was: This is similar to [FLINK-12575|https://issues.apache.org/jira/browse/FLINK-12575]: we could implement the {{satisfyTraits}} method for stream nodes to remove redundant shuffles. This opens up more possibilities for operators to be merged into a multiple-input operator. Note: to ensure the key range is correct, the shuffle can be removed only if the required keys exactly equal the provided keys > Remove redundant shuffle for streaming > -- > > Key: FLINK-19822 > URL: https://issues.apache.org/jira/browse/FLINK-19822 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner > Reporter: godfrey he > Assignee: godfrey he > Priority: Major > Fix For: 1.12.0 > > > This is similar to [FLINK-12575|https://issues.apache.org/jira/browse/FLINK-12575]: we could implement the {{satisfyTraits}} method for stream nodes to remove redundant shuffles. This opens up more possibilities for operators to be merged into a multiple-input operator. > Unlike batch, stream operators require that the shuffle keys and the state keys be exactly the same, otherwise the state may be incorrect. > We only support a few operators in this issue, such as Join and regular Aggregate. Other operators will be supported in the future. 
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot commented on pull request #13831: [FLINK-19624][fix] Fix special case when the reuse of exchange causes the deadlock
flinkbot commented on pull request #13831: URL: https://github.com/apache/flink/pull/13831#issuecomment-718326098 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit f7a377f9a04c5d630cb578ebc9040e936e6a60a4 (Thu Oct 29 02:52:32 UTC 2020) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] TsReaper opened a new pull request #13831: [FLINK-19624][fix] Fix special case when the reuse of exchange causes the deadlock
TsReaper opened a new pull request #13831: URL: https://github.com/apache/flink/pull/13831 ## What is the purpose of the change This PR fixes a special case where the reuse of an exchange causes a deadlock. Currently the reuse of an exchange is not considered a deadlock risk because, although the exec node of an exchange is reused, its underlying transformation is not. However, if this behavior changes, a deadlock may occur. ## Brief change log - Fix the special case where the reuse of an exchange causes a deadlock ## Verifying this change This change is already covered by existing tests; it also adds tests and can be verified by running the added cases. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
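[Editor's illustration] The hazard the PR guards against is a reused exchange feeding both sides of a hash join: the build side must finish before the probe side starts, so if both sides shared one underlying transformation, neither could make progress. A hypothetical sketch of detecting that pattern (not the planner's actual code):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the deadlock condition: a hash join whose
// build input and probe input both reach the same reused exchange node.
public class ReusedExchangeCheck {

    static class Node {
        final String name;
        final List<Node> inputs = new ArrayList<>();
        Node(String name) { this.name = name; }
    }

    // True when target is reachable from node by walking its inputs.
    static boolean reaches(Node node, Node target) {
        if (node == target) return true;
        for (Node input : node.inputs) {
            if (reaches(input, target)) return true;
        }
        return false;
    }

    // A two-input join risks deadlock if one exchange feeds both inputs.
    static boolean mayDeadlock(Node join, Node reusedExchange) {
        return join.inputs.size() == 2
                && reaches(join.inputs.get(0), reusedExchange)
                && reaches(join.inputs.get(1), reusedExchange);
    }

    public static void main(String[] args) {
        // Mirrors the plan in the issue: HashJoin over Exchange(reuse_id=1)
        // on the build side and Reused(reference_id=1) on the probe side.
        Node exchange = new Node("Exchange(hash[a], reuse_id=[1])");
        Node reused = new Node("Reused(reference_id=[1])");
        reused.inputs.add(exchange);
        Node join = new Node("HashJoin");
        join.inputs.add(exchange); // build side
        join.inputs.add(reused);   // probe side
        System.out.println(mayDeadlock(join, exchange)); // prints true
    }
}
```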
[GitHub] [flink] flinkbot commented on pull request #13830: [FLINK-14482][rocksdb] Bump FRocksDB version to base on RocksDB-6.11.6
flinkbot commented on pull request #13830: URL: https://github.com/apache/flink/pull/13830#issuecomment-718324551 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit e4cd22945ab2b3e7cb44c302e9db12231f52fdbc (Thu Oct 29 02:47:03 UTC 2020) **Warnings:** * **1 pom.xml file was touched**: Check for build and licensing issues. * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-14482) Bump up rocksdb version
[ https://issues.apache.org/jira/browse/FLINK-14482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-14482: --- Labels: pull-request-available (was: ) > Bump up rocksdb version > --- > > Key: FLINK-14482 > URL: https://issues.apache.org/jira/browse/FLINK-14482 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Labels: pull-request-available > Fix For: 1.12.0 > > > Current rocksDB-5.17.2 does not support write buffer manager well, we need to > bump rocksdb version to support that feature. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] Myasuka opened a new pull request #13830: [FLINK-14482][rocksdb] Bump FRocksDB version to base on RocksDB-6.11.6
Myasuka opened a new pull request #13830: URL: https://github.com/apache/flink/pull/13830 ## What is the purpose of the change Bump FRocksDB version to base on RocksDB-6.11.6 ## Brief change log - Bump FRocksDB version to base on RocksDB-6.11.6 ## Verifying this change This change is already covered by existing tests, such as *(please describe tests)*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] JingsongLi commented on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…
JingsongLi commented on pull request #13789: URL: https://github.com/apache/flink/pull/13789#issuecomment-718319057 Thanks for the update, I will take a look these two days. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] JingsongLi merged pull request #13620: [hotfix][doc] the first FSDataOutputStream should be FSDataInputStream in FileSystem#L178
JingsongLi merged pull request #13620: URL: https://github.com/apache/flink/pull/13620 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-19861) Improve the document of now() function
[ https://issues.apache.org/jira/browse/FLINK-19861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-19861: Component/s: Table SQL / API > Improve the document of now() function > -- > > Key: FLINK-19861 > URL: https://issues.apache.org/jira/browse/FLINK-19861 > Project: Flink > Issue Type: Improvement > Components: Documentation, Table SQL / API > Affects Versions: 1.11.0 > Reporter: hailong wang > Priority: Minor > Fix For: 1.12.0 > > > The `NOW()` function is deterministic in the SQL standard, but not deterministic in Flink SQL. > We should improve the description of `NOW()` in /dev/table/functions/systemFunctions.md. > See the email thread: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-understand-NOW-in-SQL-when-using-Table-amp-SQL-API-to-develop-a-streaming-app-tc38881.html#none -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19861) Improve the document of now() function
[ https://issues.apache.org/jira/browse/FLINK-19861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-19861: Description: The `NOW()` function is deterministic in the SQL standard, but not deterministic in Flink SQL. We should improve the description of `NOW()` in /dev/table/functions/systemFunctions.md. See the email thread: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-understand-NOW-in-SQL-when-using-Table-amp-SQL-API-to-develop-a-streaming-app-tc38881.html#none was: The `NOW()` function is deterministic in batch mode, but not deterministic in streaming. We should improve the description of `NOW()` in /dev/table/functions/systemFunctions.md. See the email thread: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-understand-NOW-in-SQL-when-using-Table-amp-SQL-API-to-develop-a-streaming-app-tc38881.html#none > Improve the document of now() function > -- > > Key: FLINK-19861 > URL: https://issues.apache.org/jira/browse/FLINK-19861 > Project: Flink > Issue Type: Improvement > Components: Documentation > Affects Versions: 1.11.0 > Reporter: hailong wang > Priority: Minor > Fix For: 1.12.0 > > > The `NOW()` function is deterministic in the SQL standard, but not deterministic in Flink SQL. > We should improve the description of `NOW()` in /dev/table/functions/systemFunctions.md. > See the email thread: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-understand-NOW-in-SQL-when-using-Table-amp-SQL-API-to-develop-a-streaming-app-tc38881.html#none -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on pull request #13307: [FLINK-19078][table-runtime] Import rowtime join temporal operator
flinkbot edited a comment on pull request #13307: URL: https://github.com/apache/flink/pull/13307#issuecomment-685763264 ## CI report: * f8a62099663b658669488af7f0ecd5087d1e5da6 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8525) * 9f912a17d777463b856a19b64a03c16adadb8a73 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8546) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] vthinkxie commented on a change in pull request #13458: FLINK-18851 [runtime] Add checkpoint type to checkpoint history entries in Web UI
vthinkxie commented on a change in pull request #13458: URL: https://github.com/apache/flink/pull/13458#discussion_r513882504 ## File path: flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.ts ## @@ -55,21 +59,38 @@ export class JobCheckpointsDetailComponent implements OnInit { } refresh() { -this.isLoading = true; -if (this.jobDetail && this.jobDetail.jid) { - this.jobService.loadCheckpointDetails(this.jobDetail.jid, this.checkPoint.id).subscribe( -data => { - this.checkPointDetail = data; - this.isLoading = false; - this.cdr.markForCheck(); -}, -() => { - this.isLoading = false; - this.cdr.markForCheck(); -} - ); + this.isLoading = true; + if (this.jobDetail && this.jobDetail.jid) { +forkJoin([ + this.jobService.loadCheckpointConfig(this.jobDetail.jid), + this.jobService.loadCheckpointDetails(this.jobDetail.jid, this.checkPoint.id) +]).subscribe( + ([config, detail]) => { +this.checkPointConfig = config; +this.checkPointDetail = detail; +if (this.checkPointDetail.checkpoint_type === 'CHECKPOINT') { + if (this.checkPointConfig.unaligned_checkpoints) { +this.checkPointType = 'unaligned checkpoint'; + } else { +this.checkPointType = 'aligned checkpoint'; + } +} else if (this.checkPointDetail.checkpoint_type === 'SYNC_SAVEPOINT') { + this.checkPointType = 'savepoint on cancel'; +} else if (this.checkPointDetail.checkpoint_type === 'SAVEPOINT') { + this.checkPointType = 'savepoint'; +} else { + this.checkPointType = '-'; +} +this.isLoading = false; +this.cdr.markForCheck(); + }, + () => { +this.isLoading = false; +this.cdr.markForCheck(); + } +); + } } Review comment: Hi, I have submitted a commit here https://github.com/vthinkxie/flink/commit/0a513eb9ac724b2a15cf4c95acdf22e5a9456d10 — could you have a look and apply this patch? There are still some format errors. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
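[Editor's illustration] The TypeScript diff above maps the REST API's `checkpoint_type` field, combined with the `unaligned_checkpoints` config flag, to a display label. That branch structure can be sketched as a pure function (hypothetical Java re-expression, not the Web UI code):

```java
// Maps a checkpoint_type string (plus the unaligned-checkpoints flag) to the
// label shown in the Web UI, mirroring the if/else chain in the TypeScript
// review comment above. CheckpointTypeLabel is a hypothetical helper class.
public class CheckpointTypeLabel {

    static String label(String checkpointType, boolean unalignedCheckpoints) {
        switch (checkpointType) {
            case "CHECKPOINT":
                return unalignedCheckpoints ? "unaligned checkpoint" : "aligned checkpoint";
            case "SYNC_SAVEPOINT":
                return "savepoint on cancel";
            case "SAVEPOINT":
                return "savepoint";
            default:
                return "-";
        }
    }

    public static void main(String[] args) {
        System.out.println(label("CHECKPOINT", true));  // prints: unaligned checkpoint
        System.out.println(label("SAVEPOINT", false));  // prints: savepoint
    }
}
```

Extracting the mapping into a pure function like this also makes it unit-testable independently of the `forkJoin` subscription logic.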
[GitHub] [flink] vthinkxie commented on a change in pull request #13458: FLINK-18851 [runtime] Add checkpoint type to checkpoint history entries in Web UI
vthinkxie commented on a change in pull request #13458: URL: https://github.com/apache/flink/pull/13458#discussion_r513882504 ## File path: flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.ts ## @@ -55,21 +59,38 @@ export class JobCheckpointsDetailComponent implements OnInit { } refresh() { -this.isLoading = true; -if (this.jobDetail && this.jobDetail.jid) { - this.jobService.loadCheckpointDetails(this.jobDetail.jid, this.checkPoint.id).subscribe( -data => { - this.checkPointDetail = data; - this.isLoading = false; - this.cdr.markForCheck(); -}, -() => { - this.isLoading = false; - this.cdr.markForCheck(); -} - ); + this.isLoading = true; + if (this.jobDetail && this.jobDetail.jid) { +forkJoin([ + this.jobService.loadCheckpointConfig(this.jobDetail.jid), + this.jobService.loadCheckpointDetails(this.jobDetail.jid, this.checkPoint.id) +]).subscribe( + ([config, detail]) => { +this.checkPointConfig = config; +this.checkPointDetail = detail; +if (this.checkPointDetail.checkpoint_type === 'CHECKPOINT') { + if (this.checkPointConfig.unaligned_checkpoints) { +this.checkPointType = 'unaligned checkpoint'; + } else { +this.checkPointType = 'aligned checkpoint'; + } +} else if (this.checkPointDetail.checkpoint_type === 'SYNC_SAVEPOINT') { + this.checkPointType = 'savepoint on cancel'; +} else if (this.checkPointDetail.checkpoint_type === 'SAVEPOINT') { + this.checkPointType = 'savepoint'; +} else { + this.checkPointType = '-'; +} +this.isLoading = false; +this.cdr.markForCheck(); + }, + () => { +this.isLoading = false; +this.cdr.markForCheck(); + } +); + } } Review comment: Hi, I have a submit a commit here https://github.com/vthinkxie/flink/commit/0a513eb9ac724b2a15cf4c95acdf22e5a9456d10 could you have a look and apply this patch, there is still some format error This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
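The checkpoint-type branching in the patch above can be distilled into a pure helper; a minimal sketch (the `checkpointLabel` function is hypothetical, not part of the patch, but the labels and the `checkpoint_type` / `unaligned_checkpoints` fields are taken from it):

```typescript
// Hypothetical helper mirroring the if/else chain in the patch: derives the
// display label from the REST checkpoint_type field plus the
// unaligned_checkpoints flag from the checkpoint config.
function checkpointLabel(checkpointType: string, unalignedCheckpoints: boolean): string {
  switch (checkpointType) {
    case 'CHECKPOINT':
      // Aligned vs. unaligned is decided by the config, not the detail payload.
      return unalignedCheckpoints ? 'unaligned checkpoint' : 'aligned checkpoint';
    case 'SYNC_SAVEPOINT':
      return 'savepoint on cancel';
    case 'SAVEPOINT':
      return 'savepoint';
    default:
      return '-';
  }
}
```

Keeping the branching out of the `subscribe` callback would also make it testable without the Angular component or the `forkJoin` wiring.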
[jira] [Updated] (FLINK-19770) PythonProgramOptionsTest requires package phase before execution
[ https://issues.apache.org/jira/browse/FLINK-19770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu updated FLINK-19770: Affects Version/s: (was: 1.11.2) 1.12.0 > PythonProgramOptionsTest requires package phase before execution > > > Key: FLINK-19770 > URL: https://issues.apache.org/jira/browse/FLINK-19770 > Project: Flink > Issue Type: Bug > Components: API / Python, Tests >Affects Versions: 1.12.0 >Reporter: Juha Mynttinen >Assignee: Shuiqiang Chen >Priority: Minor > Labels: pull-request-available, starter > Fix For: 1.12.0 > > > The PR [https://github.com/apache/flink/pull/13322] lately added the test > method testConfigurePythonExecution in > org.apache.flink.client.cli.PythonProgramOptionsTest. > > "mvn clean verify" fails for me in testConfigurePythonExecution: > > ... > INFO] Running org.apache.flink.client.cli.PythonProgramOptionsTest > [ERROR] Tests run: 3, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.433 > s <<< FAILURE! - in org.apache.flink.client.cli.PythonProgramOptionsTest > [ERROR] > testConfigurePythonExecution(org.apache.flink.client.cli.PythonProgramOptionsTest) > Time elapsed: 0.019 s <<< ERROR! 
> java.nio.file.NoSuchFileException: target/dummy-job-jar > at > java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92) > at > java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111) > at > java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116) > at > java.base/sun.nio.fs.UnixFileAttributeViews$Basic.readAttributes(UnixFileAttributeViews.java:55) > at > java.base/sun.nio.fs.UnixFileSystemProvider.readAttributes(UnixFileSystemProvider.java:149) > at > java.base/sun.nio.fs.LinuxFileSystemProvider.readAttributes(LinuxFileSystemProvider.java:99) > at java.base/java.nio.file.Files.readAttributes(Files.java:1763) > at > java.base/java.nio.file.FileTreeWalker.getAttributes(FileTreeWalker.java:219) > at java.base/java.nio.file.FileTreeWalker.visit(FileTreeWalker.java:276) > at java.base/java.nio.file.FileTreeWalker.walk(FileTreeWalker.java:322) > at java.base/java.nio.file.Files.walkFileTree(Files.java:2716) > at java.base/java.nio.file.Files.walkFileTree(Files.java:2796) > at > org.apache.flink.client.cli.PythonProgramOptionsTest.testConfigurePythonExecution(PythonProgramOptionsTest.java:131) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at 
org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runners.Suite.runChild(Suite.java:128) > at org.junit.runners.Suite.runChild(Suite.java:27) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.apache.maven.surefire.junitcore.JUnitCore.run(JUnitCore.java:55) > at > org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:137) > at > org.apache.maven.surefire.junitcore.JUnitCoreWrapper.executeEager(JUnitCoreWrapper.java:107) > at > org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:83) >
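The trace points at the root cause named in the issue title: the test walks `target/dummy-job-jar`, a fixture that only exists after the Maven `package` phase, so `Files.walkFileTree` hits `NoSuchFileException` on a fresh `mvn clean`. A sketch of the same failure mode in Node terms (the directory name is illustrative):

```typescript
import * as fs from 'node:fs';

// Like Files.walkFileTree in the trace above, readdirSync requires the path
// to exist up front; a missing build artifact surfaces as an error rather
// than an empty listing.
function walkOrReport(dir: string): string {
  try {
    fs.readdirSync(dir);
    return 'ok';
  } catch (e) {
    // ENOENT is Node's analogue of java.nio.file.NoSuchFileException.
    return (e as NodeJS.ErrnoException).code ?? 'unknown';
  }
}
```

The fix eventually merged for this ticket (PR #13756) moves the test to an ITCase, where the packaged artifact is guaranteed to exist, rather than guarding the walk.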
[jira] [Closed] (FLINK-19770) PythonProgramOptionsTest requires package phase before execution
[ https://issues.apache.org/jira/browse/FLINK-19770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu closed FLINK-19770. --- Fix Version/s: (was: 1.11.3) Resolution: Fixed Merged to master via 789d8b65fde1c86bca2fa1ec43b55686d1584c97 > PythonProgramOptionsTest requires package phase before execution > Key: FLINK-19770 > URL: https://issues.apache.org/jira/browse/FLINK-19770 > Project: Flink > Issue Type: Bug > Components: API / Python, Tests > Affects Versions: 1.11.2 > Reporter: Juha Mynttinen > Assignee: Shuiqiang Chen > Priority: Minor > Labels: pull-request-available, starter > Fix For: 1.12.0
[jira] [Assigned] (FLINK-19770) PythonProgramOptionsTest requires package phase before execution
[ https://issues.apache.org/jira/browse/FLINK-19770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu reassigned FLINK-19770: --- Assignee: Shuiqiang Chen > PythonProgramOptionsTest requires package phase before execution > Key: FLINK-19770 > URL: https://issues.apache.org/jira/browse/FLINK-19770 > Project: Flink > Issue Type: Bug > Components: API / Python, Tests > Affects Versions: 1.11.2 > Reporter: Juha Mynttinen > Assignee: Shuiqiang Chen > Priority: Minor > Labels: pull-request-available, starter > Fix For: 1.12.0, 1.11.3
[GitHub] [flink] dianfu closed pull request #13756: [FLINK-19770][python][test] Changed the PythonProgramOptionTest to be an ITCase.
dianfu closed pull request #13756: URL: https://github.com/apache/flink/pull/13756
[GitHub] [flink] flinkbot edited a comment on pull request #13822: [FLINK-19856][network] Emit EndOfChannelRecoveryEvent
flinkbot edited a comment on pull request #13822: URL: https://github.com/apache/flink/pull/13822#issuecomment-717894411 ## CI report: * c121f8a279bb69448c326dab7cee19fd5bb2096c Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8544) * 6e23ce525a7af28103e7abd8547b2ba6c9850567 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8545) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on pull request #13307: [FLINK-19078][table-runtime] Import rowtime join temporal operator
flinkbot edited a comment on pull request #13307: URL: https://github.com/apache/flink/pull/13307#issuecomment-685763264 ## CI report: * f8a62099663b658669488af7f0ecd5087d1e5da6 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8525) * 9f912a17d777463b856a19b64a03c16adadb8a73 UNKNOWN
[jira] [Updated] (FLINK-19865) YARN tests failed with "java.lang.NumberFormatException: For input string: "${env:MAX_LOG_FILE_NUMBER}" java.lang.NumberFormatException: For input string: "${env:MAX_LOG
[ https://issues.apache.org/jira/browse/FLINK-19865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu updated FLINK-19865: Labels: test-stability (was: ) > YARN tests failed with "java.lang.NumberFormatException: For input string: > "${env:MAX_LOG_FILE_NUMBER}" java.lang.NumberFormatException: For input > string: "${env:MAX_LOG_FILE_NUMBER}"" > > > Key: FLINK-19865 > URL: https://issues.apache.org/jira/browse/FLINK-19865 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN >Affects Versions: 1.12.0 >Reporter: Dian Fu >Priority: Major > Labels: test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8541=logs=245e1f2e-ba5b-5570-d689-25ae21e5302f=e7f339b2-a7c3-57d9-00af-3712d4b15354 > {code} > 2020-10-28T22:58:39.4927767Z 2020-10-28 22:57:33,866 main ERROR Could not > create plugin of type class > org.apache.logging.log4j.core.appender.rolling.DefaultRolloverStrategy for > element DefaultRolloverStrategy: java.lang.NumberFormatException: For input > string: "${env:MAX_LOG_FILE_NUMBER}" java.lang.NumberFormatException: For > input string: "${env:MAX_LOG_FILE_NUMBER}" > 2020-10-28T22:58:39.4929252Z at > java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) > 2020-10-28T22:58:39.4929823Z at java.lang.Integer.parseInt(Integer.java:569) > 2020-10-28T22:58:39.4930327Z at java.lang.Integer.parseInt(Integer.java:615) > 2020-10-28T22:58:39.4931047Z at > org.apache.logging.log4j.core.appender.rolling.DefaultRolloverStrategy$Builder.build(DefaultRolloverStrategy.java:137) > 2020-10-28T22:58:39.4931866Z at > org.apache.logging.log4j.core.appender.rolling.DefaultRolloverStrategy$Builder.build(DefaultRolloverStrategy.java:90) > 2020-10-28T22:58:39.4932720Z at > org.apache.logging.log4j.core.config.plugins.util.PluginBuilder.build(PluginBuilder.java:122) > 2020-10-28T22:58:39.4933446Z at > 
org.apache.logging.log4j.core.config.AbstractConfiguration.createPluginObject(AbstractConfiguration.java:1002) > 2020-10-28T22:58:39.4934275Z at > org.apache.logging.log4j.core.config.AbstractConfiguration.createConfiguration(AbstractConfiguration.java:942) > 2020-10-28T22:58:39.4935029Z at > org.apache.logging.log4j.core.config.AbstractConfiguration.createConfiguration(AbstractConfiguration.java:934) > 2020-10-28T22:58:39.4935837Z at > org.apache.logging.log4j.core.config.AbstractConfiguration.createConfiguration(AbstractConfiguration.java:934) > 2020-10-28T22:58:39.4936605Z at > org.apache.logging.log4j.core.config.AbstractConfiguration.doConfigure(AbstractConfiguration.java:552) > 2020-10-28T22:58:39.4937573Z at > org.apache.logging.log4j.core.config.AbstractConfiguration.initialize(AbstractConfiguration.java:241) > 2020-10-28T22:58:39.4938429Z at > org.apache.logging.log4j.core.config.AbstractConfiguration.start(AbstractConfiguration.java:288) > 2020-10-28T22:58:39.4939206Z at > org.apache.logging.log4j.core.LoggerContext.setConfiguration(LoggerContext.java:579) > 2020-10-28T22:58:39.4939885Z at > org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:651) > 2020-10-28T22:58:39.4940490Z at > org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:668) > 2020-10-28T22:58:39.4941087Z at > org.apache.logging.log4j.core.LoggerContext.start(LoggerContext.java:253) > 2020-10-28T22:58:39.4941733Z at > org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:153) > 2020-10-28T22:58:39.4942534Z at > org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:45) > 2020-10-28T22:58:39.4943154Z at > org.apache.logging.log4j.LogManager.getContext(LogManager.java:194) > 2020-10-28T22:58:39.4943820Z at > org.apache.logging.log4j.spi.AbstractLoggerAdapter.getContext(AbstractLoggerAdapter.java:138) > 2020-10-28T22:58:39.4944540Z at > 
org.apache.logging.slf4j.Log4jLoggerFactory.getContext(Log4jLoggerFactory.java:45) > 2020-10-28T22:58:39.4945199Z at > org.apache.logging.log4j.spi.AbstractLoggerAdapter.getLogger(AbstractLoggerAdapter.java:48) > 2020-10-28T22:58:39.4945858Z at > org.apache.logging.slf4j.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:30) > 2020-10-28T22:58:39.4946426Z at > org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:329) > 2020-10-28T22:58:39.4946965Z at > org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:349) > 2020-10-28T22:58:39.4947698Z at > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.(ClusterEntrypoint.java:108) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
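The error reduces to an unresolved property lookup reaching an integer parser: `DefaultRolloverStrategy` parses its max-file-number attribute with `Integer.parseInt`, and when the `${env:MAX_LOG_FILE_NUMBER}` lookup is not substituted, the literal placeholder string is what gets parsed. A sketch of that reduction (the `parseMaxFiles` helper is illustrative, not log4j API):

```typescript
// When a ${env:...} lookup is left unresolved, the placeholder itself reaches
// the numeric parser. Java's Integer.parseInt throws NumberFormatException;
// here Number() returning a non-integer is the closest analogue, which we
// turn into the same kind of error.
function parseMaxFiles(raw: string): number {
  const n = Number(raw);
  if (!Number.isInteger(n)) {
    // Mirrors: java.lang.NumberFormatException: For input string: "..."
    throw new Error(`For input string: "${raw}"`);
  }
  return n;
}
```

Ensuring MAX_LOG_FILE_NUMBER is exported before the JVM starts (or giving the lookup a default) makes the substituted value, not the placeholder, reach the parser.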
[jira] [Created] (FLINK-19865) YARN tests failed with "java.lang.NumberFormatException: For input string: "${env:MAX_LOG_FILE_NUMBER}" java.lang.NumberFormatException: For input string: "${env:MAX_LOG
Dian Fu created FLINK-19865: --- Summary: YARN tests failed with "java.lang.NumberFormatException: For input string: "${env:MAX_LOG_FILE_NUMBER}" java.lang.NumberFormatException: For input string: "${env:MAX_LOG_FILE_NUMBER}"" Key: FLINK-19865 URL: https://issues.apache.org/jira/browse/FLINK-19865 Project: Flink Issue Type: Bug Components: Deployment / YARN Affects Versions: 1.12.0 Reporter: Dian Fu https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8541=logs=245e1f2e-ba5b-5570-d689-25ae21e5302f=e7f339b2-a7c3-57d9-00af-3712d4b15354 {code} 2020-10-28T22:58:39.4927767Z 2020-10-28 22:57:33,866 main ERROR Could not create plugin of type class org.apache.logging.log4j.core.appender.rolling.DefaultRolloverStrategy for element DefaultRolloverStrategy: java.lang.NumberFormatException: For input string: "${env:MAX_LOG_FILE_NUMBER}" java.lang.NumberFormatException: For input string: "${env:MAX_LOG_FILE_NUMBER}" {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19843) ParquetFsStreamingSinkITCase.testPart failed with "Trying to access closed classloader"
[ https://issues.apache.org/jira/browse/FLINK-19843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17222652#comment-17222652 ] Dian Fu commented on FLINK-19843: - https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8541=logs=ba53eb01-1462-56a3-8e98-0dd97fbcaab5=bfbc6239-57a0-5db0-63f3-41551b4f7d51 > ParquetFsStreamingSinkITCase.testPart failed with "Trying to access closed > classloader" > --- > > Key: FLINK-19843 > URL: https://issues.apache.org/jira/browse/FLINK-19843 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.12.0 >Reporter: Dian Fu >Priority: Major > Labels: test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8431=logs=ba53eb01-1462-56a3-8e98-0dd97fbcaab5=bfbc6239-57a0-5db0-63f3-41551b4f7d51 > {code} > 2020-10-27T22:51:46.7422561Z [ERROR] > testPart(org.apache.flink.formats.parquet.ParquetFsStreamingSinkITCase) Time > elapsed: 7.031 s <<< ERROR! 
2020-10-27T22:51:46.7423062Z > java.lang.RuntimeException: Failed to fetch next result > 2020-10-27T22:51:46.7425294Z at > org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) > 2020-10-27T22:51:46.7426708Z at > org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:77) > 2020-10-27T22:51:46.7427791Z at > org.apache.flink.table.planner.sinks.SelectTableSinkBase$RowIteratorWrapper.hasNext(SelectTableSinkBase.java:115) > 2020-10-27T22:51:46.7428869Z at > org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:355) > 2020-10-27T22:51:46.7429957Z at > java.util.Iterator.forEachRemaining(Iterator.java:115) > 2020-10-27T22:51:46.7430652Z at > org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:114) > 2020-10-27T22:51:46.7431826Z at > org.apache.flink.table.planner.runtime.stream.FsStreamingSinkITCaseBase.check(FsStreamingSinkITCaseBase.scala:141) > 2020-10-27T22:51:46.7432859Z at > org.apache.flink.table.planner.runtime.stream.FsStreamingSinkITCaseBase.test(FsStreamingSinkITCaseBase.scala:122) > 2020-10-27T22:51:46.7433902Z at > org.apache.flink.table.planner.runtime.stream.FsStreamingSinkITCaseBase.testPart(FsStreamingSinkITCaseBase.scala:86) > 2020-10-27T22:51:46.7434702Z at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > 2020-10-27T22:51:46.7435452Z at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > 2020-10-27T22:51:46.7436661Z at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > 2020-10-27T22:51:46.7437367Z at > java.lang.reflect.Method.invoke(Method.java:498) 2020-10-27T22:51:46.7438119Z > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > 2020-10-27T22:51:46.7438966Z at > 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > 2020-10-27T22:51:46.7439789Z at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > 2020-10-27T22:51:46.7440666Z at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > 2020-10-27T22:51:46.7441740Z at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > 2020-10-27T22:51:46.7442533Z at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > 2020-10-27T22:51:46.7443290Z at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298) > 2020-10-27T22:51:46.7444227Z at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292) > 2020-10-27T22:51:46.7445043Z at > java.util.concurrent.FutureTask.run(FutureTask.java:266) > 2020-10-27T22:51:46.7445631Z at java.lang.Thread.run(Thread.java:748) > 2020-10-27T22:51:46.7446383Z Caused by: java.io.IOException: Failed to fetch > job execution result 2020-10-27T22:51:46.7447239Z at > org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175) > 2020-10-27T22:51:46.7448233Z at > org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:126) > 2020-10-27T22:51:46.7449239Z at > org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:103) > 2020-10-27T22:51:46.7449963Z ... 22 more 2020-10-27T22:51:46.7450619Z Caused > by: java.util.concurrent.ExecutionException: > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. >
[jira] [Created] (FLINK-19864) TwoInputStreamTaskTest.testWatermarkMetrics failed with "expected:<1> but was:<-9223372036854775808>"
Dian Fu created FLINK-19864: --- Summary: TwoInputStreamTaskTest.testWatermarkMetrics failed with "expected:<1> but was:<-9223372036854775808>" Key: FLINK-19864 URL: https://issues.apache.org/jira/browse/FLINK-19864 Project: Flink Issue Type: Bug Components: Runtime / Metrics Affects Versions: 1.12.0 Reporter: Dian Fu https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8541=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=7c61167f-30b3-5893-cc38-a9e3d057e392 {code} 2020-10-28T22:40:44.2528420Z [ERROR] testWatermarkMetrics(org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest) Time elapsed: 1.528 s <<< FAILURE! 2020-10-28T22:40:44.2529225Z java.lang.AssertionError: expected:<1> but was:<-9223372036854775808> 2020-10-28T22:40:44.2541228Z at org.junit.Assert.fail(Assert.java:88) 2020-10-28T22:40:44.2542157Z at org.junit.Assert.failNotEquals(Assert.java:834) 2020-10-28T22:40:44.2542954Z at org.junit.Assert.assertEquals(Assert.java:645) 2020-10-28T22:40:44.2543456Z at org.junit.Assert.assertEquals(Assert.java:631) 2020-10-28T22:40:44.2544002Z at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest.testWatermarkMetrics(TwoInputStreamTaskTest.java:540) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
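The `was:` value is not random: -9223372036854775808 is Long.MIN_VALUE (-2^63), the value a watermark metric reports before any watermark has been processed, so the assertion read the metric too early. A quick check of that sentinel (the helper name is illustrative); the magnitude exceeds JavaScript's safe integer range, so it is compared as a string:

```typescript
// Long.MIN_VALUE as printed by the failing assertion; too large in magnitude
// for a JS number, so keep it as a string.
const LONG_MIN_VALUE = '-9223372036854775808';

// A watermark metric still at Long.MIN_VALUE means no watermark arrived yet.
function isUninitializedWatermark(metricValue: string): boolean {
  return metricValue === LONG_MIN_VALUE;
}
```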
[jira] [Updated] (FLINK-19864) TwoInputStreamTaskTest.testWatermarkMetrics failed with "expected:<1> but was:<-9223372036854775808>"
[ https://issues.apache.org/jira/browse/FLINK-19864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dian Fu updated FLINK-19864:
Labels: test-stability (was: )

> TwoInputStreamTaskTest.testWatermarkMetrics failed with "expected:<1> but was:<-9223372036854775808>"
> Key: FLINK-19864
> URL: https://issues.apache.org/jira/browse/FLINK-19864
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Metrics
> Affects Versions: 1.12.0
> Reporter: Dian Fu
> Priority: Major
> Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8541=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=7c61167f-30b3-5893-cc38-a9e3d057e392
> {code}
> 2020-10-28T22:40:44.2528420Z [ERROR] testWatermarkMetrics(org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest) Time elapsed: 1.528 s <<< FAILURE!
> 2020-10-28T22:40:44.2529225Z java.lang.AssertionError: expected:<1> but was:<-9223372036854775808>
> 2020-10-28T22:40:44.2541228Z at org.junit.Assert.fail(Assert.java:88)
> 2020-10-28T22:40:44.2542157Z at org.junit.Assert.failNotEquals(Assert.java:834)
> 2020-10-28T22:40:44.2542954Z at org.junit.Assert.assertEquals(Assert.java:645)
> 2020-10-28T22:40:44.2543456Z at org.junit.Assert.assertEquals(Assert.java:631)
> 2020-10-28T22:40:44.2544002Z at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest.testWatermarkMetrics(TwoInputStreamTaskTest.java:540)
> {code}
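The unexpected value `-9223372036854775808` in the failing assertion above is `Long.MIN_VALUE`, the conventional "no watermark received yet" sentinel, suggesting the watermark metric was still at its initial value when the test read it. A minimal sketch confirming the numeric identity (illustrative only, not the test's actual code):

```java
public class WatermarkSentinel {
    public static void main(String[] args) {
        // Long.MIN_VALUE is the "no watermark yet" sentinel; it matches
        // the unexpected value reported by the failed assertion.
        long initial = Long.MIN_VALUE;
        if (initial != -9223372036854775808L) throw new AssertionError();
        System.out.println(initial); // -9223372036854775808
    }
}
```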
[jira] [Created] (FLINK-19863) SQLClientHBaseITCase.testHBase failed with "java.io.IOException: Process failed due to timeout"
Dian Fu created FLINK-19863:
---
Summary: SQLClientHBaseITCase.testHBase failed with "java.io.IOException: Process failed due to timeout"
Key: FLINK-19863
URL: https://issues.apache.org/jira/browse/FLINK-19863
Project: Flink
Issue Type: Bug
Components: Connectors / HBase
Affects Versions: 1.12.0
Reporter: Dian Fu

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8541=logs=91bf6583-3fb2-592f-e4d4-d79d79c3230a=3425d8ba-5f03-540a-c64b-51b8481bf7d6

{code}
00:50:02,589 [main] INFO org.apache.flink.tests.util.flink.FlinkDistribution [] - Stopping Flink cluster.
00:50:04,106 [main] INFO org.apache.flink.tests.util.flink.FlinkDistribution [] - Stopping Flink cluster.
00:50:04,741 [main] INFO org.apache.flink.tests.util.flink.LocalStandaloneFlinkResource [] - Backed up logs to /home/vsts/work/1/s/flink-end-to-end-tests/artifacts/flink-b3924665-1ac9-4309-8255-20f0dc94e7b9.
00:50:04,788 [main] INFO org.apache.flink.tests.util.hbase.LocalStandaloneHBaseResource [] - Stopping HBase Cluster
00:50:16,243 [main] ERROR org.apache.flink.tests.util.hbase.SQLClientHBaseITCase [] - Test testHBase[0: hbase-version:1.4.3](org.apache.flink.tests.util.hbase.SQLClientHBaseITCase) failed with:
java.io.IOException: Process failed due to timeout.
at org.apache.flink.tests.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:130)
at org.apache.flink.tests.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:108)
at org.apache.flink.tests.util.flink.FlinkDistribution.submitSQLJob(FlinkDistribution.java:221)
at org.apache.flink.tests.util.flink.LocalStandaloneFlinkResource$StandaloneClusterController.submitSQLJob(LocalStandaloneFlinkResource.java:196)
at org.apache.flink.tests.util.hbase.SQLClientHBaseITCase.executeSqlStatements(SQLClientHBaseITCase.java:215)
at org.apache.flink.tests.util.hbase.SQLClientHBaseITCase.testHBase(SQLClientHBaseITCase.java:152)
{code}
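The `IOException` above is thrown by a test helper that runs an external process under a deadline. A generic sketch of that run-with-timeout pattern (this is not Flink's actual `AutoClosableProcess` implementation; the command and timeout are illustrative):

```java
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class RunBlockingSketch {
    // Run a command and fail with IOException if it outlives the timeout.
    static void runBlocking(long timeout, TimeUnit unit, String... command)
            throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command).inheritIO().start();
        if (!process.waitFor(timeout, unit)) {
            process.destroyForcibly();
            throw new IOException("Process failed due to timeout.");
        }
    }

    public static void main(String[] args) throws Exception {
        try {
            // A 2-second command with a 1-second deadline times out.
            runBlocking(1, TimeUnit.SECONDS, "sleep", "2");
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```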
[jira] [Updated] (FLINK-19863) SQLClientHBaseITCase.testHBase failed with "java.io.IOException: Process failed due to timeout"
[ https://issues.apache.org/jira/browse/FLINK-19863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dian Fu updated FLINK-19863:
Labels: test-stability (was: )

> SQLClientHBaseITCase.testHBase failed with "java.io.IOException: Process failed due to timeout"
> Key: FLINK-19863
> URL: https://issues.apache.org/jira/browse/FLINK-19863
> Project: Flink
> Issue Type: Bug
> Components: Connectors / HBase
> Affects Versions: 1.12.0
> Reporter: Dian Fu
> Priority: Major
> Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8541=logs=91bf6583-3fb2-592f-e4d4-d79d79c3230a=3425d8ba-5f03-540a-c64b-51b8481bf7d6
> {code}
> 00:50:02,589 [main] INFO org.apache.flink.tests.util.flink.FlinkDistribution [] - Stopping Flink cluster.
> 00:50:04,106 [main] INFO org.apache.flink.tests.util.flink.FlinkDistribution [] - Stopping Flink cluster.
> 00:50:04,741 [main] INFO org.apache.flink.tests.util.flink.LocalStandaloneFlinkResource [] - Backed up logs to /home/vsts/work/1/s/flink-end-to-end-tests/artifacts/flink-b3924665-1ac9-4309-8255-20f0dc94e7b9.
> 00:50:04,788 [main] INFO org.apache.flink.tests.util.hbase.LocalStandaloneHBaseResource [] - Stopping HBase Cluster
> 00:50:16,243 [main] ERROR org.apache.flink.tests.util.hbase.SQLClientHBaseITCase [] - Test testHBase[0: hbase-version:1.4.3](org.apache.flink.tests.util.hbase.SQLClientHBaseITCase) failed with:
> java.io.IOException: Process failed due to timeout.
> at org.apache.flink.tests.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:130)
> at org.apache.flink.tests.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:108)
> at org.apache.flink.tests.util.flink.FlinkDistribution.submitSQLJob(FlinkDistribution.java:221)
> at org.apache.flink.tests.util.flink.LocalStandaloneFlinkResource$StandaloneClusterController.submitSQLJob(LocalStandaloneFlinkResource.java:196)
> at org.apache.flink.tests.util.hbase.SQLClientHBaseITCase.executeSqlStatements(SQLClientHBaseITCase.java:215)
> at org.apache.flink.tests.util.hbase.SQLClientHBaseITCase.testHBase(SQLClientHBaseITCase.java:152)
> {code}
[GitHub] [flink] flinkbot edited a comment on pull request #13822: [FLINK-19856][network] Emit EndOfChannelRecoveryEvent
flinkbot edited a comment on pull request #13822: URL: https://github.com/apache/flink/pull/13822#issuecomment-717894411
## CI report:
* cecfabd36b9192284b4671fbdf48eb933ffc79e5 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8532)
* c121f8a279bb69448c326dab7cee19fd5bb2096c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8544)
* 6e23ce525a7af28103e7abd8547b2ba6c9850567 UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #13822: [FLINK-19856][network] Emit EndOfChannelRecoveryEvent
flinkbot edited a comment on pull request #13822: URL: https://github.com/apache/flink/pull/13822#issuecomment-717894411
## CI report:
* cecfabd36b9192284b4671fbdf48eb933ffc79e5 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8532)
* c121f8a279bb69448c326dab7cee19fd5bb2096c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8544)
[GitHub] [flink] zentol commented on pull request #13796: [FLINK-19810][CI] Automatically run a basic NOTICE file check on CI
zentol commented on pull request #13796: URL: https://github.com/apache/flink/pull/13796#issuecomment-718289508
> One benefit would be, that our release process is less complicated
Doesn't really apply imo because this can be easily automated. While the (build) process is more complicated, it is not more complicated for the RM.
> Second benefit is that it is easier to check that the generated NOTICE files are valid when you have to regenerate and commit them. With your approach, we would wait with that until the release, and it would only be done by a few "experts".
Realistically, only experts will look at the results _anyway_. Once something is generated people stop caring about correctness, and just go through the motions.
[GitHub] [flink] SteNicholas commented on a change in pull request #13828: [FLINK-19835] Don't emit intermediate watermarks from sources in BATCH execution mode
SteNicholas commented on a change in pull request #13828: URL: https://github.com/apache/flink/pull/13828#discussion_r513842239
## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
## @@ -254,4 +269,10 @@ private void registerReader() {
 ListState getReaderState() {
 return readerState;
 }
+
+ // Not really meant to be here, this is an example.
+ // TODO: Should we use RuntimeExecutionMode here? I don't like that that one has AUTOMATIC.
+ public enum RuntimeMode {
+ BATCH, STREAMING
Review comment: @aljoscha @dawidwys @kl0u I'm confused about which choice to follow. @aljoscha mentioned that he prefers an enum over a boolean, while @dawidwys and @kl0u prefer a boolean. I would like to follow whichever way you settle on as the standard.
[GitHub] [flink] SteNicholas commented on a change in pull request #13828: [FLINK-19835] Don't emit intermediate watermarks from sources in BATCH execution mode
SteNicholas commented on a change in pull request #13828: URL: https://github.com/apache/flink/pull/13828#discussion_r513842239
## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
## @@ -254,4 +269,10 @@ private void registerReader() {
 ListState getReaderState() {
 return readerState;
 }
+
+ // Not really meant to be here, this is an example.
+ // TODO: Should we use RuntimeExecutionMode here? I don't like that that one has AUTOMATIC.
+ public enum RuntimeMode {
+ BATCH, STREAMING
Review comment: @aljoscha @dawidwys @kl0u I'm confused about which choice to follow. @aljoscha mentioned that he prefers an enum over a boolean, while @dawidwys and @kl0u prefer a boolean. I would like to follow whichever you settle on as the standard.
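To make the enum-versus-boolean trade-off in the thread above concrete, here is a hypothetical sketch (names are illustrative, not the Flink API): with an enum the call site names the mode explicitly, while with a boolean flag the literal `true`/`false` is opaque without the parameter name.

```java
public class ModeFlagSketch {
    enum RuntimeMode { BATCH, STREAMING }

    // Enum-based: the call site reads emitsIntermediateWatermarks(RuntimeMode.BATCH).
    static boolean emitsIntermediateWatermarks(RuntimeMode mode) {
        return mode == RuntimeMode.STREAMING;
    }

    // Boolean-based: terser, but emitsIntermediateWatermarks(true) is opaque.
    static boolean emitsIntermediateWatermarks(boolean batch) {
        return !batch;
    }

    public static void main(String[] args) {
        if (emitsIntermediateWatermarks(RuntimeMode.BATCH)) throw new AssertionError();
        if (!emitsIntermediateWatermarks(false)) throw new AssertionError();
        System.out.println("ok");
    }
}
```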
[GitHub] [flink] flinkbot edited a comment on pull request #13829: [FLINK-19862][coordination] Check for null
flinkbot edited a comment on pull request #13829: URL: https://github.com/apache/flink/pull/13829#issuecomment-718159576
## CI report:
* 3e7b304163f959489fe85aaffa43f664b65b65e5 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8540)
[GitHub] [flink] flinkbot edited a comment on pull request #13770: [FLINK-18858][connector-kinesis] Add Kinesis sources and sinks
flinkbot edited a comment on pull request #13770: URL: https://github.com/apache/flink/pull/13770#issuecomment-715323245
## CI report:
* c1af295630bc8064312d310846613a8a3b23bbca UNKNOWN
* c195c6d2c594e176fdbc038ee9672a90dd10ef80 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8539)
[GitHub] [flink] flinkbot edited a comment on pull request #13797: [FLINK-19465][runtime / statebackends] Add CheckpointStorage interface and wire through runtime
flinkbot edited a comment on pull request #13797: URL: https://github.com/apache/flink/pull/13797#issuecomment-716854695
## CI report:
* c06b3dcd4eb4045f66352c8b65c14a7b60163ecd UNKNOWN
* 185e46fd046c5f39a87ac3583ad559b065a275e1 UNKNOWN
* f9196a71051202e88294076b7a514f895fb9f443 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8451)
* 540d2dacd7426f974335e2690be6fa6fa7e7f818 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8531)
* 942af8415dd1a9fbd715532bf6b578a7a4b7f082 UNKNOWN
[GitHub] [flink] flinkbot edited a comment on pull request #13825: [FLINK-19836] Add SimpleVersionedSerializerTypeSerializerProxy
flinkbot edited a comment on pull request #13825: URL: https://github.com/apache/flink/pull/13825#issuecomment-717897004
## CI report:
* 07988460fce9311f7ee7857fede86ba78f5629e5 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8536)