[jira] [Updated] (FLINK-22456) Support InitializeOnMaster and FinalizeOnMaster to be used in InputFormat
[ https://issues.apache.org/jira/browse/FLINK-22456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-22456: --- Labels: pull-request-available (was: ) > Support InitializeOnMaster and FinalizeOnMaster to be used in InputFormat > - > > Key: FLINK-22456 > URL: https://issues.apache.org/jira/browse/FLINK-22456 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: Li >Priority: Minor > Labels: pull-request-available > > In _InputOutputFormatVertex_, _initializeGlobal_ and _finalizeGlobal_ > are only called when the format is an _OutputFormat_; they are never called for an _InputFormat_. > FLINK-1722 says that _HadoopOutputFormats_ use these hooks to do work > before and after the task, and only _initializeGlobal_ and > _finalizeGlobal_ on _OutputFormat_ are supported. > I don't know why _InputFormat_ doesn't support them; can anyone tell me > why? > I think _InitializeOnMaster_ and _FinalizeOnMaster_ should also > be supported in _InputFormat_. > For example, for an offline task using _JdbcInputFormat_, the user could call > _initializeGlobal_ to query the total record count of the task and then > create InputSplits from that count. While the task is running, a progress > metric can be derived by dividing the current number of records read by the > total count, and even the remaining time of the task can be > estimated. This is very helpful for users viewing task progress and remaining > time through external systems. -- This message was sent by Atlassian Jira (v8.3.4#803005)
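The progress-indicator idea in the description could look roughly like this. This is a hypothetical sketch: the interface below is a simplified local stand-in for Flink's `org.apache.flink.api.common.io.InitializeOnMaster`, and the class, field, and method names are invented for illustration.

```java
// Sketch: an input format that learns the total row count on the master,
// so running subtasks can report progress as readSoFar / totalCount.
public class ProgressSketch {

    // Stand-in for Flink's InitializeOnMaster (hypothetical, simplified).
    interface InitializeOnMaster {
        void initializeGlobal(int parallelism);
    }

    static class CountingJdbcLikeInputFormat implements InitializeOnMaster {
        long totalCount; // set once on the master before tasks start
        long readSoFar;  // advanced on the task as records are read

        @Override
        public void initializeGlobal(int parallelism) {
            // A real JdbcInputFormat would run a SELECT COUNT(*) here and
            // could also derive InputSplits from the result.
            this.totalCount = 1000; // hypothetical value for the sketch
        }

        double progress() {
            return totalCount == 0 ? 0.0 : (double) readSoFar / totalCount;
        }
    }

    public static void main(String[] args) {
        CountingJdbcLikeInputFormat f = new CountingJdbcLikeInputFormat();
        f.initializeGlobal(4);
        f.readSoFar = 250;
        System.out.println(f.progress()); // prints 0.25
    }
}
```

An external monitoring system could poll such a metric to estimate remaining time, which is exactly the use case the reporter describes.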
[GitHub] [flink] kanata163 opened a new pull request #15775: [FLINK-22456][runtime] Support InitializeOnMaster and FinalizeOnMaster to be used in InputFormat
kanata163 opened a new pull request #15775: URL: https://github.com/apache/flink/pull/15775 ## What is the purpose of the change *Support `InitializeOnMaster` and `FinalizeOnMaster` to be used in InputFormat* ## Brief change log - `InputOutputFormatVertex` calls `initializeGlobal()` when `inputFormat` is an instance of `InitializeOnMaster` - `InputOutputFormatVertex` calls `finalizeGlobal()` when `inputFormat` is an instance of `FinalizeOnMaster` ## Verifying this change This change added tests and can be verified as follows: - *Extended test that validates the logic of `InputOutputFormatVertex`.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
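The change log above boils down to an instanceof dispatch on the master. The following is a simplified stand-in sketch, not the real `InputOutputFormatVertex` internals: the interfaces are local stand-ins for Flink's, and all names besides `initializeGlobal`/`finalizeGlobal` are invented.

```java
// Sketch of the dispatch the PR adds: before/after running the job, the
// master checks whether the input format opts into the lifecycle hooks.
public class MasterHookDispatchSketch {

    interface InitializeOnMaster { void initializeGlobal(int parallelism); }
    interface FinalizeOnMaster { void finalizeGlobal(int parallelism); }
    interface InputFormat { } // stand-in; the real one carries split logic

    static final StringBuilder LOG = new StringBuilder();

    static class TrackedFormat implements InputFormat, InitializeOnMaster, FinalizeOnMaster {
        public void initializeGlobal(int p) { LOG.append("init:").append(p).append(";"); }
        public void finalizeGlobal(int p)   { LOG.append("final:").append(p).append(";"); }
    }

    // Mirrors the instanceof check that previously existed only for OutputFormat.
    static void initializeOnMaster(InputFormat format, int parallelism) {
        if (format instanceof InitializeOnMaster) {
            ((InitializeOnMaster) format).initializeGlobal(parallelism);
        }
    }

    static void finalizeOnMaster(InputFormat format, int parallelism) {
        if (format instanceof FinalizeOnMaster) {
            ((FinalizeOnMaster) format).finalizeGlobal(parallelism);
        }
    }

    public static void main(String[] args) {
        InputFormat f = new TrackedFormat();
        initializeOnMaster(f, 2);
        finalizeOnMaster(f, 2);
        System.out.println(LOG); // init:2;final:2;
    }
}
```

Formats that do not implement the marker interfaces are simply skipped, so the change is backwards compatible for existing InputFormats.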
[GitHub] [flink] flinkbot edited a comment on pull request #15746: [FLINK-17783][Table SQL/Ecosystem]Add array,map,row types support for orc row writer
flinkbot edited a comment on pull request #15746: URL: https://github.com/apache/flink/pull/15746#issuecomment-826269766 ## CI report: * 08cf1b3d72dbf729d6de14c8d508b10b5d1a20b1 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17154) * 8d6792043f6dcba404f1313d35d3c1f9a6ddd7a4 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17265) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15131: [FLINK-21700][yarn]Allow to disable fetching Hadoop delegation token on Yarn
flinkbot edited a comment on pull request #15131: URL: https://github.com/apache/flink/pull/15131#issuecomment-794957907 ## CI report: * 0500d84f78d8fdb967b077fb024fda0cd4ba2d6a Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15682) * b99e1a12c74051454d211b0094c986b8741dd28f Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17261) * e375a91dd1bd01ff1142c14a0f982eb7d5f94c14 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-22085) KafkaSourceLegacyITCase hangs/fails on azure
[ https://issues.apache.org/jira/browse/FLINK-22085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332957#comment-17332957 ] Dong Lin edited comment on FLINK-22085 at 4/27/21, 5:53 AM: Thank you [~dwysakowicz] for the information. For the first test failure [1], it could be because the Azure pipeline is very slow and it takes more than 60 seconds (due to long GC) to complete that test. Maybe we can see if increasing the timeout to 120 seconds could reduce the failure rate of this test. For the second test failure [2], it appears that the test failed due to "OperatorEvent from an OperatorCoordinator to a task was lost". This is related to https://github.com/apache/flink/pull/15605 which was committed recently. Given that KafkaSourceLegacyITCase no longer hangs and the comment history in this JIRA is already very long, I opened https://issues.apache.org/jira/browse/FLINK-22488 to track the issue of "OperatorEvent from an OperatorCoordinator to a task was lost". Maybe we can close this JIRA and continue the discussion in FLINK-22488. [1] https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17206=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5=6612 [2] https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17212=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5=7062 was (Author: lindong): Thank you [~dwysakowicz] for the information. For the first test failure [1], it could be because the Azure pipeline is very slow and it takes more than 60 seconds (due to long GC) to complete that test. Maybe we can see if increasing the timeout to 120 seconds could reduce the failure rate of this test. For the second test failure [2], it appears that the test failed due to "OperatorEvent from an OperatorCoordinator to a task was lost". This is related to https://github.com/apache/flink/pull/15605 which was committed recently. 
Since the KafkaSourceLegacyITCase no longer hangs and the comments in this JIRA are already very long, I opened https://issues.apache.org/jira/browse/FLINK-22488 to track the issue of "OperatorEvent from an OperatorCoordinator to a task was lost". Maybe we can close this JIRA and continue the discussion in FLINK-22488. [1] https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17206=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5=6612 [2] https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17212=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5=7062 > KafkaSourceLegacyITCase hangs/fails on azure > > > Key: FLINK-22085 > URL: https://issues.apache.org/jira/browse/FLINK-22085 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.13.0, 1.14.0 >Reporter: Dawid Wysakowicz >Assignee: Dong Lin >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.13.0 > > > 1) Observations > a) The Azure pipeline would occasionally hang without printing any test error > information. 
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=15939=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5=8219] > b) By running the test KafkaSourceLegacyITCase::testBrokerFailure() with INFO > level logging, the the test would hang with the following error message > printed repeatedly: > {code:java} > 20451 [New I/O boss #50] ERROR > org.apache.flink.networking.NetworkFailureHandler [] - Closing communication > channel because of an exception > java.net.ConnectException: Connection refused: localhost/127.0.0.1:50073 > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) > ~[?:1.8.0_151] > at > sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) > ~[?:1.8.0_151] > at > org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152) > ~[flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] > at > org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105) > [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] > at > org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79) > [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] > at > org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) > [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] > at >
[GitHub] [flink] curcur edited a comment on pull request #15674: [FLINK-22311] [connector/jdbc] Validate maxRetries for XA Sink
curcur edited a comment on pull request #15674: URL: https://github.com/apache/flink/pull/15674#issuecomment-827331397 The doc checks seem to fail: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17083=logs=c5d67f7d-375d-5407-4743-f9d0c4436a81=38411795-40c9-51fa-10b0-bd083cf9f5a5 Would you please try `mvn package -Dgenerate-config-docs -pl flink-docs -am -nsu -DskipTests` to regenerate the docs? Or I can simply put this into the doc follow-up, https://github.com/apache/flink/pull/15720/commits/e69548aec17fdffe8a60dee7a4a58d3966ee06e7, so that you can finish the pre-check before the 1.13 release?
[jira] [Commented] (FLINK-22085) KafkaSourceLegacyITCase hangs/fails on azure
[ https://issues.apache.org/jira/browse/FLINK-22085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332957#comment-17332957 ] Dong Lin commented on FLINK-22085: -- Thank you [~dwysakowicz] for the information. For the first test failure [1], it could be because the Azure pipeline is very slow and it takes more than 60 seconds (due to long GC) to complete that test. Maybe we can see if increasing the timeout to 120 seconds could reduce the failure rate of this test. For the second test failure [2], it appears that the test failed due to "OperatorEvent from an OperatorCoordinator to a task was lost". This is related to https://github.com/apache/flink/pull/15605 which was committed recently. Since the KafkaSourceLegacyITCase no longer hangs and the comments in this JIRA are already very long, I opened https://issues.apache.org/jira/browse/FLINK-22488 to track the issue of "OperatorEvent from an OperatorCoordinator to a task was lost". Maybe we can close this JIRA and continue the discussion in FLINK-22488. [1] https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17206=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5=6612 [2] https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17212=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5=7062 > KafkaSourceLegacyITCase hangs/fails on azure > > > Key: FLINK-22085 > URL: https://issues.apache.org/jira/browse/FLINK-22085 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.13.0, 1.14.0 >Reporter: Dawid Wysakowicz >Assignee: Dong Lin >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.13.0 > > > 1) Observations > a) The Azure pipeline would occasionally hang without printing any test error > information. 
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=15939=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5=8219] > b) By running the test KafkaSourceLegacyITCase::testBrokerFailure() with INFO > level logging, the the test would hang with the following error message > printed repeatedly: > {code:java} > 20451 [New I/O boss #50] ERROR > org.apache.flink.networking.NetworkFailureHandler [] - Closing communication > channel because of an exception > java.net.ConnectException: Connection refused: localhost/127.0.0.1:50073 > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) > ~[?:1.8.0_151] > at > sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) > ~[?:1.8.0_151] > at > org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152) > ~[flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] > at > org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105) > [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] > at > org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79) > [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] > at > org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) > [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] > at > org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42) > [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] > at > org.apache.flink.shaded.testutils.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) > [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] > at > org.apache.flink.shaded.testutils.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) > 
[flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT] > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > [?:1.8.0_151] > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > [?:1.8.0_151] > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151] > {code} > *2) Root cause explanations* > The test would hang because it enters the following loop: > - closeOnFlush() is called for a given channel > - closeOnFlush() calls channel.write(..) > - channel.write() triggers the exceptionCaught(...) callback > - closeOnFlush() is called for the same channel again. > *3) Solution* > Update closeOnFlush() so that, if a channel is being closed by this method, > then closeOnFlush() would not try to write to this channel if it is called on >
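The loop and fix described in the root-cause section above can be sketched as follows. This is a hypothetical illustration: `Channel` here is a minimal local stand-in, not Netty's interface, and the class and counter names are invented; the real fix lives in Flink's `NetworkFailureHandler` test utility.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of the fix: remember which channels are already being closed and
// make closeOnFlush() a no-op when re-entered for the same channel, breaking
// the closeOnFlush -> write -> exceptionCaught -> closeOnFlush cycle.
public class CloseOnFlushGuardSketch {

    interface Channel { void write(String msg); void close(); }

    private final Set<Channel> closingChannels = new HashSet<>();
    int closeAttempts = 0; // exposed so the sketch can show the re-entry count

    void closeOnFlush(Channel ch) {
        closeAttempts++;
        if (!closingChannels.add(ch)) {
            return; // already closing this channel: do not write again
        }
        ch.write("flush"); // may synchronously hit the error path below
        ch.close();
    }

    // Stand-in for the exceptionCaught(...) callback.
    void exceptionCaught(Channel ch) {
        closeOnFlush(ch);
    }

    public static void main(String[] args) {
        CloseOnFlushGuardSketch handler = new CloseOnFlushGuardSketch();
        Channel broken = new Channel() {
            @Override public void write(String msg) {
                // writing to a dead peer fails immediately
                handler.exceptionCaught(this);
            }
            @Override public void close() { /* no-op for the sketch */ }
        };
        handler.closeOnFlush(broken);
        // Without the guard this recursion would never terminate.
        System.out.println(handler.closeAttempts); // 2: original call + one re-entry
    }
}
```

The guard turns the unbounded mutual recursion into exactly one extra, harmless re-entry per channel.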
[GitHub] [flink] wuchong commented on a change in pull request #15755: [FLINK-22318][table] Support RENAME column name for ALTER TABLE state…
wuchong commented on a change in pull request #15755: URL: https://github.com/apache/flink/pull/15755#discussion_r620886208 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java ## @@ -144,6 +149,116 @@ public static Operation convertChangeColumn( // TODO: handle watermark and constraints } +public static Operation convertRenameColumn( +ObjectIdentifier tableIdentifier, +String originColumnName, +String newColumnName, +CatalogTable catalogTable) { + +Schema modifiedTableSchema = catalogTable.getUnresolvedSchema(); +validateColumnName(originColumnName, newColumnName, modifiedTableSchema); + +Schema.Builder builder = Schema.newBuilder(); +// build column +modifiedTableSchema.getColumns().stream() +.forEach( +column -> { +if (StringUtils.equals(column.getName(), originColumnName)) { +buildNewColumnFromOriginColumn(builder, column, newColumnName); +} else { +buildNewColumnFromOriginColumn(builder, column, column.getName()); +} +}); +// build primary key column +List originPrimaryKeyNames = +modifiedTableSchema +.getPrimaryKey() +.map(Schema.UnresolvedPrimaryKey::getColumnNames) +.orElseGet(Lists::newArrayList); + +List newPrimaryKeyNames = +originPrimaryKeyNames.stream() +.map( +pkName -> +StringUtils.equals(pkName, originColumnName) +? 
newColumnName +: pkName) +.collect(Collectors.toList()); + +if (newPrimaryKeyNames.size() > 0) { +builder.primaryKey(newPrimaryKeyNames); +} +// build watermark +modifiedTableSchema.getWatermarkSpecs().stream() +.forEach( +watermarkSpec -> { +String watermarkRefColumnName = watermarkSpec.getColumnName(); +Expression watermarkExpression = watermarkSpec.getWatermarkExpression(); +if (StringUtils.equals(watermarkRefColumnName, originColumnName)) { +String newWatermarkExpression = +((SqlCallExpression) watermarkExpression) +.getSqlExpression() + .replace(watermarkRefColumnName, newColumnName); Review comment: Would be better to investigate solutions first, I'm not sure whether there are any other ways to do this. cc @twalthr , do you have any ideas? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] curcur commented on pull request #15674: [FLINK-22311] [connector/jdbc] Validate maxRetries for XA Sink
curcur commented on pull request #15674: URL: https://github.com/apache/flink/pull/15674#issuecomment-827331397 doc checks seem to fail:
[jira] [Commented] (FLINK-22408) Flink Table Parser Hive Drop Partitions Syntax unparse is Error
[ https://issues.apache.org/jira/browse/FLINK-22408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332945#comment-17332945 ] Rui Li commented on FLINK-22408: [~aidenma] OK. I think we can fix this issue in the release-1.12 branch. We don't need to fix it in 1.13 and later, since the calcite parser is deprecated there. So please submit a PR for 1.12. > Flink Table Parser Hive Drop Partitions Syntax unparse is Error > -- > > Key: FLINK-22408 > URL: https://issues.apache.org/jira/browse/FLINK-22408 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.11.3 >Reporter: Ma Jun >Priority: Major > Labels: pull-request-available > > The Flink Table parser output is wrong: > *Synopsis:* > > *SQL:* > {code:java} > alter table tbl drop partition (p1='a',p2=1), partition(p1='b',p2=2);{code} > *Hive multi-partition unparse toSqlString gives:* > {code:java} > ALTER TABLE `TBL`\n" + > "DROP\n" + > "PARTITION (`P1` = 'a', `P2` = 1)\n" + > "PARTITION (`P1` = 'b', `P2` = 2) > {code} > A comma is missing between the partitions in the SqlNodeList. > Hive syntax: > {code:java} > ALTER TABLE table_name DROP [IF EXISTS] PARTITION (partition_spec) [, > PARTITION (partition_spec)]; > {code} >
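The reported bug amounts to concatenating the `PARTITION (...)` specs without a separator when unparsing. A minimal illustration of the intended output follows; the helper is hypothetical and string-based, whereas Calcite's real unparse goes through a `SqlWriter` and `SqlNodeList`.

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch of the fix: join the partition specs of a DROP PARTITION statement
// with ", " instead of concatenating them back to back.
public class DropPartitionUnparseSketch {

    static String unparseDrop(String table, List<String> partitionSpecs) {
        return "ALTER TABLE " + table + " DROP "
                + partitionSpecs.stream()
                        .map(spec -> "PARTITION (" + spec + ")")
                        .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        System.out.println(unparseDrop("tbl",
                List.of("`p1` = 'a', `p2` = 1", "`p1` = 'b', `p2` = 2")));
        // ALTER TABLE tbl DROP PARTITION (`p1` = 'a', `p2` = 1), PARTITION (`p1` = 'b', `p2` = 2)
    }
}
```

With the separator in place, the unparsed SQL matches the Hive grammar `DROP PARTITION (...) [, PARTITION (...)]` quoted in the issue.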
[jira] [Created] (FLINK-22488) KafkaSourceLegacyITCase.testOneToOneSources failed due to "OperatorEvent from an OperatorCoordinator to a task was lost"
Dong Lin created FLINK-22488: Summary: KafkaSourceLegacyITCase.testOneToOneSources failed due to "OperatorEvent from an OperatorCoordinator to a task was lost" Key: FLINK-22488 URL: https://issues.apache.org/jira/browse/FLINK-22488 Project: Flink Issue Type: Improvement Reporter: Dong Lin According to [1], the test KafkaSourceLegacyITCase.testOneToOneSources failed because it runs a streaming job (which uses KafkaSource) with restartAttempts=1. In addition to the failover explicitly triggered by the FailingIdentityMapper, the job additionally failed due to "org.apache.flink.util.FlinkException: An OperatorEvent from an OperatorCoordinator to a task was lost. Triggering task failover to ensure consistency", which is unexpected by the test. Note that SubtaskGatewayImpl was updated by [2] on 4/14 to trigger task failover if any OperatorEvent is lost. This could explain why those Kafka tests started to fail with the exception described above. In order to make this test stable, let's try to understand why there is such a high chance of losing an OperatorEvent in the Azure test pipeline. And if we cannot avoid losing OperatorEvents in the test pipeline, we probably need to update the test to allow the pipeline to be restarted arbitrarily many times (and still be able to stop the test on the happy path). [1] https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17212=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5=6960 [2] https://github.com/apache/flink/pull/15605
[GitHub] [flink] flinkbot edited a comment on pull request #15763: [FLINK-22364][doc] Translate the page of "Data Sources" to Chinese.
flinkbot edited a comment on pull request #15763: URL: https://github.com/apache/flink/pull/15763#issuecomment-826589720 ## CI report: * 0dcb773365ec16118d2b4c0f55910441bc6d7d58 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17259) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15746: [FLINK-17783][Table SQL/Ecosystem]Add array,map,row types support for orc row writer
flinkbot edited a comment on pull request #15746: URL: https://github.com/apache/flink/pull/15746#issuecomment-826269766 ## CI report: * 08cf1b3d72dbf729d6de14c8d508b10b5d1a20b1 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17154) * 8d6792043f6dcba404f1313d35d3c1f9a6ddd7a4 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rmetzger commented on pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support
rmetzger commented on pull request #15599: URL: https://github.com/apache/flink/pull/15599#issuecomment-827326084 Yeah, I'm happy to help. I also think I know what's causing this, but I don't know how to resolve it. I pushed this PR: - onto my personal CI, where it passed without problems. - onto Flink's CI, where it passed without problems. - I manually restarted this PR's CI, and it broke again. The only difference between all these runs is that the broken PR CI run downloads data from the Maven cache on Azure (e.g. the local .m2 directory contains artifacts in the failure case). @galenwarren Could you do me a favor and add a "-U" argument here: https://github.com/apache/flink/blob/master/tools/ci/maven-utils.sh#L107. This COULD resolve the issue. > This can be confirmed from the maven log, where no "Downloading xxx" can be found until the failure. The reason for the missing log statements is that we've disabled the download messages from maven: https://github.com/apache/flink/blob/master/tools/ci/maven-utils.sh#L103
[GitHub] [flink] todd5167 commented on a change in pull request #15755: [FLINK-22318][table] Support RENAME column name for ALTER TABLE state…
todd5167 commented on a change in pull request #15755: URL: https://github.com/apache/flink/pull/15755#discussion_r620879361 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java ## @@ -144,6 +149,116 @@ public static Operation convertChangeColumn( // TODO: handle watermark and constraints } +public static Operation convertRenameColumn( +ObjectIdentifier tableIdentifier, +String originColumnName, +String newColumnName, +CatalogTable catalogTable) { + +Schema modifiedTableSchema = catalogTable.getUnresolvedSchema(); +validateColumnName(originColumnName, newColumnName, modifiedTableSchema); + +Schema.Builder builder = Schema.newBuilder(); +// build column +modifiedTableSchema.getColumns().stream() +.forEach( +column -> { +if (StringUtils.equals(column.getName(), originColumnName)) { +buildNewColumnFromOriginColumn(builder, column, newColumnName); +} else { +buildNewColumnFromOriginColumn(builder, column, column.getName()); +} +}); +// build primary key column +List originPrimaryKeyNames = +modifiedTableSchema +.getPrimaryKey() +.map(Schema.UnresolvedPrimaryKey::getColumnNames) +.orElseGet(Lists::newArrayList); + +List newPrimaryKeyNames = +originPrimaryKeyNames.stream() +.map( +pkName -> +StringUtils.equals(pkName, originColumnName) +? newColumnName +: pkName) +.collect(Collectors.toList()); + +if (newPrimaryKeyNames.size() > 0) { +builder.primaryKey(newPrimaryKeyNames); +} +// build watermark +modifiedTableSchema.getWatermarkSpecs().stream() +.forEach( +watermarkSpec -> { +String watermarkRefColumnName = watermarkSpec.getColumnName(); +Expression watermarkExpression = watermarkSpec.getWatermarkExpression(); +if (StringUtils.equals(watermarkRefColumnName, originColumnName)) { +String newWatermarkExpression = +((SqlCallExpression) watermarkExpression) +.getSqlExpression() + .replace(watermarkRefColumnName, newColumnName); Review comment: OK. 
Should computed column expressions and watermark expressions disallow rename in the first version?
[jira] [Commented] (FLINK-22485) Support client attach on application mode
[ https://issues.apache.org/jira/browse/FLINK-22485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332939#comment-17332939 ] Jark Wu commented on FLINK-22485: - Sorry, [~zuston], I misunderstand your problem. Flink CLI already supports attached mode, you can refer: https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#execution-attached > Support client attach on application mode > - > > Key: FLINK-22485 > URL: https://issues.apache.org/jira/browse/FLINK-22485 > Project: Flink > Issue Type: Improvement > Components: Command Line Client >Reporter: Junfan Zhang >Priority: Major > > Now, client will not wait until job finish when using application mode. > Can we support client attach on application mode? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-22485) Support client attach on application mode
[ https://issues.apache.org/jira/browse/FLINK-22485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332939#comment-17332939 ] Jark Wu edited comment on FLINK-22485 at 4/27/21, 5:35 AM: --- Sorry, [~zuston], I misunderstood your problem. Flink CLI already supports attached mode, you can refer: https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#execution-attached was (Author: jark): Sorry, [~zuston], I misunderstand your problem. Flink CLI already supports attached mode, you can refer: https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#execution-attached > Support client attach on application mode > - > > Key: FLINK-22485 > URL: https://issues.apache.org/jira/browse/FLINK-22485 > Project: Flink > Issue Type: Improvement > Components: Command Line Client >Reporter: Junfan Zhang >Priority: Major > > Now, client will not wait until job finish when using application mode. > Can we support client attach on application mode? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-22294) Hive reading fail when getting file numbers on different filesystem nameservices
[ https://issues.apache.org/jira/browse/FLINK-22294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17331442#comment-17331442 ] Rui Li edited comment on FLINK-22294 at 4/27/21, 5:33 AM: -- Fixed in master: 6071f9686d2a7bfb154b1e7b1682b2cfee190922 Fixed in release-1.13: 7a4a18da90521fe3c0758d4579441c50907f368b was (Author: lirui): Fixed in master: 6071f9686d2a7bfb154b1e7b1682b2cfee190922 Fixed in release-1.13: TBD > Hive reading fail when getting file numbers on different filesystem > nameservices > > > Key: FLINK-22294 > URL: https://issues.apache.org/jira/browse/FLINK-22294 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.12.2 >Reporter: Junfan Zhang >Priority: Major > Labels: pull-request-available > > The same problem like https://issues.apache.org/jira/browse/FLINK-20710 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] lirui-apache closed pull request #15750: [FLINK-22294][hive] Hive reading fail when getting file numbers on different filesystem nameservices
lirui-apache closed pull request #15750: URL: https://github.com/apache/flink/pull/15750
[jira] [Updated] (FLINK-22485) Support client attach on application mode
[ https://issues.apache.org/jira/browse/FLINK-22485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-22485: Component/s: (was: Table SQL / Client) Command Line Client > Support client attach on application mode > - > > Key: FLINK-22485 > URL: https://issues.apache.org/jira/browse/FLINK-22485 > Project: Flink > Issue Type: Improvement > Components: Command Line Client >Reporter: Junfan Zhang >Priority: Major > > Now, client will not wait until job finish when using application mode. > Can we support client attach on application mode? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] lirui-apache commented on pull request #15750: [FLINK-22294][hive] Hive reading fail when getting file numbers on different filesystem nameservices
lirui-apache commented on pull request #15750: URL: https://github.com/apache/flink/pull/15750#issuecomment-827324483 Pushed to release-1.13 via 7a4a18da90521fe3c0758d4579441c50907f368b
[GitHub] [flink] curcur commented on a change in pull request #15720: [FLINK-22289] Update JDBC XA sink docs
curcur commented on a change in pull request #15720: URL: https://github.com/apache/flink/pull/15720#discussion_r620850140 ## File path: docs/content/docs/connectors/datastream/jdbc.md ## @@ -167,15 +177,24 @@ env ps.setDouble(4, t.price); ps.setInt(5, t.qty); }, -JdbcExecutionOptions.builder().build(), +JdbcExecutionOptions.builder() +.withMaxRetries(0) +.build(), JdbcExactlyOnceOptions.defaults(), () -> { // create a driver-specific XA DataSource +// The following example is for derby EmbeddedXADataSource ds = new EmbeddedXADataSource(); ds.setDatabaseName("my_db"); return ds; }); env.execute(); ``` +Postgres XADataSource Example: +```java +PGXADataSource pgxaDataSource = new PGXADataSource(); +pgxaDataSource.setUrl( +"jdbc:postgresql://localhost:5432/postgres"); Review comment: Yes, usually it should. My intention is to make this example runnable out of the box; that is, if a user copy-pastes the example into their IDE and has Postgres installed with the default setup, it will work. I installed Postgres and used its default database as an example. The default database can be accessed without specifying a username and password (i.e. with the defaults). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-22092) HiveConf can cache hiveSiteURL from classpath and cause FileNotFoundException
[ https://issues.apache.org/jira/browse/FLINK-22092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Li resolved FLINK-22092. Resolution: Fixed > HiveConf can cache hiveSiteURL from classpath and cause FileNotFoundException > - > > Key: FLINK-22092 > URL: https://issues.apache.org/jira/browse/FLINK-22092 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Reporter: Rui Li >Assignee: Rui Li >Priority: Major > Labels: pull-request-available, stale-assigned > Fix For: 1.13.0 > > > It turns out that FLINK-19702 is incomplete and {{HiveConf}} may still > automatically load hive-site from classpath and set {{hiveSiteURL}} variable. > This can cause problems, e.g. create a HiveCatalog reading hive-site from > classpath, drop this catalog and also remove the hive-site file, create > another HiveCatalog with hive-conf-dir pointing to another location, the 2nd > HiveCatalog cannot be created because {{HiveConf}} has remembered the > hive-site location from the previous one and complains the file can no longer > be found. -- This message was sent by Atlassian Jira (v8.3.4#803005)
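The FLINK-22092 description above boils down to a static-cache pitfall: a process-wide field remembers the first hive-site location ever discovered, so a later instance configured with a different directory still reads (and fails on) the stale path. A minimal Python sketch of that failure mode, with purely illustrative names (this is not the HiveConf code):

```python
import os
import tempfile

class Conf:
    """Toy stand-in for HiveConf's hive-site caching (names are illustrative)."""

    # Process-wide cache of the config file location, mimicking the static
    # hiveSiteURL field described in the issue.
    _cached_site_path = None

    def __init__(self, conf_dir=None):
        if Conf._cached_site_path is None and conf_dir is not None:
            Conf._cached_site_path = os.path.join(conf_dir, "hive-site.xml")

    def load(self):
        # Always reads the first location ever discovered, even if this
        # instance was created with a different conf_dir.
        with open(Conf._cached_site_path) as f:
            return f.read()

dir1 = tempfile.mkdtemp()
dir2 = tempfile.mkdtemp()
for d in (dir1, dir2):
    with open(os.path.join(d, "hive-site.xml"), "w") as f:
        f.write(d)

first = Conf(conf_dir=dir1)                      # first "catalog" caches dir1's hive-site
os.remove(os.path.join(dir1, "hive-site.xml"))   # drop the first catalog and its conf file

second = Conf(conf_dir=dir2)                     # second "catalog" points at dir2
try:
    second.load()
    outcome = "ok"
except FileNotFoundError:
    outcome = "FileNotFoundError"
print(outcome)  # FileNotFoundError: the stale cached path wins over dir2
```

The fix direction in the issue is accordingly to avoid the implicit static lookup, so each catalog resolves its own hive-site location.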
[GitHub] [flink-web] loniecc commented on pull request #386: [FLINK-13679] Translate "Code Style - Preamble" page into Chinese
loniecc commented on pull request #386: URL: https://github.com/apache/flink-web/pull/386#issuecomment-827322217 > @loniecc thanks for the contribution, will give it a review soon. Thanks, I have resolved them all. Please review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-web] loniecc commented on a change in pull request #386: [FLINK-13679] Translate "Code Style - Preamble" page into Chinese
loniecc commented on a change in pull request #386: URL: https://github.com/apache/flink-web/pull/386#discussion_r620874828 ## File path: contributing/code-style-and-quality-preamble.zh.md ## @@ -1,25 +1,23 @@ --- -title: "Apache Flink Code Style and Quality Guide — Preamble" +title: "Apache Flink 代码风格和质量指南 — 序言" --- {% include code-style-navbar.zh.md %} +这是一次为了保证那些被我们维护的代码和质量标准的尝试 -This is an attempt to capture the code and quality standard that we want to maintain. +一次代码贡献(或者任何代码片段)可以从很多角度进行评价:一组评判标准是代码是否正确和高效。这需要正确且良好的解决逻辑或者算法问题。 -A code contribution (or any piece of code) can be evaluated in various ways: One set of properties is whether the code is correct and efficient. This requires solving the _logical or algorithmic problem_ correctly and well. +另一组评判标准是代码是否使用了简洁的设计和架构,不管是通过概念分割实现了良好的结构,还是使用了简单易懂的代码。该评判标准需要良好的解决软件工程问题。一个好的解决方案需要代码是容易被测试的,可以被除了原作者之外的其他人维护的(因为突然中断之后再维护是非常困难的),同时还需要能够高效的迭代演进的。 Review comment: Hmm, after going through it again, “破坏规范” does read better. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-web] loniecc commented on a change in pull request #386: [FLINK-13679] Translate "Code Style - Preamble" page into Chinese
loniecc commented on a change in pull request #386: URL: https://github.com/apache/flink-web/pull/386#discussion_r620874392 ## File path: contributing/code-style-and-quality-preamble.zh.md ## @@ -1,25 +1,23 @@ --- -title: "Apache Flink Code Style and Quality Guide — Preamble" +title: "Apache Flink 代码风格和质量指南 — 序言" --- {% include code-style-navbar.zh.md %} +这是一次为了保证那些被我们维护的代码和质量标准的尝试 -This is an attempt to capture the code and quality standard that we want to maintain. +一次代码贡献(或者任何代码片段)可以从很多角度进行评价:一组评判标准是代码是否正确和高效。这需要正确且良好的解决逻辑或者算法问题。 -A code contribution (or any piece of code) can be evaluated in various ways: One set of properties is whether the code is correct and efficient. This requires solving the _logical or algorithmic problem_ correctly and well. +另一组评判标准是代码是否使用了简洁的设计和架构,不管是通过概念分割实现了良好的结构,还是使用了简单易懂的代码。该评判标准需要良好的解决软件工程问题。一个好的解决方案需要代码是容易被测试的,可以被除了原作者之外的其他人维护的(因为突然中断之后再维护是非常困难的),同时还需要能够高效的迭代演进的。 -Another set of properties is whether the code follows an intuitive design and architecture, whether it is well structured with right separation of concerns, and whether the code is easily understandable and makes its assumptions explicit. That set of properties requires solving the _software engineering problem_ well. A good solution implies that the code is easily testable, maintainable also by other people than the original authors (because it is harder to accidentally break), and efficient to evolve. +不过第一组标准有相当客观的达成条件,相比之下要达到第二组评判标准更加困难,但是对于 Apache Flink 这样的开源项目来说却非常重要。为了基础的代码能够邀请到更多开发者,为了的开源贡献能够更容易被开发者理解,同时也为了众多开发者同时开发时代码的健壮性,良好工程化的代码是至关重要的。对于良好的工程代码来说,更加容易保证代码的正确性和高效不会随着时间的推移受到影响 -While the first set of properties has rather objective approval criteria, the second set of properties is much harder to assess, but is of high importance for an open source project like Apache Flink. 
To make the code base inviting to many contributors, to make contributions easy to understand for developers that did not write the original code, and to make the code robust in the face of many contributions, well engineered code is crucial.[^1] For well engineered code, it is easier to keep it correct and fast over time. +当然,本指南并不是一份如何写出良好的工程代码的全方位指导。有相当多的书籍尝试说明如何实现良好的代码。本指南仅仅尝试作为最佳实践的检查单,因为模式,反模式和常见错误我们都在开发Flink的时候遇见过。 Review comment: Indeed, fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-22485) Support client attach on application mode
[ https://issues.apache.org/jira/browse/FLINK-22485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332935#comment-17332935 ] Junfan Zhang edited comment on FLINK-22485 at 4/27/21, 5:22 AM: [~jark] Thanks for your reply. Attaching detailed info: version: 1.12.1, Component: Flink CLI, execution mode: Flink batch. I want to know whether the application mode supports {{attach}} in the Flink CLI; I think it is not related to the SQL client was (Author: zuston): Attaching detailed info: version: 1.12.1, Component: Flink CLI, execution mode: Flink batch. I want to know whether the application mode supports {{attach}} in the Flink CLI; I think it is not related to the SQL client > Support client attach on application mode > - > > Key: FLINK-22485 > URL: https://issues.apache.org/jira/browse/FLINK-22485 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Client >Reporter: Junfan Zhang >Priority: Major > > Now, client will not wait until job finish when using application mode. > Can we support client attach on application mode? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22485) Support client attach on application mode
[ https://issues.apache.org/jira/browse/FLINK-22485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332935#comment-17332935 ] Junfan Zhang commented on FLINK-22485: -- Attaching detailed info: version: 1.12.1, Component: Flink CLI, execution mode: Flink batch. I want to know whether the application mode supports {{attach}} in the Flink CLI; I think it is not related to the SQL client > Support client attach on application mode > - > > Key: FLINK-22485 > URL: https://issues.apache.org/jira/browse/FLINK-22485 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Client >Reporter: Junfan Zhang >Priority: Major > > Now, client will not wait until job finish when using application mode. > Can we support client attach on application mode? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] wangwei1025 commented on a change in pull request #15746: [FLINK-17783][Table SQL/Ecosystem]Add array,map,row types support for orc row writer
wangwei1025 commented on a change in pull request #15746: URL: https://github.com/apache/flink/pull/15746#discussion_r620868599 ## File path: flink-formats/flink-orc/src/main/java/org/apache/flink/orc/vector/RowDataVectorizer.java ## @@ -151,8 +160,114 @@ private static void setColumn( vector.set(rowId, timestamp); break; } +case ARRAY: +{ +ListColumnVector listColumnVector = (ListColumnVector) column; +setColumn(rowId, listColumnVector, type, row, columnId); +break; +} +case MAP: +{ +MapColumnVector mapColumnVector = (MapColumnVector) column; +setColumn(rowId, mapColumnVector, type, row, columnId); +break; +} +case ROW: +{ +StructColumnVector structColumnVector = (StructColumnVector) column; +setColumn(rowId, structColumnVector, type, row, columnId); +break; +} default: throw new UnsupportedOperationException("Unsupported type: " + type); } } + +private static void setColumn( +int rowId, +ListColumnVector listColumnVector, +LogicalType type, +RowData row, +int columnId) { +ArrayData arrayData = row.getArray(columnId); +ArrayType arrayType = (ArrayType) type; +listColumnVector.lengths[rowId] = arrayData.size(); +listColumnVector.offsets[rowId] = listColumnVector.childCount; +listColumnVector.childCount += listColumnVector.lengths[rowId]; +listColumnVector.child.ensureSize( +listColumnVector.childCount, listColumnVector.offsets[rowId] != 0); +for (int i = 0; i < arrayData.size(); i++) { +setColumn( +(int) listColumnVector.offsets[rowId] + i, +listColumnVector.child, +arrayType.getElementType(), +convert(arrayData, arrayType.getElementType()), Review comment: Yes, it's unnecessary. This happened in setColumn(int,MapColumnVector,LogicalType,RowData,int) too. I fixed both in the latest commit: [hotfix] move convert out of loop -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
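The `ListColumnVector` bookkeeping in the diff above (`offsets`, `lengths`, `childCount`) follows the standard flattened-list layout: every row's elements are appended to one shared child array, and each row records where its slice starts and how long it is. A small pure-Python model of that layout, with no ORC dependency and illustrative names:

```python
class ListVector:
    """Toy model of ORC ListColumnVector bookkeeping (illustrative only)."""

    def __init__(self):
        self.offsets = []     # start index of each row's elements in `child`
        self.lengths = []     # number of elements in each row
        self.child = []       # flattened element storage shared by all rows
        self.child_count = 0  # total elements written so far

    def set_row(self, row_id, elements):
        assert row_id == len(self.offsets), "rows are appended in order"
        self.offsets.append(self.child_count)
        self.lengths.append(len(elements))
        self.child_count += len(elements)
        # Stands in for child.ensureSize(...) plus the per-element setColumn calls.
        self.child.extend(elements)

    def get_row(self, row_id):
        start = self.offsets[row_id]
        return self.child[start:start + self.lengths[row_id]]

v = ListVector()
v.set_row(0, [1, 2, 3])
v.set_row(1, [])        # empty lists cost nothing in `child`
v.set_row(2, [7, 8])
print(v.child)          # [1, 2, 3, 7, 8]
print(v.get_row(2))     # [7, 8]
```

The review point about moving `convert(...)` out of the loop maps onto this model directly: the element-type conversion depends only on the row, not on the loop index, so computing it once per row avoids redundant work per element.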
[GitHub] [flink] flinkbot edited a comment on pull request #15745: [FLINK-22304][table] Refactor some interfaces for TVF based window to…
flinkbot edited a comment on pull request #15745: URL: https://github.com/apache/flink/pull/15745#issuecomment-826249839 ## CI report: * 11130e794b07aa9215035e5c204766295a4f50fe Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17258) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] FuyaoLi2017 commented on pull request #15602: [FLINK-22264][docs] Fix misleading statement about Flink Job Cluster Kubernetes Support in Flink Architecture page
FuyaoLi2017 commented on pull request #15602: URL: https://github.com/apache/flink/pull/15602#issuecomment-827307615 @tillrohrmann Hello Till, I just have one small question: why does the committer username of this PR appear as fuyli instead of @FuyaoLi2017? fuyli is my alias on my company's laptop, hhh. A little bit confused here. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] curcur commented on a change in pull request #15720: [FLINK-22289] Update JDBC XA sink docs
curcur commented on a change in pull request #15720: URL: https://github.com/apache/flink/pull/15720#discussion_r620850140 ## File path: docs/content/docs/connectors/datastream/jdbc.md ## @@ -167,15 +177,24 @@ env ps.setDouble(4, t.price); ps.setInt(5, t.qty); }, -JdbcExecutionOptions.builder().build(), +JdbcExecutionOptions.builder() +.withMaxRetries(0) +.build(), JdbcExactlyOnceOptions.defaults(), () -> { // create a driver-specific XA DataSource +// The following example is for derby EmbeddedXADataSource ds = new EmbeddedXADataSource(); ds.setDatabaseName("my_db"); return ds; }); env.execute(); ``` +Postgres XADataSource Example: +```java +PGXADataSource pgxaDataSource = new PGXADataSource(); +pgxaDataSource.setUrl( +"jdbc:postgresql://localhost:5432/postgres"); Review comment: Yes, usually it should. My intention is to make this example runnable out of the box; that is, if a user copy-pastes the example into their IDE and has Postgres installed with the default setup, it will work. I installed Postgres; `postgres` is the default database and can be accessed without a username and password (or with the defaults). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] curcur commented on a change in pull request #15720: [FLINK-22289] Update JDBC XA sink docs
curcur commented on a change in pull request #15720: URL: https://github.com/apache/flink/pull/15720#discussion_r620846546 ## File path: docs/content/docs/connectors/datastream/jdbc.md ## @@ -32,8 +32,15 @@ To use it, add the following dependency to your project (along with your JDBC dr {{< artifact flink-connector-jdbc withScalaVersion >}} -Note that the streaming connectors are currently __NOT__ part of the binary distribution. See how to link with them for cluster execution [here]({{< ref "docs/dev/datastream/project-configuration" >}}). +A driver dependency is also required to connect to a specified database. Here are drivers currently supported: + +| Driver | Group Id | Artifact Id | JAR | +| :---| :--| :--| :| +| MySQL | `mysql` | `mysql-connector-java` | [Download](https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/) | +| PostgreSQL | `org.postgresql` | `postgresql` | [Download](https://jdbc.postgresql.org/download.html) | Review comment: It is using the same link as https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/jdbc/ So I would only say this link does not introduce any extra legal issues -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] curcur commented on a change in pull request #15720: [FLINK-22289] Update JDBC XA sink docs
curcur commented on a change in pull request #15720: URL: https://github.com/apache/flink/pull/15720#discussion_r620846546 ## File path: docs/content/docs/connectors/datastream/jdbc.md ## @@ -32,8 +32,15 @@ To use it, add the following dependency to your project (along with your JDBC dr {{< artifact flink-connector-jdbc withScalaVersion >}} -Note that the streaming connectors are currently __NOT__ part of the binary distribution. See how to link with them for cluster execution [here]({{< ref "docs/dev/datastream/project-configuration" >}}). +A driver dependency is also required to connect to a specified database. Here are drivers currently supported: + +| Driver | Group Id | Artifact Id | JAR | +| :---| :--| :--| :| +| MySQL | `mysql` | `mysql-connector-java` | [Download](https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/) | +| PostgreSQL | `org.postgresql` | `postgresql` | [Download](https://jdbc.postgresql.org/download.html) | Review comment: It is using the same link as https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/jdbc/ So my take is `yes` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] curcur commented on pull request #15720: [FLINK-22289] Update JDBC XA sink docs
curcur commented on pull request #15720: URL: https://github.com/apache/flink/pull/15720#issuecomment-827302091 > Thanks for the PR @curcur. > > Should we also remove this warning: > > > Attention: In 1.13, Flink JDBC sink does not support exactly-once mode with MySQL or other databases that do not support > > multiple XA transaction per connection. We will improve the support in FLINK-22239. > > ? That's a good point. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] xintongsong commented on pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support
xintongsong commented on pull request #15599: URL: https://github.com/apache/flink/pull/15599#issuecomment-827297699 @rmetzger, thanks for help looking into this. :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] xintongsong commented on a change in pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support
xintongsong commented on a change in pull request #15599: URL: https://github.com/apache/flink/pull/15599#discussion_r620837563 ## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs.writer; + +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.fs.gs.GSFileSystemOptions; +import org.apache.flink.fs.gs.storage.BlobStorage; +import org.apache.flink.fs.gs.utils.BlobUtils; +import org.apache.flink.util.Preconditions; + +import com.google.cloud.storage.BlobId; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; + +/** The committer for the GS recoverable writer. */ +class GSRecoverableWriterCommitter implements RecoverableFsDataOutputStream.Committer { + +/** The underlying blob storage. */ +private final BlobStorage storage; + +/** The GS file system options. */ +private final GSFileSystemOptions options; + +/** The recoverable writer instance. 
*/ +private final GSRecoverableWriter writer; + +/** The recoverable writer state for the commit operation. */ +private final GSRecoverableWriterState state; + +GSRecoverableWriterCommitter( +BlobStorage storage, +GSFileSystemOptions options, +GSRecoverableWriter writer, +GSRecoverableWriterState state) { +this.storage = Preconditions.checkNotNull(storage); +this.options = Preconditions.checkNotNull(options); +this.writer = Preconditions.checkNotNull(writer); +this.state = Preconditions.checkNotNull(state); +} + +@Override +public void commit() throws IOException { + +// compose all the component blob ids into the final blob id. if the component blob ids are +// in the same bucket as the final blob id, this can be done directly. otherwise, we must +// compose to a new temporary blob id in the same bucket as the component blob ids and +// then copy that blob to the final blob location +if (state.finalBlobId.getBucket().equals(state.getTemporaryBucketName(options))) { + +// compose directly to final blob +composeBlobs( +state.getComponentBlobIds(options), +state.finalBlobId, +options.writerContentType); + +} else { + +// compose to a temporary blob id, then copy to final blob id +BlobId intermediateBlobId = state.createTemporaryBlobId(options); +composeBlobs( +state.getComponentBlobIds(options), +intermediateBlobId, +options.writerContentType); +storage.copy(intermediateBlobId, state.finalBlobId); +} + +// clean up after commit +writer.cleanupRecoverableState(state); Review comment: On second thought, if we cannot clean anything in `cleanupRecoverableState`, it might be our only chance to clean the temporary blobs on committing. This might have a higher priority than supporting manually recover from an early checkpoint. I think the following issues are closely related and really need to be clarified consistently. 
- What are the relationships between snapshots (resumables) and temporary blobs - Actions for `cleanupRecoverableState` - Actions for `commit` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
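The commit path discussed in this review reduces to one decision: compose the component blobs directly into the final blob when they share its bucket, otherwise compose into an intermediate blob and copy it across. A hedged sketch of that decision as a pure function (hypothetical names, not the Flink implementation):

```python
def commit_plan(final_bucket, temporary_bucket):
    """Steps a commit would take, per the branch in the diff (sketch only).

    GCS can only compose blobs within a single bucket, so when the temporary
    component blobs live in a different bucket than the final blob, the commit
    must compose into an intermediate blob there and then copy it across.
    """
    if final_bucket == temporary_bucket:
        return ["compose components -> final blob"]
    return [
        "compose components -> intermediate blob (temporary bucket)",
        "copy intermediate blob -> final blob",
    ]

print(commit_plan("bucket-a", "bucket-a"))   # direct compose
print(commit_plan("bucket-a", "tmp-bucket")) # compose, then cross-bucket copy
```

Separating the decision from the side effects like this also makes the cleanup questions raised above (what `cleanupRecoverableState` versus `commit` may delete) easier to reason about, since each returned step names exactly which blobs it touches.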
[jira] [Updated] (FLINK-22487) Support `print` to print logs in PyFlink
[ https://issues.apache.org/jira/browse/FLINK-22487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huang Xingbo updated FLINK-22487: - Description: Currently, if users want to print logs, they need to use logging module. {code:python} @udf(result_type=DataTypes.BIGINT()) def add(i, j): import logging logging.info("debug") return i + j {code} It will be more convenient to use `print` to print logs. {code:python} @udf(result_type=DataTypes.BIGINT()) def add(i, j): print("debug") return i + j {code} was: Currently, if users want to print logs, they need to use logging module. {code:python} @udf(result_type=DataTypes.BIGINT()) def add(i, j): import logging logging.info("debug") return i + j {code} It will be more convenient to use `print` to print logs. {code:python} @udf(result_type=DataTypes.BIGINT()) def add(i, j): print("debug") return i + j {code} > Support `print` to print logs in PyFlink > > > Key: FLINK-22487 > URL: https://issues.apache.org/jira/browse/FLINK-22487 > Project: Flink > Issue Type: New Feature > Components: API / Python >Affects Versions: 1.14.0 >Reporter: Huang Xingbo >Assignee: Huang Xingbo >Priority: Major > Fix For: 1.14.0 > > > Currently, if users want to print logs, they need to use logging module. > {code:python} > @udf(result_type=DataTypes.BIGINT()) > def add(i, j): > import logging > logging.info("debug") > return i + j > {code} > It will be more convenient to use `print` to print logs. > {code:python} > @udf(result_type=DataTypes.BIGINT()) > def add(i, j): > print("debug") > return i + j > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] lirui-apache commented on a change in pull request #15746: [FLINK-17783][Table SQL/Ecosystem]Add array,map,row types support for orc row writer
lirui-apache commented on a change in pull request #15746: URL: https://github.com/apache/flink/pull/15746#discussion_r620826305 ## File path: flink-formats/flink-orc/src/main/java/org/apache/flink/orc/vector/RowDataVectorizer.java ## @@ -151,8 +160,114 @@ private static void setColumn( vector.set(rowId, timestamp); break; } +case ARRAY: +{ +ListColumnVector listColumnVector = (ListColumnVector) column; +setColumn(rowId, listColumnVector, type, row, columnId); +break; +} +case MAP: +{ +MapColumnVector mapColumnVector = (MapColumnVector) column; +setColumn(rowId, mapColumnVector, type, row, columnId); +break; +} +case ROW: +{ +StructColumnVector structColumnVector = (StructColumnVector) column; +setColumn(rowId, structColumnVector, type, row, columnId); +break; +} default: throw new UnsupportedOperationException("Unsupported type: " + type); } } + +private static void setColumn( +int rowId, +ListColumnVector listColumnVector, +LogicalType type, +RowData row, +int columnId) { +ArrayData arrayData = row.getArray(columnId); +ArrayType arrayType = (ArrayType) type; +listColumnVector.lengths[rowId] = arrayData.size(); +listColumnVector.offsets[rowId] = listColumnVector.childCount; +listColumnVector.childCount += listColumnVector.lengths[rowId]; +listColumnVector.child.ensureSize( +listColumnVector.childCount, listColumnVector.offsets[rowId] != 0); +for (int i = 0; i < arrayData.size(); i++) { +setColumn( +(int) listColumnVector.offsets[rowId] + i, +listColumnVector.child, +arrayType.getElementType(), +convert(arrayData, arrayType.getElementType()), Review comment: Move this out of the loop so that we don't do this for each field? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-22487) Support `print` to print logs in PyFlink
Huang Xingbo created FLINK-22487: Summary: Support `print` to print logs in PyFlink Key: FLINK-22487 URL: https://issues.apache.org/jira/browse/FLINK-22487 Project: Flink Issue Type: New Feature Components: API / Python Affects Versions: 1.14.0 Reporter: Huang Xingbo Assignee: Huang Xingbo Fix For: 1.14.0 Currently, if users want to print logs, they need to use logging module. {code:python} @udf(result_type=DataTypes.BIGINT()) def add(i, j): import logging logging.info("debug") return i + j {code} It will be more convenient to use `print` to print logs. {code:python} @udf(result_type=DataTypes.BIGINT()) def add(i, j): print("debug") return i + j {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
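Independent of how PyFlink would wire this feature up internally, the standard-library mechanism for routing plain `print()` output into the `logging` module is stdout redirection. A minimal, self-contained sketch (illustrative only, not the proposed implementation):

```python
import io
import logging
from contextlib import redirect_stdout

class LoggingWriter(io.TextIOBase):
    """File-like object that forwards complete lines to a logger."""

    def __init__(self, logger):
        self.logger = logger
        self._buf = ""

    def write(self, s):
        self._buf += s
        # Emit one log record per completed line.
        while "\n" in self._buf:
            line, self._buf = self._buf.split("\n", 1)
            if line:
                self.logger.info(line)
        return len(s)

# Capture log output in a string so the effect is visible.
logger = logging.getLogger("udf")
logger.setLevel(logging.INFO)
logger.propagate = False
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
logger.addHandler(handler)

def add(i, j):      # stand-in for the UDF from the issue
    print("debug")  # plain print, no logging boilerplate inside the UDF
    return i + j

with redirect_stdout(LoggingWriter(logger)):
    result = add(1, 2)

print(stream.getvalue().strip())  # INFO debug
print(result)                     # 3
```

This shows why the feature is attractive: the UDF body stays a one-line `print`, and the runtime decides where the output lands.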
[jira] [Commented] (FLINK-22485) Support client attach on application mode
[ https://issues.apache.org/jira/browse/FLINK-22485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332912#comment-17332912 ] Jark Wu commented on FLINK-22485: - Which version are you using [~zuston]? We just supported sync/async mode in the 1.13 release, could you try the 1.13 rc2 [1]? And here is the documentation: https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-dml-statements-syncasync [1]: https://dist.apache.org/repos/dist/dev/flink/flink-1.13.0-rc2/ > Support client attach on application mode > - > > Key: FLINK-22485 > URL: https://issues.apache.org/jira/browse/FLINK-22485 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Client >Reporter: Junfan Zhang >Priority: Major > > Now, client will not wait until job finish when using application mode. > Can we support client attach on application mode? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-22485) Support client attach on application mode
[ https://issues.apache.org/jira/browse/FLINK-22485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-22485: Component/s: Table SQL / Client > Support client attach on application mode > - > > Key: FLINK-22485 > URL: https://issues.apache.org/jira/browse/FLINK-22485 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Client >Reporter: Junfan Zhang >Priority: Major > > Now, client will not wait until job finish when using application mode. > Can we support client attach on application mode? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] wuchong commented on a change in pull request #15755: [FLINK-22318][table] Support RENAME column name for ALTER TABLE state…
wuchong commented on a change in pull request #15755: URL: https://github.com/apache/flink/pull/15755#discussion_r620825538 ## File path: flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl ## @@ -478,6 +481,17 @@ SqlAlterTable SqlAlterTable() : tableIdentifier, newTableIdentifier); } +| + +originColumnName = SimpleIdentifier() + +newColumnName = SimpleIdentifier() +{ +return new SqlAlterTableRenameColumn( +startPos.plus(getPos()), +tableIdentifier, +originColumnName,newColumnName); Review comment: nit: Please make them in separate lines. ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java ## @@ -144,6 +149,116 @@ public static Operation convertChangeColumn( // TODO: handle watermark and constraints } +public static Operation convertRenameColumn( +ObjectIdentifier tableIdentifier, +String originColumnName, +String newColumnName, +CatalogTable catalogTable) { + +Schema modifiedTableSchema = catalogTable.getUnresolvedSchema(); Review comment: `modifiedTableSchema` sounds like this schema has been modified, would be better to call `originSchema`. 
## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java ## @@ -144,6 +149,116 @@ public static Operation convertChangeColumn( // TODO: handle watermark and constraints } +public static Operation convertRenameColumn( +ObjectIdentifier tableIdentifier, +String originColumnName, +String newColumnName, +CatalogTable catalogTable) { + +Schema modifiedTableSchema = catalogTable.getUnresolvedSchema(); +validateColumnName(originColumnName, newColumnName, modifiedTableSchema); + +Schema.Builder builder = Schema.newBuilder(); +// build column +modifiedTableSchema.getColumns().stream() +.forEach( +column -> { +if (StringUtils.equals(column.getName(), originColumnName)) { +buildNewColumnFromOriginColumn(builder, column, newColumnName); +} else { +buildNewColumnFromOriginColumn(builder, column, column.getName()); +} +}); +// build primary key column +List originPrimaryKeyNames = +modifiedTableSchema +.getPrimaryKey() +.map(Schema.UnresolvedPrimaryKey::getColumnNames) +.orElseGet(Lists::newArrayList); + +List newPrimaryKeyNames = +originPrimaryKeyNames.stream() +.map( +pkName -> +StringUtils.equals(pkName, originColumnName) +? newColumnName +: pkName) +.collect(Collectors.toList()); + +if (newPrimaryKeyNames.size() > 0) { +builder.primaryKey(newPrimaryKeyNames); +} +// build watermark +modifiedTableSchema.getWatermarkSpecs().stream() +.forEach( +watermarkSpec -> { +String watermarkRefColumnName = watermarkSpec.getColumnName(); +Expression watermarkExpression = watermarkSpec.getWatermarkExpression(); +if (StringUtils.equals(watermarkRefColumnName, originColumnName)) { +String newWatermarkExpression = +((SqlCallExpression) watermarkExpression) +.getSqlExpression() + .replace(watermarkRefColumnName, newColumnName); Review comment: 1. We can't guarantee this is always `SqlCallExpression`. 2. We can't use String `replace` for renaming, this is very error-prone, e.g. 
the original expression is `f123 - f1` and `f1` is renamed to `f2`; the replaced result would then be `f223 - f2`, which is wrong. If we don't have a good way to replace column names in expressions, we can disallow renaming columns that are referenced in expressions in the first version.
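The failure mode described in the review can be reproduced directly. The sketch below (hypothetical helper names, not Flink code) contrasts a naive `String.replace` with a rename that only matches the column name when it is not embedded in a longer identifier; even the regex version is only a partial fix (it ignores quoted identifiers and string literals), which is why disallowing the rename is the safer first step:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RenameDemo {

    // Naive rename: replaces every literal occurrence, including occurrences
    // inside other identifiers -- exactly the failure mode described above.
    static String naiveRename(String expr, String from, String to) {
        return expr.replace(from, to);
    }

    // Boundary-aware rename: only matches `from` when the surrounding
    // characters are not identifier characters, so `f1` inside `f123`
    // is left untouched.
    static String boundaryRename(String expr, String from, String to) {
        Pattern p = Pattern.compile(
                "(?<![A-Za-z0-9_$])" + Pattern.quote(from) + "(?![A-Za-z0-9_$])");
        return p.matcher(expr).replaceAll(Matcher.quoteReplacement(to));
    }

    public static void main(String[] args) {
        // The review's example: rename f1 -> f2 in "f123 - f1".
        System.out.println(naiveRename("f123 - f1", "f1", "f2"));    // f223 - f2 (wrong)
        System.out.println(boundaryRename("f123 - f1", "f1", "f2")); // f123 - f2
    }
}
```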
[jira] [Comment Edited] (FLINK-22408) Flink Table Parsr Hive Drop Partitions Syntax unparse is Error
[ https://issues.apache.org/jira/browse/FLINK-22408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332910#comment-17332910 ] Ma Jun edited comment on FLINK-22408 at 4/27/21, 3:54 AM: -- Hi [~lirui] Thanks! I got it! If there is a subsequent 1.13 release, I will switch the existing business to that version. I have a business requirement here: I use Flink's SQL translation module for some front-end SQL verification and formatting, and I found that the Hive syntax becomes inconsistent when converting dialects. This may confuse users, so I think we should keep the syntax consistent with Hive. was (Author: aidenma): Hi [~lirui] Thank!I got it! I know that if there is a subsequent release of version 1.13, I will switch the version of the existing business. I have a business requirement here: I use the module SQL translation in Flink to do some front-end SQL verification and formatting, because I found that the syntax of hive is inconsistent when converting dialects. So I think we should keep the syntax consistent with hive. > Flink Table Parsr Hive Drop Partitions Syntax unparse is Error > -- > > Key: FLINK-22408 > URL: https://issues.apache.org/jira/browse/FLINK-22408 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.11.3 >Reporter: Ma Jun >Priority: Major > Labels: pull-request-available > > The Flink Table Parser output is wrong: > *Synopsis:* > > *SQL:* > {code:java} > alter table tbl drop partition (p1='a',p2=1), partition(p1='b',p2=2);{code} > *hive multi partition unparse toSqlString is:* > {code:java} > ALTER TABLE `TBL`\n" + > "DROP\n" + > "PARTITION (`P1` = 'a', `P2` = 1)\n" + > "PARTITION (`P1` = 'b', `P2` = 2) > {code} > Missing comma in Partition SqlNodeList > Hive syntax: > {code:java} > ALTER TABLE table_name DROP [IF EXISTS] PARTITION (partition_spec) [, > PARTITION (partition_spec)]; > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on pull request #15131: [FLINK-21700][yarn]Allow to disable fetching Hadoop delegation token on Yarn
flinkbot edited a comment on pull request #15131: URL: https://github.com/apache/flink/pull/15131#issuecomment-794957907 ## CI report: * 0500d84f78d8fdb967b077fb024fda0cd4ba2d6a Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15682) * b99e1a12c74051454d211b0094c986b8741dd28f Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17261) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15121: The method $(String) is undefined for the type TableExample
flinkbot edited a comment on pull request #15121: URL: https://github.com/apache/flink/pull/15121#issuecomment-793480240 ## CI report: * cf26ce895a7956d258acc073817f578558e78227 UNKNOWN * 709c4110370b845f42d916a06e56c6026cf2fac8 UNKNOWN * 4ea99332e1997eebca1f3f0a9d9229b8265fe32c UNKNOWN * 9a256ec6b644a0f795376076f4e3ff4e85aaad1e UNKNOWN * 76bbb8de288e2990974fe9ea7a09fb47e37daad8 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17220) * fbd38b53197c244af5ed3af54cebdee75b393a1d Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17257) * b7dca63a2e98f09c8c780a21091b5268d897b220 UNKNOWN
[jira] [Commented] (FLINK-22408) Flink Table Parsr Hive Drop Partitions Syntax unparse is Error
[ https://issues.apache.org/jira/browse/FLINK-22408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332910#comment-17332910 ] Ma Jun commented on FLINK-22408: Hi [~lirui] Thanks! I got it! If there is a subsequent 1.13 release, I will switch the existing business to that version. I have a business requirement here: I use Flink's SQL translation module for some front-end SQL verification and formatting, and I found that the Hive syntax becomes inconsistent when converting dialects. So I think we should keep the syntax consistent with Hive. > Flink Table Parsr Hive Drop Partitions Syntax unparse is Error > -- > > Key: FLINK-22408 > URL: https://issues.apache.org/jira/browse/FLINK-22408 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.11.3 >Reporter: Ma Jun >Priority: Major > Labels: pull-request-available > > The Flink Table Parser output is wrong: > *Synopsis:* > > *SQL:* > {code:java} > alter table tbl drop partition (p1='a',p2=1), partition(p1='b',p2=2);{code} > *hive multi partition unparse toSqlString is:* > {code:java} > ALTER TABLE `TBL`\n" + > "DROP\n" + > "PARTITION (`P1` = 'a', `P2` = 1)\n" + > "PARTITION (`P1` = 'b', `P2` = 2) > {code} > Missing comma in Partition SqlNodeList > Hive syntax: > {code:java} > ALTER TABLE table_name DROP [IF EXISTS] PARTITION (partition_spec) [, > PARTITION (partition_spec)]; > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-21513) Rethink up-/down-/restartingTime metrics
[ https://issues.apache.org/jira/browse/FLINK-21513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17329947#comment-17329947 ] Steven Zhen Wu edited comment on FLINK-21513 at 4/27/21, 3:41 AM: -- [~trohrmann] thanks for tagging me here. Yeah, availability can be a tricky concept for a Flink application. Users typically ask what the availability / uptime of their Flink application is (e.g. four nines). For microservices, availability can be measured naively as a success/failure rate (there are more sophisticated and likely more accurate ways). How do we define availability for Flink? The current uptime metric doesn't capture availability. Availability could also be captured at different time scales (last hour, last 4 hours, last 12 hours, last 24 hours, last week, etc.). was (Author: stevenz3wu): [~trohrmann] thanks for tagging me here. Yeah, availability can be a little tricky concept for Flink application. Users typically ask what is the availability / uptime for my Flink application (4 nines). E.g., for micro services, availability can be measured as success/failure rate in a naive way (there are more sophisticated and probably accurate ways). How do we define availability for Flink? uptime doesn't capture availability. Also availability probably can be captured in different time scales (last hour, last 4 hours, last 12 hours, last 24 hours, last week etc.). > Rethink up-/down-/restartingTime metrics > > > Key: FLINK-21513 > URL: https://issues.apache.org/jira/browse/FLINK-21513 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination, Runtime / Metrics >Reporter: Chesnay Schepler >Priority: Major > Labels: stale-major > Fix For: 1.13.0 > > > While thinking about FLINK-21510 I stumbled upon some issues in the > semantics of these metrics, both from a user perspective and from our own, > and I think we need to clarify some things. > h4. upTime > This metric describes the time since the job transitioned to the RUNNING state.
> It is meant as a measure for how stable a deployment is. > In the default scheduler this transition happens before we do any actual > scheduling work, and as a result this also includes the time it takes for the > JM to request slots and deploy tasks. In practice this means we start the > timer once the job has been submitted and the JobMaster/Scheduler/EG have > been initialized. > For the adaptive scheduler this now puts us a bit into an odd situation > because it first acquires slots before actually transitioning the EG into a > RUNNING state, so as is we'd end up measuring 2 slightly different things. > The question now is whether this is a problem. > While we could certainly stick with the definition of "time since EG switched > to RUNNING", it raises the question what the semantics of this metric are > should a scheduler use a different data-structure than the EG. > In other words, what I'm looking for is a definition that is independent from > existing data-structures; a crude example could be "The time since the job is > in a state where the deployment of a task is possible.". > An alternative for the adaptive scheduler would be to measure the time since > we transitioned to WaitingForResources, with which we would also include the > slot acquisition, but it would be inconsistent with the logs and UI (because > they only display an INITIALIZING job). > h4. restartingTime > This metric describes the time since the job transitioned into a RESTARTING > state. > It is meant as a measure for how long the recovery in case of a job failure > takes. > In the default scheduler this in practice is the time between a failure > arriving at the JM and the cancellation of tasks being completed / restart > backoff (whichever is higher). > This is consistent with the semantics of the upTime metric, because upTime > also includes the time required for acquiring slots and deploying tasks.
> For the adaptive scheduler we can follow similar semantics, by measuring the > time we spend in the {{Restarting}} state. > However, if we stick to the definition of upTime as time spent in RUNNING, > then we will end up with a gap for the time spent in WaitingForResources. > h4. downTime > This metric describes the time between the job transitioning from FAILING to > RUNNING. > It is meant as a measure for how long the recovery in case of a job failure > takes. > You may be wondering what the difference between {{downTime}} and > {{restartingTime}} is meant to be. Unfortunately I do not have the answer to > that. > Presumably, at the time they were added, they were covering different parts > of the recovery process, but since we never documented these steps explicitly > the exact semantics are
[GitHub] [flink] ZhangChaoming removed a comment on pull request #15749: [hotfix][docs] Fix typo.
ZhangChaoming removed a comment on pull request #15749: URL: https://github.com/apache/flink/pull/15749#issuecomment-826467604 @rmetzger Excuse me? The pipeline failed because `table.local-time-zone` does not exist, but I did not modify anything related to this option.
[GitHub] [flink] xintongsong commented on a change in pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support
xintongsong commented on a change in pull request #15599: URL: https://github.com/apache/flink/pull/15599#discussion_r620812109 ## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java ## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemFactory; +import org.apache.flink.runtime.util.HadoopConfigLoader; +import org.apache.flink.util.Preconditions; + +import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem; + +import java.io.IOException; +import java.net.URI; +import java.util.Collections; + +/** + * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystemFactory} interface for + * Google Storage. 
+ */ +public class GSFileSystemFactory implements FileSystemFactory { + +private static final String SCHEME = "gs"; + +private static final String HADOOP_CONFIG_PREFIX = "fs.gs."; + +private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", HADOOP_CONFIG_PREFIX}; + +private static final String[][] MIRRORED_CONFIG_KEYS = {}; + +private static final String FLINK_SHADING_PREFIX = ""; + +public static final ConfigOption WRITER_TEMPORARY_BUCKET_NAME = +ConfigOptions.key("gs.writer.temporary.bucket.name") +.stringType() + .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME) +.withDescription( +"This option sets the bucket name used by the recoverable writer to store temporary files. " ++ "If empty, temporary files are stored in the same bucket as the final file being written."); + +public static final ConfigOption WRITER_TEMPORARY_OBJECT_PREFIX = +ConfigOptions.key("gs.writer.temporary.object.prefix") +.stringType() + .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_OBJECT_PREFIX) +.withDescription( +"This option sets the prefix used by the recoverable writer when writing temporary files. This prefix is applied to the " ++ "final object name to form the base name for temporary files."); Review comment: I'd be in favor of not introducing this option if it's not absolutely necessary. The more options we provide to the users, the more constraints we put on future developing and maintaining. As for including the bucket names in temporary blob names, I think that's a good idea. Since the temporary blobs are not meant to be manipulated by users directly, it would be nice to carry more information with the blob names. Moreover, I wonder whether we should also include the index of temporary blobs, in addition to the UUID. That might be useful for cleaning up the resumables. 
If, in the future, we want to combine the temporary blobs incrementally instead of all at the end, it would help to know that a blob named `xxx-1-32-xxx` contains all the content from `xxx-1-1-xxx` to `xxx-32-32-xxx`.
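To make the suggestion concrete, here is a minimal sketch of such a naming scheme (the format, prefix, and helper name are hypothetical illustrations, not the PR's actual code): a temporary blob name carries the final bucket, the index range of the components it covers, and a UUID, so a combined blob is self-describing.

```java
import java.util.UUID;

public class TempBlobNameDemo {

    /**
     * Builds a temporary blob name of the form
     * {@code <prefix><finalBucket>.<firstIndex>-<lastIndex>.<uuid>}.
     * A blob covering indexes 1-32 can then be recognised as the
     * combination of the blobs covering 1-1 through 32-32, which helps
     * when cleaning up or compacting resumables.
     */
    static String tempBlobName(
            String prefix, String finalBucket, int firstIndex, int lastIndex, UUID uuid) {
        return prefix + finalBucket + "." + firstIndex + "-" + lastIndex + "." + uuid;
    }

    public static void main(String[] args) {
        UUID uuid = UUID.fromString("00000000-0000-0000-0000-000000000000");
        // A combined blob covering components 1..32 of one upload:
        System.out.println(tempBlobName(".inprogress/", "final-bucket", 1, 32, uuid));
    }
}
```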
[GitHub] [flink] flinkbot edited a comment on pull request #15763: [FLINK-22364][doc] Translate the page of "Data Sources" to Chinese.
flinkbot edited a comment on pull request #15763: URL: https://github.com/apache/flink/pull/15763#issuecomment-826589720 ## CI report: * 03b1c6bc999ee92a23eb0a0b84a368f2643e841d Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17201) * 0dcb773365ec16118d2b4c0f55910441bc6d7d58 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17259)
[jira] [Commented] (FLINK-22460) Conversion to relational algebra failed due to NOT NULL modifier
[ https://issues.apache.org/jira/browse/FLINK-22460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332902#comment-17332902 ] Shengkai Fang commented on FLINK-22460: --- It seems the input schema is not the same as the sink schema. You can specify the type of the column `number` in the source table as `BIGINT NOT NULL`. > Conversion to relational algebra failed due to NOT NULL modifier > > > Key: FLINK-22460 > URL: https://issues.apache.org/jira/browse/FLINK-22460 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.12.1 >Reporter: Haiwei Zhou >Priority: Major > > Flink complains that an insert sql doesn't match the table schema. The > validated type is missing a "NOT NULL" modifier. > > > {code:java} > py4j.protocol.Py4JJavaError: An error occurred while calling o18.executeSql. > : java.lang.AssertionError: Conversion to relational algebra failed to > preserve datatypes: > validated type: > RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" request, CHAR(7) > CHARACTER SET "UTF-16LE" NOT NULL EXPR$1, BIGINT number, TIMESTAMP(3) > start_time, TIMESTAMP(3) end_time) NOT NULL > converted type: > RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" request, CHAR(7) > CHARACTER SET "UTF-16LE" NOT NULL EXPR$1, BIGINT NOT NULL number, > TIMESTAMP(3) start_time, TIMESTAMP(3) end_time) NOT > NULL{code} > > > {code:java} > table_env.execute_sql(''' > CREATE TABLE preload_stats ( > lineitems STRING, > itype STRING, > number BIGINT NOT NULL, > start_time TIMESTAMP(3), > end_time TIMESTAMP(3) > )''' > > table_env.execute_sql( > "SELECT request, 'request', number, start_time, end_time " > "FROM result_1 ").execute_insert('preload_stats') > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] xintongsong commented on a change in pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support
xintongsong commented on a change in pull request #15599: URL: https://github.com/apache/flink/pull/15599#discussion_r620824056 ## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriter.java ## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs.writer; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.fs.gs.GSFileSystemOptions; +import org.apache.flink.fs.gs.storage.BlobStorage; +import org.apache.flink.fs.gs.utils.BlobUtils; +import org.apache.flink.util.Preconditions; + +import com.google.cloud.storage.BlobId; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; + +/** The recoverable writer implementation for Google storage. */ +public class GSRecoverableWriter implements RecoverableWriter { + +/** The underlying blob storage. */ +private final BlobStorage storage; + +/** The GS file system options. 
*/ +private final GSFileSystemOptions options; + +/** + * Construct a GS recoverable writer. + * + * @param storage The underlying blob storage instance + * @param options The GS file system options + */ +public GSRecoverableWriter(BlobStorage storage, GSFileSystemOptions options) { +this.storage = Preconditions.checkNotNull(storage); +this.options = Preconditions.checkNotNull(options); +} + +@Override +public boolean requiresCleanupOfRecoverableState() { +return true; +} + +@Override +public boolean supportsResume() { +return true; +} + +@Override +public RecoverableFsDataOutputStream open(Path path) throws IOException { +Preconditions.checkNotNull(path); + +BlobId finalBlobId = BlobUtils.parseUri(path.toUri()); +GSRecoverableWriterState state = new GSRecoverableWriterState(finalBlobId); +return new GSRecoverableFsDataOutputStream(storage, options, this, state); +} + +@Override +public RecoverableFsDataOutputStream recover(ResumeRecoverable resumable) throws IOException { +Preconditions.checkNotNull(resumable); + +GSRecoverableWriterState state = (GSRecoverableWriterState) resumable; +return new GSRecoverableFsDataOutputStream(storage, options, this, state); +} + +@Override +public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException { +Preconditions.checkNotNull(resumable); + +// determine the partial name for the temporary objects to be deleted +GSRecoverableWriterState state = (GSRecoverableWriterState) resumable; +String temporaryBucketName = state.getTemporaryBucketName(options); +String temporaryObjectPartialName = state.getTemporaryObjectPartialName(options); + +// this will hold the set of blob ids that were actually deleted +HashSet deletedBlobIds = new HashSet<>(); + +// find all the temp blobs by looking for anything that starts with the temporary +// object partial name. 
doing it this way finds any orphaned temp blobs that might +// have come about when resuming +List foundTempBlobIds = +storage.list(temporaryBucketName, temporaryObjectPartialName); +if (!foundTempBlobIds.isEmpty()) { + +// delete all the temp blobs, and populate the set with ones that were actually deleted +// normalize in case the blob came back with a generation populated +List deleteResults = storage.delete(foundTempBlobIds); +for (int i = 0; i < deleteResults.size(); i++) { +if (deleteResults.get(i)) { + deletedBlobIds.add(BlobUtils.normalizeBlobId(foundTempBlobIds.get(i))); +} +} +} Review comment: Contract wise, `cleanupRecoverableState` is expected to clean-up for a resumable rather than a target file being written. Once a checkpoint is successfully
[jira] [Created] (FLINK-22486) Wrong results of the IN operator
Shengkai Fang created FLINK-22486: - Summary: Wrong results of the IN operator Key: FLINK-22486 URL: https://issues.apache.org/jira/browse/FLINK-22486 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.13.0 Reporter: Shengkai Fang Please add the following test to the {{CalcITCase}}. {code:java} @Test def testSimpleProject(): Unit = { val myTableDataId = TestValuesTableFactory.registerData(Seq(row("HC809"))) val ddl = s""" |CREATE TABLE SimpleTable ( | content String |) WITH ( | 'connector' = 'values', | 'data-id' = '$myTableDataId', | 'bounded' = 'true' |) |""".stripMargin tEnv.executeSql(ddl) val sql = """ |SELECT content from SimpleTable where content in ( |'CTNBSmokeSensor','H388N', |'H389N', |'GHL-IRD','JY-BF-20YN','HC809', |'DH-9908N-AEP','DH-9908N' |) | |""".stripMargin checkResult( sql, Seq(row("HC809")) ) } {code} The query should return the row `HC809`, but nothing is returned. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] wuchong commented on a change in pull request #15768: [FLINK-22451][table] Support (*) as parameter of UDFs in Table API
wuchong commented on a change in pull request #15768: URL: https://github.com/apache/flink/pull/15768#discussion_r620820408 ## File path: docs/content/docs/dev/table/functions/udfs.md ## @@ -171,6 +171,52 @@ env.createTemporarySystemFunction("SubstringFunction", new SubstringFunction(tru {{< /tab >}} {{< /tabs >}} +You can use star (*) as one argument of the function call to act as a wildcard in Table API, +all columns in the table will be passed to the function at the corresponding position. + +{{< tabs "64dd4129-6313-4904-b7e7-a1a0535822e9" >}} +{{< tab "Java" >}} +```java +import org.apache.flink.table.api.*; +import org.apache.flink.table.functions.ScalarFunction; +import static org.apache.flink.table.api.Expressions.*; + +public static class JoinFunction extends ScalarFunction { + + public String eval(String f0, String f1) { +return f0 + f1; + } +} + +TableEnvironment env = TableEnvironment.create(...); + +// call function with $("*"), if MyTable has two string fields, +// all of them will be passed to JoinFunction. +env.from("MyTable").select(call(JoinFunction.class, $("*"))); Review comment: I would suggest showing a more meaningful example here, e.g. `concat`? ```java public static class MyConcatFunction extends ScalarFunction { public String eval(Object... fields) { return Arrays.stream(fields) .map(Object::toString) .collect(Collectors.joining(",")); } } TableEnvironment env = TableEnvironment.create(...); // call function with $("*"), if MyTable has 3 fields (a, b, c), // all of them will be passed to MyConcatFunction. 
env.from("MyTable").select(call(MyConcatFunction.class, $("*"))); // it is equivalent to calling the function with all columns selected explicitly env.from("MyTable").select(call(MyConcatFunction.class, $("a"), $("b"), $("c"))); ``` ## File path: docs/content/docs/dev/table/functions/udfs.md ## @@ -171,6 +171,52 @@ env.createTemporarySystemFunction("SubstringFunction", new SubstringFunction(tru {{< /tab >}} {{< /tabs >}} +You can use star (*) as one argument of the function call to act as a wildcard in Table API, +all columns in the table will be passed to the function at the corresponding position. + +{{< tabs "64dd4129-6313-4904-b7e7-a1a0535822e9" >}} +{{< tab "Java" >}} +```java +import org.apache.flink.table.api.*; +import org.apache.flink.table.functions.ScalarFunction; +import static org.apache.flink.table.api.Expressions.*; + +public static class JoinFunction extends ScalarFunction { + + public String eval(String f0, String f1) { +return f0 + f1; + } +} + +TableEnvironment env = TableEnvironment.create(...); + +// call function with $("*"), if MyTable has two string fields, +// all of them will be passed to JoinFunction. +env.from("MyTable").select(call(JoinFunction.class, $("*"))); Review comment: And please also add an IT case for this to make sure this is supported end-to-end.
[jira] [Commented] (FLINK-22485) Support client attach on application mode
[ https://issues.apache.org/jira/browse/FLINK-22485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332900#comment-17332900 ] Junfan Zhang commented on FLINK-22485: -- Any ideas on this? [~jark] [~lirui] > Support client attach on application mode > - > > Key: FLINK-22485 > URL: https://issues.apache.org/jira/browse/FLINK-22485 > Project: Flink > Issue Type: Improvement >Reporter: Junfan Zhang >Priority: Major > > Currently, the client does not wait for the job to finish when using application mode. > Can we support attaching the client in application mode? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22485) Support client attach on application mode
Junfan Zhang created FLINK-22485: Summary: Support client attach on application mode Key: FLINK-22485 URL: https://issues.apache.org/jira/browse/FLINK-22485 Project: Flink Issue Type: Improvement Reporter: Junfan Zhang Currently, the client does not wait for the job to finish when using application mode. Can we support attaching the client in application mode? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] xintongsong commented on a change in pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support
xintongsong commented on a change in pull request #15599: URL: https://github.com/apache/flink/pull/15599#discussion_r620824056 ## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriter.java ## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs.writer; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.fs.gs.GSFileSystemOptions; +import org.apache.flink.fs.gs.storage.BlobStorage; +import org.apache.flink.fs.gs.utils.BlobUtils; +import org.apache.flink.util.Preconditions; + +import com.google.cloud.storage.BlobId; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; + +/** The recoverable writer implementation for Google storage. */ +public class GSRecoverableWriter implements RecoverableWriter { + +/** The underlying blob storage. */ +private final BlobStorage storage; + +/** The GS file system options. 
*/ +private final GSFileSystemOptions options; + +/** + * Construct a GS recoverable writer. + * + * @param storage The underlying blob storage instance + * @param options The GS file system options + */ +public GSRecoverableWriter(BlobStorage storage, GSFileSystemOptions options) { +this.storage = Preconditions.checkNotNull(storage); +this.options = Preconditions.checkNotNull(options); +} + +@Override +public boolean requiresCleanupOfRecoverableState() { +return true; +} + +@Override +public boolean supportsResume() { +return true; +} + +@Override +public RecoverableFsDataOutputStream open(Path path) throws IOException { +Preconditions.checkNotNull(path); + +BlobId finalBlobId = BlobUtils.parseUri(path.toUri()); +GSRecoverableWriterState state = new GSRecoverableWriterState(finalBlobId); +return new GSRecoverableFsDataOutputStream(storage, options, this, state); +} + +@Override +public RecoverableFsDataOutputStream recover(ResumeRecoverable resumable) throws IOException { +Preconditions.checkNotNull(resumable); + +GSRecoverableWriterState state = (GSRecoverableWriterState) resumable; +return new GSRecoverableFsDataOutputStream(storage, options, this, state); +} + +@Override +public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException { +Preconditions.checkNotNull(resumable); + +// determine the partial name for the temporary objects to be deleted +GSRecoverableWriterState state = (GSRecoverableWriterState) resumable; +String temporaryBucketName = state.getTemporaryBucketName(options); +String temporaryObjectPartialName = state.getTemporaryObjectPartialName(options); + +// this will hold the set of blob ids that were actually deleted +HashSet deletedBlobIds = new HashSet<>(); + +// find all the temp blobs by looking for anything that starts with the temporary +// object partial name. 
doing it this way finds any orphaned temp blobs that might +// have come about when resuming +List foundTempBlobIds = +storage.list(temporaryBucketName, temporaryObjectPartialName); +if (!foundTempBlobIds.isEmpty()) { + +// delete all the temp blobs, and populate the set with ones that were actually deleted +// normalize in case the blob came back with a generation populated +List deleteResults = storage.delete(foundTempBlobIds); +for (int i = 0; i < deleteResults.size(); i++) { +if (deleteResults.get(i)) { + deletedBlobIds.add(BlobUtils.normalizeBlobId(foundTempBlobIds.get(i))); +} +} +} Review comment: Contract wise, `cleanupRecoverableState` is expected to clean-up for a resumable rather than a target file being written. Once a checkpoint is successfully
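The cleanup strategy discussed in the review above — list every blob under the temporary object partial name and delete it, which also sweeps up orphans left behind by earlier resumes — can be modeled with a stdlib-only sketch. The in-memory `List` bucket and the `.inprogress/` names below are illustrative assumptions, not the GS storage API:

```java
import java.util.ArrayList;
import java.util.List;

public class TempBlobCleanup {

    /**
     * Deletes every blob whose name starts with the partial name and returns the
     * deleted names. Matching by prefix removes orphaned temp blobs from earlier
     * resumes as well as the current ones, mirroring the behavior described in
     * the review comment. The mutable List stands in for a storage bucket.
     */
    static List<String> deleteByPrefix(List<String> bucket, String partialName) {
        List<String> deleted = new ArrayList<>();
        for (String blob : new ArrayList<>(bucket)) { // iterate over a copy: we mutate the bucket
            if (blob.startsWith(partialName)) {
                bucket.remove(blob);
                deleted.add(blob);
            }
        }
        return deleted;
    }

    public static void main(String[] args) {
        List<String> bucket = new ArrayList<>(List.of(
                ".inprogress/part-0.uuid-current",
                ".inprogress/part-0.uuid-orphan",
                "final/part-0"));
        System.out.println(deleteByPrefix(bucket, ".inprogress/part-0"));
        System.out.println(bucket); // only the final blob remains
    }
}
```

Note that the real implementation must also normalize blob ids after deletion (generation may be populated), which this sketch omits.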
[GitHub] [flink] flinkbot edited a comment on pull request #15131: [FLINK-21700][yarn]Allow to disable fetching Hadoop delegation token on Yarn
flinkbot edited a comment on pull request #15131: URL: https://github.com/apache/flink/pull/15131#issuecomment-794957907 ## CI report: * 0500d84f78d8fdb967b077fb024fda0cd4ba2d6a Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15682) * b99e1a12c74051454d211b0094c986b8741dd28f UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15121: The method $(String) is undefined for the type TableExample
flinkbot edited a comment on pull request #15121: URL: https://github.com/apache/flink/pull/15121#issuecomment-793480240 ## CI report: * cf26ce895a7956d258acc073817f578558e78227 UNKNOWN * 709c4110370b845f42d916a06e56c6026cf2fac8 UNKNOWN * 4ea99332e1997eebca1f3f0a9d9229b8265fe32c UNKNOWN * 9a256ec6b644a0f795376076f4e3ff4e85aaad1e UNKNOWN * 76bbb8de288e2990974fe9ea7a09fb47e37daad8 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17220) * fbd38b53197c244af5ed3af54cebdee75b393a1d Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17257) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] wydhcws commented on a change in pull request #15742: [FLINK-22442][CEP] Fix Using scala api to change the TimeCharacteristic o…
wydhcws commented on a change in pull request #15742: URL: https://github.com/apache/flink/pull/15742#discussion_r620818295 ## File path: flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPScalaApiPatternStreamTest.scala ## @@ -0,0 +1,67 @@ +package org.apache.flink.cep.scala + +import java.lang.reflect.Field + +import org.apache.flink.cep +import org.apache.flink.cep.pattern.Pattern +import org.apache.flink.cep.pattern.conditions.SimpleCondition +import org.apache.flink.streaming.api.datastream.DataStreamSource +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import org.junit.Assert.assertEquals +import org.junit.Test + +class CEPScalaApiPatternStreamTest { + /** +* These tests simply check that use the Scala API to update the TimeCharacteristic of the PatternStream . +*/ + + @Test + def updateCepTimeCharacteristicByScalaApi(): Unit = { + +val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment +val input: DataStreamSource[Event] = env.fromElements(Event(1, "barfoo", 1.0), Event(8, "end", 1.0)) +val pattern: Pattern[Event, Event] = Pattern.begin("start").where(new SimpleCondition[Event]() { + override def filter(value: Event): Boolean = value.name == "start" +}) + +val jestream: cep.PatternStream[Event] = org.apache.flink.cep.CEP.pattern(input, pattern) + +//get org.apache.flink.cep.scala.PatternStream +val sePstream = new PatternStream[Event](jestream) + +//get TimeBehaviour +val time1: AnyRef = getTimeBehaviourFromScalaPatternStream(sePstream) + +assertEquals(time1.toString, "EventTime") + +//change TimeCharacteristic use scala api +val sPstream: PatternStream[Event] = sePstream.inProcessingTime() + +//get TimeBehaviour +val time2: AnyRef = getTimeBehaviourFromScalaPatternStream(sPstream) + +assertEquals(time2.toString, "ProcessingTime") + + + } + + def getTimeBehaviourFromScalaPatternStream(seStream: org.apache.flink.cep.scala.PatternStream[Event]) = { +val field: Field = 
seStream.getClass.getDeclaredField("jPatternStream") +field.setAccessible(true) +val JPattern: AnyRef = field.get(seStream) +val stream: cep.PatternStream[Event] = JPattern.asInstanceOf[cep.PatternStream[Event]] +getTimeBehaviourFromJavaPatternStream(stream) + } + + def getTimeBehaviourFromJavaPatternStream(jeStream: org.apache.flink.cep.PatternStream[Event])={ +val builder: Field = jeStream.getClass.getDeclaredField("builder") +builder.setAccessible(true) +val o: AnyRef = builder.get(jeStream) +val timeBehaviour: Field = o.getClass.getDeclaredField("timeBehaviour") +timeBehaviour.setAccessible(true) Review comment: Sir, thank you for your suggestion. Actually, I also think this is not good, but I am a little confused: how can I prove that a private variable has changed without using reflection? Should I write a case that proves the matching result changes after switching the TimeCharacteristic of the PatternStream, instead of proving that the private timeBehaviour field changes? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
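For reference, the reflection technique the test under review relies on — opening a private field and reading it before and after an API call — looks like this in plain Java. The `Stream` class below is a hypothetical stand-in for `PatternStream` and its private `timeBehaviour`, not Flink code; the reviewer's alternative would instead assert on observable matching behavior:

```java
import java.lang.reflect.Field;

public class ReflectionProbe {

    /** Stand-in for a class with private state, like PatternStream's timeBehaviour. */
    static class Stream {
        private String timeBehaviour = "EventTime";

        Stream inProcessingTime() {
            timeBehaviour = "ProcessingTime";
            return this;
        }
    }

    /** Reads a private field by name, wrapping checked reflection exceptions. */
    static String readPrivate(Object target, String name) {
        try {
            Field f = target.getClass().getDeclaredField(name);
            f.setAccessible(true); // bypass the private modifier, as the test under review does
            return (String) f.get(target);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Stream s = new Stream();
        System.out.println(readPrivate(s, "timeBehaviour"));                    // EventTime
        System.out.println(readPrivate(s.inProcessingTime(), "timeBehaviour")); // ProcessingTime
    }
}
```

The drawback the reviewer points out applies here too: the test breaks if the field is renamed, even when the public behavior is unchanged.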
[GitHub] [flink] zuston commented on pull request #15131: [FLINK-21700][yarn]Allow to disable fetching Hadoop delegation token on Yarn
zuston commented on pull request #15131: URL: https://github.com/apache/flink/pull/15131#issuecomment-827278120 @XComp Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] xintongsong commented on a change in pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support
xintongsong commented on a change in pull request #15599: URL: https://github.com/apache/flink/pull/15599#discussion_r620815613 ## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterState.java ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs.writer; + +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.fs.gs.GSFileSystemOptions; +import org.apache.flink.util.Preconditions; + +import com.google.cloud.storage.BlobId; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + +/** The state of a recoverable write. */ +class GSRecoverableWriterState implements RecoverableWriter.ResumeRecoverable, Cloneable { + +/** The blob id to which the recoverable write operation is writing. */ +public final BlobId finalBlobId; + +/** The number of bytes that have been written so far. */ +public long bytesWritten; + +/** Indicates if the write has been closed. */ +public boolean closed; + +/** The object ids for the temporary objects that should be composed to form the final blob. 
*/ +public final List componentObjectIds; + +GSRecoverableWriterState( +BlobId finalBlobId, long bytesWritten, boolean closed, List componentObjectIds) { +this.finalBlobId = Preconditions.checkNotNull(finalBlobId); +Preconditions.checkArgument(bytesWritten >= 0); +this.bytesWritten = bytesWritten; +this.closed = closed; + +// shallow copy the component object ids to ensure this state object exclusively +// manages the list of component object ids +this.componentObjectIds = new ArrayList<>(Preconditions.checkNotNull(componentObjectIds)); +} + +GSRecoverableWriterState(GSRecoverableWriterState state) { +this(state.finalBlobId, state.bytesWritten, state.closed, state.componentObjectIds); +} + +GSRecoverableWriterState(BlobId finalBlobId) { +this(finalBlobId, 0, false, new ArrayList<>()); +} + +/** + * Returns the temporary bucket name. If options specifies a temporary bucket name, we use that + * one; otherwise, we use the bucket name of the final blob. + * + * @param options The GS file system options + * @return The temporary bucket name + */ +String getTemporaryBucketName(GSFileSystemOptions options) { +return options.writerTemporaryBucketName.isEmpty() +? finalBlobId.getBucket() +: options.writerTemporaryBucketName; +} Review comment: I see. Thanks for the explanation. Then maybe also make this one a util method, that takes `finalBlobId` as an argument? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
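The fallback logic under discussion — use the configured temporary bucket if one is set, otherwise the bucket of the final blob — is simple enough to extract into a static util method, as the reviewer suggests. A stdlib sketch under assumed parameter names (this is not Flink's actual API):

```java
public class TempBucketUtils {

    /**
     * Picks the bucket for temporary objects: the configured temporary bucket if
     * one is set, otherwise the bucket of the final blob. The parameter names are
     * illustrative stand-ins for the options value and BlobId bucket.
     */
    static String temporaryBucketName(String configuredTempBucket, String finalBlobBucket) {
        return (configuredTempBucket == null || configuredTempBucket.isEmpty())
                ? finalBlobBucket
                : configuredTempBucket;
    }

    public static void main(String[] args) {
        System.out.println(temporaryBucketName("", "data-bucket"));           // falls back to the final bucket
        System.out.println(temporaryBucketName("tmp-bucket", "data-bucket")); // uses the configured bucket
    }
}
```

Making it static with `finalBlobId` (here, its bucket) as an argument removes the dependency on writer state, which is the refactoring the reviewer proposes.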
[GitHub] [flink] flinkbot edited a comment on pull request #15763: [FLINK-22364][doc] Translate the page of "Data Sources" to Chinese.
flinkbot edited a comment on pull request #15763: URL: https://github.com/apache/flink/pull/15763#issuecomment-826589720 ## CI report: * 03b1c6bc999ee92a23eb0a0b84a368f2643e841d Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17201) * 0dcb773365ec16118d2b4c0f55910441bc6d7d58 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15745: [FLINK-22304][table] Refactor some interfaces for TVF based window to…
flinkbot edited a comment on pull request #15745: URL: https://github.com/apache/flink/pull/15745#issuecomment-826249839 ## CI report: * c9df5fedbcb6ce93f3a7576cf35753af3ffab6b4 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17223) * 11130e794b07aa9215035e5c204766295a4f50fe Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17258) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15121: The method $(String) is undefined for the type TableExample
flinkbot edited a comment on pull request #15121: URL: https://github.com/apache/flink/pull/15121#issuecomment-793480240 ## CI report: * cf26ce895a7956d258acc073817f578558e78227 UNKNOWN * 709c4110370b845f42d916a06e56c6026cf2fac8 UNKNOWN * 4ea99332e1997eebca1f3f0a9d9229b8265fe32c UNKNOWN * 9a256ec6b644a0f795376076f4e3ff4e85aaad1e UNKNOWN * 76bbb8de288e2990974fe9ea7a09fb47e37daad8 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17220) * fbd38b53197c244af5ed3af54cebdee75b393a1d UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] xintongsong commented on a change in pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support
xintongsong commented on a change in pull request #15599: URL: https://github.com/apache/flink/pull/15599#discussion_r620812109 ## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java ## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemFactory; +import org.apache.flink.runtime.util.HadoopConfigLoader; +import org.apache.flink.util.Preconditions; + +import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem; + +import java.io.IOException; +import java.net.URI; +import java.util.Collections; + +/** + * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystemFactory} interface for + * Google Storage. 
+ */ +public class GSFileSystemFactory implements FileSystemFactory { + +private static final String SCHEME = "gs"; + +private static final String HADOOP_CONFIG_PREFIX = "fs.gs."; + +private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", HADOOP_CONFIG_PREFIX}; + +private static final String[][] MIRRORED_CONFIG_KEYS = {}; + +private static final String FLINK_SHADING_PREFIX = ""; + +public static final ConfigOption WRITER_TEMPORARY_BUCKET_NAME = +ConfigOptions.key("gs.writer.temporary.bucket.name") +.stringType() + .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME) +.withDescription( +"This option sets the bucket name used by the recoverable writer to store temporary files. " ++ "If empty, temporary files are stored in the same bucket as the final file being written."); + +public static final ConfigOption WRITER_TEMPORARY_OBJECT_PREFIX = +ConfigOptions.key("gs.writer.temporary.object.prefix") +.stringType() + .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_OBJECT_PREFIX) +.withDescription( +"This option sets the prefix used by the recoverable writer when writing temporary files. This prefix is applied to the " ++ "final object name to form the base name for temporary files."); Review comment: I'd be in favor of not introducing this option if it's not absolutely necessary. The more options we provide to the users, the more constraints we put on future developing and maintaining. As for including the bucket names in temporary blob names, I think that's a good idea. Since the temporary blobs are not meant to be manipulated by users directly, it would be nice to carry more information with the blob names. Moreover, I wonder whether we should also include the index of temporary blobs, in addition to the UUID. That might be useful for cleaning up the resumables. I'll explain in the other comment. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
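The naming idea raised in the review — carry more information in temporary blob names, including the final bucket and possibly a component index alongside the UUID — can be sketched as a pure string-composition helper. The prefix and dot-separated layout here are assumptions for illustration; Flink's actual naming scheme may differ:

```java
import java.util.UUID;

public class TempBlobNaming {

    /**
     * Composes a temporary object name that embeds the final bucket, the component
     * index, and a UUID, so the name alone identifies what the blob belongs to and
     * supports index-based cleanup of resumables. Layout is hypothetical.
     */
    static String temporaryObjectName(
            String prefix, String finalBucket, String finalObject, int componentIndex, UUID id) {
        return prefix + finalObject + "." + finalBucket + "." + componentIndex + "." + id;
    }

    public static void main(String[] args) {
        UUID id = UUID.fromString("00000000-0000-0000-0000-000000000001");
        System.out.println(temporaryObjectName(".inprogress/", "data-bucket", "part-0", 3, id));
    }
}
```

Since temporary blobs are not meant to be manipulated by users directly, a richer, self-describing name costs nothing at the API surface.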
[jira] [Commented] (FLINK-22451) Support (*) as parameter of table UserDefinedFunction
[ https://issues.apache.org/jira/browse/FLINK-22451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332850#comment-17332850 ] Jark Wu commented on FLINK-22451: - {{withColumns}} and {{withoutColumns}} are also shortcuts similar to {{$("*")}} for selecting columns more easily. > Support (*) as parameter of table UserDefinedFunction > -- > > Key: FLINK-22451 > URL: https://issues.apache.org/jira/browse/FLINK-22451 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: Yi Tang >Assignee: Yi Tang >Priority: Minor > Labels: pull-request-available > > For now, one can use star \(*) to act as a wild card, selecting all of the > columns in the table. > {code:java} > Table result = orders.select($("*")); > {code} > When one uses a star \(*) as a parameter of a UDF, it will fail in > {{ReferenceResolverRule}} with > {code:java} > "Cannot resolve field [*], input field list:[...]." > {code} > The cause is that the parameter of a UDF is not expanded in > {{StarReferenceFlatteningRule}}. > I think we can support expanding the star parameter to the real field list > if it is the only parameter (the last parameter is also ok) of the UDF. > Then the parameters can be received by > {code:java} > eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object... row) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
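The expansion proposed in this issue — replacing a `$("*")` argument of a UDF call with the full input field list — can be modeled as a simple list rewrite. This is a simplified stdlib sketch of the idea behind `StarReferenceFlatteningRule`, not Flink planner code:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StarExpansion {

    /**
     * Expands any "*" argument into the table's full column list, leaving other
     * arguments in place. In the proposal, this would apply when the star is the
     * only (or last) parameter of the UDF.
     */
    static List<String> expandStar(List<String> udfArgs, List<String> inputColumns) {
        List<String> expanded = new ArrayList<>();
        for (String arg : udfArgs) {
            if ("*".equals(arg)) {
                expanded.addAll(inputColumns); // the wildcard becomes every input field
            } else {
                expanded.add(arg);
            }
        }
        return expanded;
    }

    public static void main(String[] args) {
        System.out.println(expandStar(Arrays.asList("id", "*"), Arrays.asList("a", "b", "c")));
        // [id, a, b, c]
    }
}
```

After expansion, a UDF declared as `eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object... row)` can receive the flattened fields as varargs.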
[GitHub] [flink] flinkbot edited a comment on pull request #15745: [FLINK-22304][table] Refactor some interfaces for TVF based window to…
flinkbot edited a comment on pull request #15745: URL: https://github.com/apache/flink/pull/15745#issuecomment-826249839 ## CI report: * c9df5fedbcb6ce93f3a7576cf35753af3ffab6b4 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17223) * 11130e794b07aa9215035e5c204766295a4f50fe UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] tisonkun merged pull request #15756: [hotfix][docs] Removed duplicate 'is'
tisonkun merged pull request #15756: URL: https://github.com/apache/flink/pull/15756 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-22470) The root cause of the exception encountered during compiling the job was not exposed to users in certain cases
[ https://issues.apache.org/jira/browse/FLINK-22470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu closed FLINK-22470. --- Resolution: Fixed Merged to - master via bef98cf0ba168dd4f0b2c8614d2179cec8b54ff8 - release-1.13 via dfa6623f3d4a1fead2be88aa964ed69a1219d382 - release-1.12 via fbd38b53197c244af5ed3af54cebdee75b393a1d > The root cause of the exception encountered during compiling the job was not > exposed to users in certain cases > -- > > Key: FLINK-22470 > URL: https://issues.apache.org/jira/browse/FLINK-22470 > Project: Flink > Issue Type: Improvement > Components: API / Python >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.13.0, 1.12.4 > > > For the following job: > {code} > def test(): > from pyflink.table import DataTypes, BatchTableEnvironment, > EnvironmentSettings > env_settings = > EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build() > table_env = > BatchTableEnvironment.create(environment_settings=env_settings) > table_env \ > .get_config() \ > .get_configuration() \ > .set_string( > "pipeline.jars", > > "file:///Users/dianfu/code/src/alibaba/ververica-connectors/flink-sql-avro-1.12.0.jar" > ) > table = table_env.from_elements( > [('111', '222')], > schema=DataTypes.ROW([ > DataTypes.FIELD('text', DataTypes.STRING()), > DataTypes.FIELD('text1', DataTypes.STRING()) > ]) > ) > sink_ddl = f""" > create table Results( > a STRING, > b STRING > ) with ( > 'connector' = 'filesystem', > 'path' = '/Users/dianfu/tmp/', > 'format' = 'avro' > ) > """ > table_env.execute_sql(sink_ddl) > table.execute_insert("Results").wait() > if __name__ == "__main__": > test() > {code} > It throws the following exception: > {code} > pyflink.util.exceptions.TableException: Failed to execute sql >at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699) >at > 
org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572) >at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >at java.lang.reflect.Method.invoke(Method.java:498) >at > org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) >at > org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) >at > org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) >at > org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) >at > org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) >at > org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) >at java.lang.Thread.run(Thread.java:748) > Process finished with exit code 1 > {code} > The root cause isn't exposed and it's difficult for users to figure out what > happens. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] dianfu closed pull request #15765: [FLINK-22470][python] Make sure that the root cause of the exception encountered during compiling the job was exposed to users in all cases
dianfu closed pull request #15765: URL: https://github.com/apache/flink/pull/15765 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] dianfu commented on pull request #15765: [FLINK-22470][python] Make sure that the root cause of the exception encountered during compiling the job was exposed to users in all cases
dianfu commented on pull request #15765: URL: https://github.com/apache/flink/pull/15765#issuecomment-827266624 Closed via fbd38b53197c244af5ed3af54cebdee75b393a1d -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] dianfu closed pull request #15766: [FLINK-22470][python] Make sure that the root cause of the exception encountered during compiling the job was exposed to users in all cases
dianfu closed pull request #15766: URL: https://github.com/apache/flink/pull/15766 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] wydhcws commented on a change in pull request #15742: [FLINK-22442][CEP] Fix Using scala api to change the TimeCharacteristic o…
wydhcws commented on a change in pull request #15742: URL: https://github.com/apache/flink/pull/15742#discussion_r620795286 ## File path: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala ## @@ -447,17 +447,17 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) { } def sideOutputLateData(lateDataOutputTag: OutputTag[T]): PatternStream[T] = { - jPatternStream.sideOutputLateData(lateDataOutputTag) + jPatternStream = jPatternStream.sideOutputLateData(lateDataOutputTag) Review comment: Thank you for your valuable suggestions. I will try to rewrite it in a more elegant way. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-22074) testRequirementCheckOnlyTriggeredOnce(org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTest) failed
[ https://issues.apache.org/jira/browse/FLINK-22074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song closed FLINK-22074. Resolution: Fixed Fixed via * master (1.14): 6236473f8fb90cbde9673d1fa0c51659bc9e0c8c * release-1.13: ac90590061bdb21bac56247d998aeda1ba6c5e2c > testRequirementCheckOnlyTriggeredOnce(org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTest) > failed > - > > Key: FLINK-22074 > URL: https://issues.apache.org/jira/browse/FLINK-22074 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.13.0 >Reporter: Leonard Xu >Assignee: Yangze Guo >Priority: Major > Labels: pull-request-available, test-stability > Fix For: 1.13.0 > > > [ERROR] Tests run: 14, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: > 1.654 s <<< FAILURE! - in > org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTest > > [ERROR] > testRequirementCheckOnlyTriggeredOnce(org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTest) > Time elapsed: 0.059 s <<< FAILURE! > java.lang.AssertionError: Expected to fail with a timeout. 
> at org.junit.Assert.fail(Assert.java:88) > at > org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTestBase.assertFutureNotComplete(FineGrainedSlotManagerTestBase.java:126) > > at > org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTest$10.lambda$new$3(FineGrainedSlotManagerTest.java:605) > > at > org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTestBase$Context.runTest(FineGrainedSlotManagerTestBase.java:197) > > at > org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTest$10.(FineGrainedSlotManagerTest.java:581) > > at > org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTest.testRequirementCheckOnlyTriggeredOnce(FineGrainedSlotManagerTest.java:565) > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > > at > org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365) > > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273) > > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238) > > at > org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159) > > at > org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384) > > at > org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345) > > at > org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126) > at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] xintongsong closed pull request #15751: [FLINK-22074][test] Harden FineGrainedSlotManagerTest#testRequirement…
xintongsong closed pull request #15751: URL: https://github.com/apache/flink/pull/15751 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-17726) Scheduler should take care of tasks directly canceled by TaskManager
[ https://issues.apache.org/jira/browse/FLINK-17726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332834#comment-17332834 ] Zhu Zhu edited comment on FLINK-17726 at 4/27/21, 1:50 AM: --- I think it is a potential issue and is not a real production problem yet. The problem would happen only if a task is directly cancelled by TM and no other task in the same pipelined region was failed. So far I think this case will not happen. was (Author: zhuzh): I think it is a potential issue and is not a real production problem yet. The problem would happen only if a task is directly cancelled by TM without failing nay other task in the same pipelined region. So far I think this case will not happen. > Scheduler should take care of tasks directly canceled by TaskManager > > > Key: FLINK-17726 > URL: https://issues.apache.org/jira/browse/FLINK-17726 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination, Runtime / Task >Affects Versions: 1.11.0, 1.12.0 >Reporter: Zhu Zhu >Priority: Critical > Labels: stale-critical > > JobManager will not trigger failure handling when receiving CANCELED task > update. > This is because CANCELED tasks are usually caused by another FAILED task. > These CANCELED tasks will be restarted by the failover process triggered > FAILED task. > However, if a task is directly CANCELED by TaskManager due to its own runtime > issue, the task will not be recovered by JM and thus the job would hang. > This is a potential issue and we should avoid it. > A possible solution is to let JobManager treat tasks transitioning to > CANCELED from all states except from CANCELING as failed tasks. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17726) Scheduler should take care of tasks directly canceled by TaskManager
[ https://issues.apache.org/jira/browse/FLINK-17726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332834#comment-17332834 ] Zhu Zhu commented on FLINK-17726: - I think it is a potential issue and is not a real production problem yet. The problem would happen only if a task is directly cancelled by TM without failing any other task in the same pipelined region. So far I think this case will not happen. > Scheduler should take care of tasks directly canceled by TaskManager > > > Key: FLINK-17726 > URL: https://issues.apache.org/jira/browse/FLINK-17726 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination, Runtime / Task >Affects Versions: 1.11.0, 1.12.0 >Reporter: Zhu Zhu >Priority: Critical > Labels: stale-critical > > JobManager will not trigger failure handling when receiving a CANCELED task > update. > This is because CANCELED tasks are usually caused by another FAILED task. > These CANCELED tasks will be restarted by the failover process triggered by the > FAILED task. > However, if a task is directly CANCELED by TaskManager due to its own runtime > issue, the task will not be recovered by JM and thus the job would hang. > This is a potential issue and we should avoid it. > A possible solution is to let JobManager treat tasks transitioning to > CANCELED from all states except from CANCELING as failed tasks.
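The possible solution quoted above — treating a transition into CANCELED from any state other than CANCELING as a failure — can be sketched as a small guard. This is an illustrative sketch only; the names below are hypothetical and do not match Flink's actual scheduler classes:

```java
// Hypothetical sketch of the proposed rule: a task that reaches CANCELED
// from any state other than CANCELING was cancelled directly by the
// TaskManager, so the JobManager should treat it like a FAILED task and
// trigger failure handling.
enum ExecutionState { CREATED, DEPLOYING, RUNNING, CANCELING, CANCELED, FAILED }

final class TaskStateGuard {
    /** Returns true if a CANCELED update should trigger failover. */
    static boolean treatAsFailure(ExecutionState previous, ExecutionState updated) {
        return updated == ExecutionState.CANCELED
                && previous != ExecutionState.CANCELING;
    }
}
```

Under this rule, a CANCELED report that follows CANCELING is the normal outcome of a JobManager-initiated cancellation and is left alone; any other transition into CANCELED would enter the regular failover path.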
[GitHub] [flink] xintongsong closed pull request #15748: [hotfix][coordination] Add log for slot allocation in FineGrainedSlot…
xintongsong closed pull request #15748: URL: https://github.com/apache/flink/pull/15748
[GitHub] [flink] JingsongLi commented on a change in pull request #15760: [FLINK-19606][table-runtime-blink] Introduce WindowJoinOperator and WindowJoinOperatorBuilder.
JingsongLi commented on a change in pull request #15760: URL: https://github.com/apache/flink/pull/15760#discussion_r620782487 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java ## @@ -0,0 +1,558 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.operators.join.window; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.MeterView; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.KeyContext; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.util.RowDataUtil; +import org.apache.flink.table.data.utils.JoinedRowData; +import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; +import org.apache.flink.table.runtime.generated.JoinCondition; +import org.apache.flink.table.runtime.operators.TableStreamOperator; +import org.apache.flink.table.runtime.operators.join.JoinConditionWithNullFilters; +import org.apache.flink.table.runtime.operators.window.state.WindowListState; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.types.RowKind; + +import java.util.IdentityHashMap; +import java.util.List; + +/** + * Streaming window join operator. + * + * Note: currently, {@link WindowJoinOperator} doesn't support early-fire and late-arrival. Thus + * late elements (elements belong to emitted windows) will be simply dropped. 
+ */ +public abstract class WindowJoinOperator extends TableStreamOperator<RowData> +implements TwoInputStreamOperator<RowData, RowData, RowData>, +Triggerable<RowData, Long>, +KeyContext { + +private static final long serialVersionUID = 1L; + +private static final String LEFT_LATE_ELEMENTS_DROPPED_METRIC_NAME = +"leftNumLateRecordsDropped"; +private static final String LEFT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = +"leftLateRecordsDroppedRate"; +private static final String RIGHT_LATE_ELEMENTS_DROPPED_METRIC_NAME = +"rightNumLateRecordsDropped"; +private static final String RIGHT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = +"rightLateRecordsDroppedRate"; +private static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency"; +private static final String LEFT_RECORDS_STATE_NAME = "left-records"; +private static final String RIGHT_RECORDS_STATE_NAME = "right-records"; + +protected final InternalTypeInfo<RowData> leftType; +protected final InternalTypeInfo<RowData> rightType; +private final GeneratedJoinCondition generatedJoinCondition; + +private final int leftWindowEndIndex; +private final int rightWindowEndIndex; + +private final boolean[] filterNullKeys; + +/** Flag to prevent duplicate function.close() calls in close() and dispose(). */ +private transient boolean functionsClosed = false; + +private transient InternalTimerService<Long> internalTimerService; + +protected transient JoinConditionWithNullFilters joinCondition; + +/** This is used for emitting elements with a given timestamp. */ +protected transient TimestampedCollector<RowData> collector; + +private transient WindowListState<Long> leftWindowState; +private transient WindowListState<Long> rightWindowState;
[GitHub] [flink] JingsongLi commented on a change in pull request #15760: [FLINK-19606][table-runtime-blink] Introduce WindowJoinOperator and WindowJoinOperatorBuilder.
JingsongLi commented on a change in pull request #15760: URL: https://github.com/apache/flink/pull/15760#discussion_r620781908 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java
[GitHub] [flink] JingsongLi commented on a change in pull request #15760: [FLINK-19606][table-runtime-blink] Introduce WindowJoinOperator and WindowJoinOperatorBuilder.
JingsongLi commented on a change in pull request #15760: URL: https://github.com/apache/flink/pull/15760#discussion_r620781485 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java
[GitHub] [flink] JingsongLi merged pull request #15752: [FLINK-19606][table-runtime-blink] Refactor JoinConditionWithFullFilters from an inner class in AbstractStreamingJoinOperator to a common utilit
JingsongLi merged pull request #15752: URL: https://github.com/apache/flink/pull/15752
[GitHub] [flink] flinkbot edited a comment on pull request #15774: [FLINK-22479][Kinesis][Consumer] Potential lock-up under error condition (Flink 1.14)
flinkbot edited a comment on pull request #15774: URL: https://github.com/apache/flink/pull/15774#issuecomment-827133461 ## CI report: * 5fbd84949babc1e127bc8d6d951ed75dffda4151 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17248) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on pull request #15773: [FLINK-22479][Kinesis][Consumer] Potential lock-up under error condition (Flink 1.12)
flinkbot edited a comment on pull request #15773: URL: https://github.com/apache/flink/pull/15773#issuecomment-827133382 ## CI report: * ece7df21556a92a2711f39cc46d41aa094392eb7 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17247)
[jira] [Updated] (FLINK-21938) Add documentation about how to test Python UDFs
[ https://issues.apache.org/jira/browse/FLINK-21938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-21938: --- Labels: auto-unassigned pull-request-available (was: pull-request-available stale-assigned) > Add documentation about how to test Python UDFs > --- > > Key: FLINK-21938 > URL: https://issues.apache.org/jira/browse/FLINK-21938 > Project: Flink > Issue Type: Improvement > Components: API / Python, Documentation >Reporter: Dian Fu >Assignee: Yik San Chan >Priority: Major > Labels: auto-unassigned, pull-request-available > Fix For: 1.13.0 > > > It should be similar to the Java UDFs: > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#testing-user-defined-functions
[jira] [Commented] (FLINK-21938) Add documentation about how to test Python UDFs
[ https://issues.apache.org/jira/browse/FLINK-21938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332797#comment-17332797 ] Flink Jira Bot commented on FLINK-21938: This issue was marked "stale-assigned" and has not received an update in 7 days. It is now automatically unassigned. If you are still working on it, you can assign it to yourself again. Please also give an update about the status of the work. > Add documentation about how to test Python UDFs > --- > > Key: FLINK-21938 > URL: https://issues.apache.org/jira/browse/FLINK-21938 > Project: Flink > Issue Type: Improvement > Components: API / Python, Documentation >Reporter: Dian Fu >Assignee: Yik San Chan >Priority: Major > Labels: pull-request-available, stale-assigned > Fix For: 1.13.0 > > > It should be similar to the Java UDFs: > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#testing-user-defined-functions
[GitHub] [flink] mans2singh commented on pull request #15756: [hotfix][docs] Removed duplicate 'is'
mans2singh commented on pull request #15756: URL: https://github.com/apache/flink/pull/15756#issuecomment-827188168 Hello @wuchong @leonardBang - Can you please review this PR? Thanks
[GitHub] [flink] flinkbot edited a comment on pull request #15772: [FLINK-21131][webui] Alignment timeout displayed in checkpoint config…
flinkbot edited a comment on pull request #15772: URL: https://github.com/apache/flink/pull/15772#issuecomment-827092251 ## CI report: * fbafed45312a6d55e2a65e666833c115c0a0f07a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17242)
[GitHub] [flink] flinkbot edited a comment on pull request #15774: [FLINK-22479][Kinesis][Consumer] Potential lock-up under error condition (Flink 1.14)
flinkbot edited a comment on pull request #15774: URL: https://github.com/apache/flink/pull/15774#issuecomment-827133461 ## CI report: * 5fbd84949babc1e127bc8d6d951ed75dffda4151 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17248)
[GitHub] [flink] flinkbot edited a comment on pull request #15773: [FLINK-22479][Kinesis][Consumer] Potential lock-up under error condition (Flink 1.12)
flinkbot edited a comment on pull request #15773: URL: https://github.com/apache/flink/pull/15773#issuecomment-827133382 ## CI report: * ece7df21556a92a2711f39cc46d41aa094392eb7 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17247)
[GitHub] [flink] flinkbot commented on pull request #15774: [FLINK-22479][Kinesis][Consumer] Potential lock-up under error condition (Flink 1.14)
flinkbot commented on pull request #15774: URL: https://github.com/apache/flink/pull/15774#issuecomment-827133461 ## CI report: * 5fbd84949babc1e127bc8d6d951ed75dffda4151 UNKNOWN
[GitHub] [flink] flinkbot commented on pull request #15773: [FLINK-22479][Kinesis][Consumer] Potential lock-up under error condition (Flink 1.12)
flinkbot commented on pull request #15773: URL: https://github.com/apache/flink/pull/15773#issuecomment-827133382 ## CI report: * ece7df21556a92a2711f39cc46d41aa094392eb7 UNKNOWN
[jira] [Commented] (FLINK-22441) In Flink v1.11.3 contains netty(version:3.10.6) netty(version:4.1.60) . There are many vulnerabilities, like CVE-2021-21409 etc. please confirm these version and fix.
[ https://issues.apache.org/jira/browse/FLINK-22441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332718#comment-17332718 ] Austin Cawley-Edwards commented on FLINK-22441: --- Adding the "drop Scala 2.11 support" ticket as a blocker, as Konstantin mentioned the current Akka lib is the root of this issue, and Akka cannot be upgraded until Scala 2.11 is dropped. If there's a more accurate ticket for the Akka upgrade, we can update the blocker. > In Flink v1.11.3 contains netty(version:3.10.6) netty(version:4.1.60) . There > are many vulnerabilities, like CVE-2021-21409 etc. please confirm these > version and fix. thx > -- > > Key: FLINK-22441 > URL: https://issues.apache.org/jira/browse/FLINK-22441 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.11.3, 1.12.2, 1.13.0 >Reporter: 张健 >Priority: Major > > In Flink v1.11.3 contains netty(version:3.10.6) netty(version:4.1.60) . There > are many vulnerabilities, like CVE-2021-21409 CVE-2021-21295 etc. please > confirm these version and fix. thx
[GitHub] [flink] flinkbot commented on pull request #15774: [FLINK-22479][Kinesis][Consumer] Potential lock-up under error condition (Flink 1.14)
flinkbot commented on pull request #15774: URL: https://github.com/apache/flink/pull/15774#issuecomment-827123991 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 5fbd84949babc1e127bc8d6d951ed75dffda4151 (Mon Apr 26 20:24:55 UTC 2021) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] flinkbot commented on pull request #15773: [FLINK-22479][Kinesis][Consumer] Potential lock-up under error condition (Flink 1.12)
flinkbot commented on pull request #15773: URL: https://github.com/apache/flink/pull/15773#issuecomment-827122947 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit ece7df21556a92a2711f39cc46d41aa094392eb7 (Mon Apr 26 20:22:58 UTC 2021) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
[jira] [Updated] (FLINK-22479) [Kinesis][Consumer] Potential lock-up under error condition
[ https://issues.apache.org/jira/browse/FLINK-22479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-22479: --- Labels: pull-request-available (was: ) > [Kinesis][Consumer] Potential lock-up under error condition > --- > > Key: FLINK-22479 > URL: https://issues.apache.org/jira/browse/FLINK-22479 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: 1.12.0, 1.12.1, 1.12.2 >Reporter: Danny Cranmer >Assignee: Danny Cranmer >Priority: Critical > Labels: pull-request-available > Fix For: 1.13.0, 1.14.0, 1.12.3 > > > *Background* > This connector has been > [forked|https://github.com/awslabs/amazon-kinesis-connector-flink] by AWS for > use on KDA with Flink 1.11. Bugs have been encountered: > - Under high backpressure scenarios > - When an error is thrown during tear down > *Scope* > Pull in the following fixes from AWS fork: > * Fix issue where {{KinesisDataFetcher.shutdownFetcher()}} hangs > ([issue|https://github.com/awslabs/amazon-kinesis-connector-flink/issues/23], > [pull > request|https://github.com/awslabs/amazon-kinesis-connector-flink/pull/24]) > > * Log error when shutting down Kinesis Data Fetcher > ([issue|https://github.com/awslabs/amazon-kinesis-connector-flink/issues/22], > [pull > request|https://github.com/awslabs/amazon-kinesis-connector-flink/pull/25]) > > * Treating TimeoutException as Recoverable Exception > ([issue|https://github.com/awslabs/amazon-kinesis-connector-flink/pull/28], > [pull > request|https://github.com/awslabs/amazon-kinesis-connector-flink/issues/21]) > > * Add time-out for acquiring subscription and passing events from network to > source thread to prevent deadlock ([pull > request|https://github.com/awslabs/amazon-kinesis-connector-flink/pull/18])
[GitHub] [flink] dannycranmer opened a new pull request #15774: [FLINK-22479][Kinesis][Consumer] Potential lock-up under error condition (Flink 1.14)
dannycranmer opened a new pull request #15774: URL: https://github.com/apache/flink/pull/15774 ## What is the purpose of the change Pull in the following fixes from the AWS fork: * Fix issue where `KinesisDataFetcher.shutdownFetcher()` hangs ([issue](https://github.com/awslabs/amazon-kinesis-connector-flink/issues/23), [pull request](https://github.com/awslabs/amazon-kinesis-connector-flink/pull/24)) * Log error when shutting down the Kinesis Data Fetcher ([issue](https://github.com/awslabs/amazon-kinesis-connector-flink/issues/22), [pull request](https://github.com/awslabs/amazon-kinesis-connector-flink/pull/25)) * Treat TimeoutException as a recoverable exception ([issue](https://github.com/awslabs/amazon-kinesis-connector-flink/issues/21), [pull request](https://github.com/awslabs/amazon-kinesis-connector-flink/pull/28)) * Add a time-out for acquiring the subscription and for passing events from the network thread to the source thread, to prevent deadlock ([pull request](https://github.com/awslabs/amazon-kinesis-connector-flink/pull/18)) ## Brief change log - Added a timeout when blocking on the queue between the source and network threads - Catch exceptions during source teardown, so that the source can still interrupt the executor's threads ## Verifying this change - Added unit tests for the changes - Verified on AWS KDA with test applications ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): yes - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable
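The "timeout when blocking on the queue" fix above can be sketched as a bounded hand-over between the network thread (producer) and the source thread (consumer). The class below is an illustrative assumption, not the connector's actual implementation (the real hand-over lives in `KinesisDataFetcher` and related classes and differs in detail): each side waits in bounded slices and re-checks a cancellation flag, so neither thread can block forever on a full or empty queue while the other side is shutting down.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a hand-over queue where every blocking call is bounded,
// so a cancelled fetcher can always make progress toward shutdown.
class TimedHandover<T> {
    private final BlockingQueue<T> queue;
    private final long timeoutMillis;
    private volatile boolean cancelled = false;

    TimedHandover(int capacity, long timeoutMillis) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.timeoutMillis = timeoutMillis;
    }

    /** Producer side: returns false if cancelled before the element was accepted. */
    boolean produce(T element) throws InterruptedException {
        while (!cancelled) {
            // offer() with a timeout instead of put(): wakes up periodically
            // to re-check the cancellation flag rather than blocking forever.
            if (queue.offer(element, timeoutMillis, TimeUnit.MILLISECONDS)) {
                return true;
            }
        }
        return false;
    }

    /** Consumer side: returns null if cancelled before an element arrived. */
    T consume() throws InterruptedException {
        while (!cancelled) {
            // poll() with a timeout instead of take(), for the same reason.
            T element = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (element != null) {
                return element;
            }
        }
        return null;
    }

    /** Called on teardown; both loops observe the flag within one timeout slice. */
    void cancel() {
        cancelled = true;
    }
}
```

The design choice here is the one the PR describes: unbounded blocking (`put`/`take`) is what allows the producer and consumer to deadlock when one side dies under backpressure, and bounded waits with a re-checked flag remove that failure mode at the cost of a little wake-up overhead.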
[GitHub] [flink] dannycranmer opened a new pull request #15773: [FLINK-22479][Kinesis][Consumer] Potential lock-up under error condition (Flink 1.12)
dannycranmer opened a new pull request #15773: URL: https://github.com/apache/flink/pull/15773 ## What is the purpose of the change Pull in the following fixes from the AWS fork: * Fix issue where `KinesisDataFetcher.shutdownFetcher()` hangs ([issue](https://github.com/awslabs/amazon-kinesis-connector-flink/issues/23), [pull request](https://github.com/awslabs/amazon-kinesis-connector-flink/pull/24)) * Log error when shutting down the Kinesis Data Fetcher ([issue](https://github.com/awslabs/amazon-kinesis-connector-flink/issues/22), [pull request](https://github.com/awslabs/amazon-kinesis-connector-flink/pull/25)) * Treat TimeoutException as a recoverable exception ([issue](https://github.com/awslabs/amazon-kinesis-connector-flink/issues/21), [pull request](https://github.com/awslabs/amazon-kinesis-connector-flink/pull/28)) * Add a time-out for acquiring the subscription and for passing events from the network thread to the source thread, to prevent deadlock ([pull request](https://github.com/awslabs/amazon-kinesis-connector-flink/pull/18)) ## Brief change log - Added a timeout when blocking on the queue between the source and network threads - Catch exceptions during source teardown, so that the source can still interrupt the executor's threads ## Verifying this change - Added unit tests for the changes - Verified on AWS KDA with test applications ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): yes - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable