[GitHub] [flink] flinkbot commented on pull request #13814: [FLINK-19839][e2e] Properly forward test exit code to CI system
flinkbot commented on pull request #13814:
URL: https://github.com/apache/flink/pull/13814#issuecomment-717715530

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community review your pull request. We will use this comment to track the progress of the review.

## Automated Checks
Last check on commit 4ff8c35475568fee2cd77ba6268e119ed805ec95 (Wed Oct 28 05:53:13 UTC 2020)

**Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

## Review Progress
* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] kl0u commented on pull request #13808: [FLINK-19834] Make the TestSink reusable in all the sink related tests.
kl0u commented on pull request #13808:
URL: https://github.com/apache/flink/pull/13808#issuecomment-717715756

Yes @guoweiM, I think it is better to introduce a change with the commit that uses it. It is nice for commits to be self-contained so that if something goes wrong, we can easily find when and why a change was introduced. Splitting the introduction of a change from its use or its tests makes it more difficult to trace back why it was introduced and what its implications are :)
[jira] [Updated] (FLINK-19839) e2e test failures are not causing the build to fail
[ https://issues.apache.org/jira/browse/FLINK-19839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-19839:
Labels: pull-request-available (was: )

> e2e test failures are not causing the build to fail
> ---
>
> Key: FLINK-19839
> URL: https://issues.apache.org/jira/browse/FLINK-19839
> Project: Flink
> Issue Type: Bug
> Components: Build System / Azure Pipelines
> Affects Versions: 1.12.0
> Reporter: Robert Metzger
> Assignee: Robert Metzger
> Priority: Blocker
> Labels: pull-request-available
>
> Example:
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8385=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529
> {code}
> Oct 27 13:37:05 Killing JM watchdog @ 11717
> Oct 27 13:37:05 Killing TM watchdog @ 11801
> Oct 27 13:37:05 [FAIL] Test script contains errors.
> Oct 27 13:37:05 Checking of logs skipped.
> Oct 27 13:37:05
> Oct 27 13:37:05 [FAIL] 'Running HA (file, sync) end-to-end test' failed after 15 minutes and 0 seconds! Test exited with exit code 1
> Oct 27 13:37:05
> Oct 27 13:37:05 ##[group]Environment Information
> Oct 27 13:37:06 Published e2e logs into debug logs artifact:
> Oct 27 13:37:06 flink-vsts-client-fv-az678.log
> Oct 27 13:37:06 flink-vsts-standalonesession-0-fv-az678.log
> Oct 27 13:37:06 flink-vsts-standalonesession-0-fv-az678.out
> Oct 27 13:37:06 flink-vsts-taskexecutor-0-fv-az678.log
> Oct 27 13:37:06 flink-vsts-taskexecutor-0-fv-az678.out
> Oct 27 13:37:06 flink-vsts-zookeeper-0-fv-az678.log
> Oct 27 13:37:06 flink-vsts-zookeeper-0-fv-az678.out
> Oct 27 13:37:06 Searching for .dump, .dumpstream and related files in '/home/vsts/work/1/s'
> Oct 27 13:37:12 COMPRESSING build artifacts.
> {code}
> Despite the FAIL, the stage itself is green.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[GitHub] [flink] rmetzger opened a new pull request #13814: [FLINK-19839][e2e] Properly forward test exit code to CI system
rmetzger opened a new pull request #13814:
URL: https://github.com/apache/flink/pull/13814

Due to a recent change, the exit code of e2e tests was no longer forwarded properly. This CI build demonstrates that failures are forwarded properly: https://dev.azure.com/rmetzger/Flink/_build/results?buildId=8556=results
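The class of bug discussed in FLINK-19839 can be illustrated with a small, hypothetical bash sketch (the function name and log path below are assumptions for illustration, not the actual Flink CI scripts): when a test's output is piped through `tee`, the exit status of the pipeline is `tee`'s, so a wrapper must forward the test's own status explicitly or CI will see a green build despite a failing test.

```shell
#!/usr/bin/env bash
# Hypothetical sketch of forwarding a test's exit code through a `tee`
# pipeline. Without ${PIPESTATUS[0]} (or `set -o pipefail`), the status
# seen by the CI system would be tee's, which is almost always 0.

run_and_forward() {
  # Run the given command, mirror its output to a log file, and return
  # the command's exit status rather than tee's.
  "$@" 2>&1 | tee /tmp/e2e_test.log
  return "${PIPESTATUS[0]}"
}
```

A wrapper script would then end with something like `run_and_forward bash test_script.sh; exit $?`, so the stage fails exactly when the test does.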
[GitHub] [flink] flinkbot edited a comment on pull request #13813: [FLINK-19491][avro] AvroSerializerSnapshot cannot handle large schema
flinkbot edited a comment on pull request #13813:
URL: https://github.com/apache/flink/pull/13813#issuecomment-717706259

## CI report:
* 61014eba30b67fc21a965f876d9484dd3873e14d Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8459)

Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on pull request #13744: [FLINK-19766][table-runtime] Introduce File streaming compaction operators
flinkbot edited a comment on pull request #13744:
URL: https://github.com/apache/flink/pull/13744#issuecomment-714369975

## CI report:
* 164c8b5b7750ed567813d70f160ef9f064d0a3b0 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8458)
[GitHub] [flink] flinkbot edited a comment on pull request #13458: FLINK-18851 [runtime] Add checkpoint type to checkpoint history entries in Web UI
flinkbot edited a comment on pull request #13458:
URL: https://github.com/apache/flink/pull/13458#issuecomment-697041219

## CI report:
* 7311b0d12d19a645391ea0359a9aa6318806363b UNKNOWN
* e18067714e5c3f75ed860f97f7c86ddedc719130 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8209) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8178)
* 3676c7e7a4729c494e12e82f4df4f2617ef29b5d UNKNOWN
[GitHub] [flink] guoweiM commented on pull request #13808: [FLINK-19834] Make the TestSink reusable in all the sink related tests.
guoweiM commented on pull request #13808:
URL: https://github.com/apache/flink/pull/13808#issuecomment-717708815

> I have some comments that have mainly to do with unused methods and base classes that can be made concrete. I have pushed my changes here (to make sure that I do not break anything :) ) https://github.com/kl0u/flink/tree/FLINK-19834.
>
> Let me know what you think @guoweiM

Thanks @kl0u. I will take a look. :-) I think you are right that these unused methods could be removed in this PR. We can add them back when we need them.
[GitHub] [flink] flinkbot commented on pull request #13813: [FLINK-19491][avro] AvroSerializerSnapshot cannot handle large schema
flinkbot commented on pull request #13813:
URL: https://github.com/apache/flink/pull/13813#issuecomment-717706259

## CI report:
* 61014eba30b67fc21a965f876d9484dd3873e14d UNKNOWN
[GitHub] [flink] flinkbot edited a comment on pull request #13812: [FLINK-19674][docs-zh] Translate "Docker" of "Clusters & Depolyment" page into Chinese
flinkbot edited a comment on pull request #13812:
URL: https://github.com/apache/flink/pull/13812#issuecomment-717687160

## CI report:
* 4f66617c4540a4987a5852cf0168f7e3a552c91a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8457)
[GitHub] [flink] flinkbot edited a comment on pull request #13811: [FLINK-19836] Serialize the committable by the user provided serializer during network shuffle
flinkbot edited a comment on pull request #13811:
URL: https://github.com/apache/flink/pull/13811#issuecomment-717686306

## CI report:
* 1f87c59d4ccdac3e91b28aaefebde4cc0e479bdf Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8456)
[GitHub] [flink] flinkbot edited a comment on pull request #13797: [FLINK-19465][runtime / statebackends] Add CheckpointStorage interface and wire through runtime
flinkbot edited a comment on pull request #13797:
URL: https://github.com/apache/flink/pull/13797#issuecomment-716854695

## CI report:
* c06b3dcd4eb4045f66352c8b65c14a7b60163ecd UNKNOWN
* 185e46fd046c5f39a87ac3583ad559b065a275e1 UNKNOWN
* f9196a71051202e88294076b7a514f895fb9f443 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8451)
[GitHub] [flink] flinkbot edited a comment on pull request #13763: [FLINK-19779][avro] Remove the record_ field name prefix for Confluen…
flinkbot edited a comment on pull request #13763:
URL: https://github.com/apache/flink/pull/13763#issuecomment-715195599

## CI report:
* a28669bca9c36dac74f89da177825d73bea0ece0 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8408)
* ba752274c9926115f65f5ecbef55b71b0b71cfa2 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8440)
* f004220668e20dcd9860026b69566868d473db33 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8455)
[GitHub] [flink] flinkbot edited a comment on pull request #13744: [FLINK-19766][table-runtime] Introduce File streaming compaction operators
flinkbot edited a comment on pull request #13744:
URL: https://github.com/apache/flink/pull/13744#issuecomment-714369975

## CI report:
* c8f3503c99ec4add43df3d054783f7a48501324c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8454)
* 164c8b5b7750ed567813d70f160ef9f064d0a3b0 UNKNOWN
[GitHub] [flink] gm7y8 commented on a change in pull request #13458: FLINK-18851 [runtime] Add checkpoint type to checkpoint history entries in Web UI
gm7y8 commented on a change in pull request #13458:
URL: https://github.com/apache/flink/pull/13458#discussion_r513184682

## File path: flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.ts
## @@ -80,5 +97,9 @@ export class JobCheckpointsDetailComponent implements OnInit {
     this.cdr.markForCheck();
     this.refresh();
   });
+  this.jobService.loadCheckpointConfig(this.jobDetail.jid).subscribe(config => {

Review comment: done
[GitHub] [flink] klion26 commented on a change in pull request #13750: [FLINK-19394][docs-zh] Translate the 'Monitoring Checkpointing' page of 'Debugging & Monitoring' into Chinese
klion26 commented on a change in pull request #13750: URL: https://github.com/apache/flink/pull/13750#discussion_r513181496 ## File path: docs/monitoring/checkpoint_monitoring.zh.md ## @@ -25,105 +25,125 @@ under the License. * ToC {:toc} -## Overview + -Flink's web interface provides a tab to monitor the checkpoints of jobs. These stats are also available after the job has terminated. There are four different tabs to display information about your checkpoints: Overview, History, Summary, and Configuration. The following sections will cover all of these in turn. +## 概览(Overview) -## Monitoring +Flink 的 Web 界面提供了`选项卡/标签(tab)`来监视作业的 Checkpoint 信息。作业终止后,这些统计信息仍然可用。有四个不同的选项卡可显示有关 Checkpoint 的信息:概览(Overview),历史记录(History),摘要信息(Summary)和配置信息(Configuration)。以下各节将依次介绍这些内容。 -### Overview Tab + -The overview tabs lists the following statistics. Note that these statistics don't survive a JobManager loss and are reset to if your JobManager fails over. +## 监控(Monitoring) + + + +### 概览(Overview)选项卡 + +概览选项卡列出了以下统计信息。请注意,这些统计信息在 JobManager 丢失时无法保存,如果 JobManager 发生故障转移,这些统计信息将重置。 - **Checkpoint Counts** - - Triggered: The total number of checkpoints that have been triggered since the job started. - - In Progress: The current number of checkpoints that are in progress. - - Completed: The total number of successfully completed checkpoints since the job started. - - Failed: The total number of failed checkpoints since the job started. - - Restored: The number of restore operations since the job started. This also tells you how many times the job has restarted since submission. Note that the initial submission with a savepoint also counts as a restore and the count is reset if the JobManager was lost during operation. -- **Latest Completed Checkpoint**: The latest successfully completed checkpoints. Clicking on `More details` gives you detailed statistics down to the subtask level. -- **Latest Failed Checkpoint**: The latest failed checkpoint. 
Clicking on `More details` gives you detailed statistics down to the subtask level. -- **Latest Savepoint**: The latest triggered savepoint with its external path. Clicking on `More details` gives you detailed statistics down to the subtask level. -- **Latest Restore**: There are two types of restore operations. - - Restore from Checkpoint: We restored from a regular periodic checkpoint. - - Restore from Savepoint: We restored from a savepoint. - -### History Tab - -The checkpoint history keeps statistics about recently triggered checkpoints, including those that are currently in progress. + - Triggered:自作业开始以来触发的 Checkpoint 总数。 + - In Progress:当前正在进行的 Checkpoint 数量。 + - Completed:自作业开始以来成功完成的 Checkpoint 总数。 + - Failed:自作业开始以来失败的 Checkpoint 总数。 + - Restored:自作业开始以来进行的 Restored 操作的次数。这还表示自 Job 提交以来已重新启动多少次。请注意,带有 Savepoint 的初始提交也算作一次 Restore,如果 JobManager 在此操作过程中丢失,则该计数将重置。 +- **Latest Completed Checkpoint**:最新(最近)成功完成的 Checkpoint。点击 `More details` 可以得到 subtask 级别的详细统计信息。 +- **Latest Failed Checkpoint**:最新失败的 Checkpoint。点击 `More details` 可以得到 subtask 级别的详细统计信息。 +- **Latest Savepoint**:最新触发的 Savepoint 及其外部路径。点击 `More details` 可以得到 subtask 级别的详细统计信息。 +- **Latest Restore**:有两种类型的 Restore 操作。 + - Restore from Checkpoint:从定期的 Checkpoint 还原。 Review comment: 这里如果翻译成 `从 Checkpoint 还原` 或者 `从 Checkpoint 恢复` 是否可以,因为 Checkpoint 默认都是 定期的 ## File path: docs/monitoring/checkpoint_monitoring.zh.md ## @@ -25,105 +25,125 @@ under the License. * ToC {:toc} -## Overview + -Flink's web interface provides a tab to monitor the checkpoints of jobs. These stats are also available after the job has terminated. There are four different tabs to display information about your checkpoints: Overview, History, Summary, and Configuration. The following sections will cover all of these in turn. 
+## 概览(Overview) -## Monitoring +Flink 的 Web 界面提供了`选项卡/标签(tab)`来监视作业的 Checkpoint 信息。作业终止后,这些统计信息仍然可用。有四个不同的选项卡可显示有关 Checkpoint 的信息:概览(Overview),历史记录(History),摘要信息(Summary)和配置信息(Configuration)。以下各节将依次介绍这些内容。 -### Overview Tab + -The overview tabs lists the following statistics. Note that these statistics don't survive a JobManager loss and are reset to if your JobManager fails over. +## 监控(Monitoring) + + + +### 概览(Overview)选项卡 + +概览选项卡列出了以下统计信息。请注意,这些统计信息在 JobManager 丢失时无法保存,如果 JobManager 发生故障转移,这些统计信息将重置。 - **Checkpoint Counts** - - Triggered: The total number of checkpoints that have been triggered since the job started. - - In Progress: The current number of checkpoints that are in progress. - - Completed: The total number of successfully completed checkpoints since the job started. - - Failed: The total number of failed checkpoints since the job started. - - Restored: The number of restore operations since the job started. This also tells you how many times the job has restarted since submission. Note that the
[GitHub] [flink] gm7y8 commented on a change in pull request #13458: FLINK-18851 [runtime] Add checkpoint type to checkpoint history entries in Web UI
gm7y8 commented on a change in pull request #13458:
URL: https://github.com/apache/flink/pull/13458#discussion_r513184231

## File path: flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.ts
## @@ -57,9 +60,23 @@ export class JobCheckpointsDetailComponent implements OnInit {
   refresh() {

Review comment: done!
[jira] [Updated] (FLINK-19827) Allow the harness to start with a user provided Flink configuration
[ https://issues.apache.org/jira/browse/FLINK-19827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-19827:
Labels: pull-request-available (was: )

> Allow the harness to start with a user provided Flink configuration
> ---
>
> Key: FLINK-19827
> URL: https://issues.apache.org/jira/browse/FLINK-19827
> Project: Flink
> Issue Type: Improvement
> Components: Stateful Functions
> Reporter: Igal Shilman
> Assignee: Igal Shilman
> Priority: Major
> Labels: pull-request-available
>
> Currently the Harness does not fully use the user-supplied Flink configuration.
> We only use it to configure some aspects of the execution environments; other aspects, such as the savepoint location, are effectively ignored.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[GitHub] [flink-statefun] tzulitai commented on pull request #167: [FLINK-19827] Provide an explicit configuration to the local execution environment.
tzulitai commented on pull request #167:
URL: https://github.com/apache/flink-statefun/pull/167#issuecomment-717701167

+1 as well, thanks Igal! Merging this now ...
[GitHub] [flink] flinkbot edited a comment on pull request #13307: [FLINK-19078][table-runtime] Import rowtime join temporal operator
flinkbot edited a comment on pull request #13307:
URL: https://github.com/apache/flink/pull/13307#issuecomment-685763264

## CI report:
* 85867d4ab920f571c1dcad56fea6a89c3b717b13 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8453)
* dee9ebd0a1947e6580064910a0dace6f5e349c52 UNKNOWN
[GitHub] [flink] flinkbot edited a comment on pull request #13300: [FLINK-19077][table-runtime] Import process time temporal join operator.
flinkbot edited a comment on pull request #13300:
URL: https://github.com/apache/flink/pull/13300#issuecomment-684957567

## CI report:
* Unknown: [CANCELED](TBD)
* c777052640e6c3005314d8dd284cf0c4785fdf8a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8445)
* 6bf5dc2e71b14086d5f908cc300060595adc46ea Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8452)
[GitHub] [flink-statefun] tzulitai commented on pull request #166: [FLINK-19826][docker] Use wildcards instead of a specific Flink version
tzulitai commented on pull request #166:
URL: https://github.com/apache/flink-statefun/pull/166#issuecomment-717699559

Thanks @igalshilman, LGTM. Will merge this now.
[GitHub] [flink] wuchong commented on a change in pull request #13605: [FLINK-19599][table] Introduce Filesystem format factories to integrate new FileSource to table
wuchong commented on a change in pull request #13605:
URL: https://github.com/apache/flink/pull/13605#discussion_r513164938

## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/BulkFormatFactory.java
## @@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.table.connector.format.BulkDecodingFormat;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Base interface for configuring a {@link BulkFormat} for file system connector.
+ *
+ * @see FactoryUtil#createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+ */
+@Internal
+public interface BulkFormatFactory extends DecodingFormatFactory> {
+  // interface is used for discovery but is already fully specified by the generics

Review comment: Remove this?

## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java
## @@ -93,124 +117,17 @@ private static Configuration getParquetConfiguration(ReadableConfig options) {
   }

   @Override
-  public InputFormat createReader(ReaderContext context) {
-    return new ParquetInputFormat(
-      context.getPaths(),
-      context.getSchema().getFieldNames(),
-      context.getSchema().getFieldDataTypes(),
-      context.getProjectFields(),
-      context.getDefaultPartName(),
-      context.getPushedDownLimit(),
-      getParquetConfiguration(context.getFormatOptions()),
-      context.getFormatOptions().get(UTC_TIMEZONE));
+  public String factoryIdentifier() {
+    return "parquet";
   }

   @Override
-  public Optional> createBulkWriterFactory(WriterContext context) {
-    return Optional.of(ParquetRowDataBuilder.createWriterFactory(
-      RowType.of(Arrays.stream(context.getFormatFieldTypes())
-        .map(DataType::getLogicalType)
-        .toArray(LogicalType[]::new),
-        context.getFormatFieldNames()),
-      getParquetConfiguration(context.getFormatOptions()),
-      context.getFormatOptions().get(UTC_TIMEZONE)));
+  public Set> requiredOptions() {
+    return new HashSet<>();
   }

   @Override
-  public Optional> createEncoder(WriterContext context) {
-    return Optional.empty();
-  }
-
-  /**
-   * An implementation of {@link ParquetInputFormat} to read {@link RowData} records
-   * from Parquet files.
-   */
-  public static class ParquetInputFormat extends FileInputFormat {
-
-    private static final long serialVersionUID = 1L;
-
-    private final String[] fullFieldNames;
-    private final DataType[] fullFieldTypes;
-    private final int[] selectedFields;
-    private final String partDefaultName;
-    private final boolean utcTimestamp;
-    private final SerializableConfiguration conf;
-    private final long limit;
-
-    private transient ParquetColumnarRowSplitReader reader;
-    private transient long currentReadCount;
-
-    public ParquetInputFormat(
-      Path[] paths,
-      String[] fullFieldNames,
-      DataType[] fullFieldTypes,
-      int[] selectedFields,
-      String partDefaultName,
-      long limit,
-      Configuration conf,
-
[GitHub] [flink] flinkbot commented on pull request #13812: [FLINK-19674][docs-zh] Translate "Docker" of "Clusters & Depolyment" page into Chinese
flinkbot commented on pull request #13812:
URL: https://github.com/apache/flink/pull/13812#issuecomment-717687160

## CI report:
* 4f66617c4540a4987a5852cf0168f7e3a552c91a UNKNOWN
[GitHub] [flink] JingsongLi commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…
JingsongLi commented on a change in pull request #13789: URL: https://github.com/apache/flink/pull/13789#discussion_r513172411 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala ## @@ -99,11 +105,33 @@ class CommonPhysicalSink ( val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer) +val inputParallelism = inputTransformation.getParallelism +val taskParallelism = env.getParallelism +val parallelism = if (runtimeProvider.isInstanceOf[ParallelismProvider]) runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism.orElse(inputParallelism).intValue() +else inputParallelism + +if (implicitly[Ordering[Int]].lteq(parallelism, 0)) throw new RuntimeException(s"the configured sink parallelism: $parallelism should not be less than zero or equal to zero") +if (implicitly[Ordering[Int]].gt(parallelism, taskParallelism)) throw new RuntimeException(s"the configured sink parallelism: $parallelism is larger than the task max parallelism: $taskParallelism") + +val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema) +val containedRowKinds = changelogMode.getContainedKinds.toSet +val theFinalInputTransformation = if(inputParallelism == parallelism) inputTransformation //if the parallelism is not changed, do nothing +else (containedRowKinds, primaryKeys.toList) match { +// fixme : if rowKinds only contains delete, is there somethinng to do with? Currently do nothing. +case (_, _) if(containedRowKinds == Set(RowKind.DELETE)) => inputTransformation +case (_, _) if(containedRowKinds == Set(RowKind.INSERT)) => inputTransformation +// fixme: for retract mode (insert and delete contains only), is there somethinng to do with? Currently do nothing. 
+case (_, _) if(containedRowKinds == Set(RowKind.INSERT,RowKind.DELETE)) => inputTransformation +case (_, Nil) if(containedRowKinds.contains(RowKind.UPDATE_AFTER)) => throw new RuntimeException(s"ChangelogMode contains ${RowKind.UPDATE_AFTER}, but no primaryKeys were found") +case (_, _) if(containedRowKinds.contains(RowKind.UPDATE_AFTER)) => new DataStream[RowData](env,inputTransformation).keyBy(primaryKeys:_*).getTransformation Review comment: About how to `HASH_DISTRIBUTED`, you should take a look at `StreamExecExchange` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
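The parallelism resolution and validation in the diff above can be sketched outside Flink as plain Java; `resolveParallelism` and the exception types below are illustrative stand-ins for the planner code in `CommonPhysicalSink`, not the actual implementation (which, per the review, should throw `TableException`):

```java
import java.util.Optional;

public class SinkParallelismCheck {

    /**
     * Resolve the sink parallelism: use the configured value if present,
     * otherwise inherit the input transformation's parallelism. Rejects
     * non-positive values and values above the task parallelism, mirroring
     * the two checks quoted in the diff above.
     */
    static int resolveParallelism(
            Optional<Integer> configured, int inputParallelism, int taskParallelism) {
        int parallelism = configured.orElse(inputParallelism);
        if (parallelism <= 0) {
            throw new IllegalArgumentException(
                    "the configured sink parallelism: " + parallelism
                            + " should not be less than or equal to zero");
        }
        if (parallelism > taskParallelism) {
            throw new IllegalArgumentException(
                    "the configured sink parallelism: " + parallelism
                            + " is larger than the task max parallelism: " + taskParallelism);
        }
        return parallelism;
    }

    public static void main(String[] args) {
        // A configured parallelism wins over the input parallelism.
        System.out.println(resolveParallelism(Optional.of(2), 4, 8));
        // No configured parallelism: inherit it from the input transformation.
        System.out.println(resolveParallelism(Optional.empty(), 4, 8));
    }
}
```

When the resolved parallelism differs from the input's, the planner additionally has to decide whether a `keyBy` on the primary keys is needed to preserve per-key ordering of changelog records, which is what the `containedRowKinds` match above is about.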
[GitHub] [flink] flinkbot commented on pull request #13811: [FLINK-19836] Serialize the committable by the user provided serializer during network shuffle
flinkbot commented on pull request #13811: URL: https://github.com/apache/flink/pull/13811#issuecomment-717686306 ## CI report: * 1f87c59d4ccdac3e91b28aaefebde4cc0e479bdf UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #13813: [FLINK-19491][avro] AvroSerializerSnapshot cannot handle large schema
flinkbot commented on pull request #13813: URL: https://github.com/apache/flink/pull/13813#issuecomment-717685369 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 61014eba30b67fc21a965f876d9484dd3873e14d (Wed Oct 28 04:18:58 UTC 2020) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] SteNicholas commented on pull request #13813: [FLINK-19491][avro] AvroSerializerSnapshot cannot handle large schema
SteNicholas commented on pull request #13813: URL: https://github.com/apache/flink/pull/13813#issuecomment-717684935 @AHeise , could you please help to review this pull request if you are available? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #13784: [FLINK-19698][connectors/common] API improvements to the Sources.
flinkbot edited a comment on pull request #13784: URL: https://github.com/apache/flink/pull/13784#issuecomment-716153889 ## CI report: * 7e29d587bda45d3ddf213ff70c9b0febbd4f9e9f Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8244) * ca695f256a825fdebadf79f576b0875d4faea7cd UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] SteNicholas opened a new pull request #13813: [FLINK-19491][avro] AvroSerializerSnapshot cannot handle large schema
SteNicholas opened a new pull request #13813: URL: https://github.com/apache/flink/pull/13813 ## What is the purpose of the change *Currently Flink can only handle schemas up to a size of 65535 bytes, and `AvroSerializerSnapshot` cannot handle large schemas. If the size of the schema exceeds 65535 bytes, a `UTFDataFormatException` is thrown. `AvroSerializerSnapshot` should introduce a V3 that uses `StringValue.writeString()` and `StringValue.readString()` to handle the string encoding.* ## Brief change log - *`AvroSerializerSnapshot` introduces a V3 that uses `StringValue.writeString()` in `writeSnapshot()` and `StringValue.readString()` in `readSnapshot()`.* ## Verifying this change - *`AvroSerializerSnapshotTest` adds the test method `largeSchemaWithExceedV2ThresholdSizeShouldResultInCompatibleV3Serializer` to verify that a large schema exceeding the threshold size of 65535 bytes is compatible with the new version 3 of `AvroSerializerSnapshot`.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
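The V3 approach described in the PR above can be approximated with a plain length-prefixed encoding. This sketch assumes nothing about Flink's actual `StringValue` wire format; it only illustrates why an explicit integer length prefix removes the 65535-byte ceiling of `DataOutputStream.writeUTF`:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class LargeSchemaRoundTrip {

    /** Write the string as a 4-byte length prefix followed by raw UTF-8 bytes. */
    static void writeLargeString(DataOutputStream out, String s) throws IOException {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        out.writeInt(bytes.length); // int length prefix: no 64 KB ceiling
        out.write(bytes);
    }

    static String readLargeString(DataInputStream in) throws IOException {
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // A "schema" well beyond the 65535-byte limit of writeUTF.
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 100_000; i++) {
            sb.append('x');
        }
        String schema = sb.toString();

        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        writeLargeString(new DataOutputStream(buf), schema);
        String back = readLargeString(
                new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(back.equals(schema)); // true
    }
}
```

Bumping the snapshot version to V3 lets `readSnapshot()` keep dispatching on the stored version, so snapshots written with the old `writeUTF`-based V2 format remain readable.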
[GitHub] [flink] flinkbot edited a comment on pull request #13797: [FLINK-19465][runtime / statebackends] Add CheckpointStorage interface and wire through runtime
flinkbot edited a comment on pull request #13797: URL: https://github.com/apache/flink/pull/13797#issuecomment-716854695 ## CI report: * c06b3dcd4eb4045f66352c8b65c14a7b60163ecd UNKNOWN * 185e46fd046c5f39a87ac3583ad559b065a275e1 UNKNOWN * 346e78df221bb7fae66979bed5d8e088c6f1c4e1 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8433) * f9196a71051202e88294076b7a514f895fb9f443 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8451) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-19491) AvroSerializerSnapshot cannot handle large schema
[ https://issues.apache.org/jira/browse/FLINK-19491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-19491: --- Labels: pull-request-available (was: ) > AvroSerializerSnapshot cannot handle large schema > - > > Key: FLINK-19491 > URL: https://issues.apache.org/jira/browse/FLINK-19491 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.10.2, 1.12.0, 1.11.2 >Reporter: Arvid Heise >Assignee: Nicholas Jiang >Priority: Major > Labels: pull-request-available > > Flink can only handle schemas up to a size of 64kb. > > {noformat} > Caused by: java.io.UTFDataFormatException: encoded string too long: 223502 > bytes > at java.io.DataOutputStream.writeUTF(DataOutputStream.java:364) > at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323) > at > org.apache.flink.formats.avro.typeutils.AvroSerializerSnapshot.writeSnapshot(AvroSerializerSnapshot.java:75) > at > org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.writeVersionedSnapshot(TypeSerializerSnapshot.java:153) > at > org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.writeNestedSerializerSnapshots(NestedSerializersSnapshotDelegate.java:159) > at > org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.writeSnapshot(CompositeTypeSerializerSnapshot.java:148) > at > org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.writeVersionedSnapshot(TypeSerializerSnapshot.java:153) > at > org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.write(TypeSerializerSnapshotSerializationUtil.java:138) > at > org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:55) > at > 
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentWriterImpl.writeStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:183) > at > org.apache.flink.runtime.state.KeyedBackendSerializationProxy.write(KeyedBackendSerializationProxy.java:126) > at > org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:171) > at > org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158) > at > org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:510) > ... 5 common frames omitted{noformat} -- This message was sent by Atlassian Jira (v8.3.4#803005)
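The `UTFDataFormatException` at the top of this stack trace comes from `java.io.DataOutputStream.writeUTF`, which stores the encoded length in an unsigned 16-bit field and therefore rejects any string whose modified-UTF-8 encoding exceeds 65535 bytes. A minimal reproduction of the limit:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UTFDataFormatException;
import java.util.Arrays;

public class WriteUtfLimit {

    public static void main(String[] args) throws IOException {
        DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream());

        char[] atLimit = new char[65535]; // ASCII chars encode to exactly 65535 bytes
        Arrays.fill(atLimit, 'a');
        out.writeUTF(new String(atLimit)); // exactly at the limit: succeeds

        char[] tooLong = new char[65536];
        Arrays.fill(tooLong, 'a');
        try {
            out.writeUTF(new String(tooLong)); // one byte over: throws
        } catch (UTFDataFormatException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```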
[GitHub] [flink] flinkbot edited a comment on pull request #13784: [FLINK-19698][connectors/common] API improvements to the Sources.
flinkbot edited a comment on pull request #13784: URL: https://github.com/apache/flink/pull/13784#issuecomment-716153889 ## CI report: * ca695f256a825fdebadf79f576b0875d4faea7cd Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8450) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #13763: [FLINK-19779][avro] Remove the record_ field name prefix for Confluen…
flinkbot edited a comment on pull request #13763: URL: https://github.com/apache/flink/pull/13763#issuecomment-715195599 ## CI report: * a28669bca9c36dac74f89da177825d73bea0ece0 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8408) * ba752274c9926115f65f5ecbef55b71b0b71cfa2 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8440) * f004220668e20dcd9860026b69566868d473db33 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #13744: [FLINK-19766][table-runtime] Introduce File streaming compaction operators
flinkbot edited a comment on pull request #13744: URL: https://github.com/apache/flink/pull/13744#issuecomment-714369975 ## CI report: * c51d4e8bf5dba50e4cb501fc04a87c924fe27e36 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8390) * b4cbace2b682490ac53a4a8cb4438f2361f3cffd Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8446) * c8f3503c99ec4add43df3d054783f7a48501324c UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] JingsongLi commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…
JingsongLi commented on a change in pull request #13789: URL: https://github.com/apache/flink/pull/13789#discussion_r513169206 ## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/sink/SinkFunctionProvider.java ## @@ -20,19 +20,39 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.table.connector.ParallelismProvider; import org.apache.flink.table.data.RowData; +import java.util.Optional; + /** * Provider of a {@link SinkFunction} instance as a runtime implementation for {@link DynamicTableSink}. */ @PublicEvolving -public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider { +public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider { /** * Helper method for creating a static provider. */ static SinkFunctionProvider of(SinkFunction sinkFunction) { - return () -> sinkFunction; + return of(sinkFunction, Optional.empty()); + } + + /** +* Helper method for creating a static provider, sink parallelism will be configured if non-empty parallelism is passed in. +*/ + static SinkFunctionProvider of(SinkFunction sinkFunction, Optional parallelism) { Review comment: For users who want to set the parallelism, I think it is OK to let them create an implementation class. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
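The shape of the API under discussion can be sketched with simplified stand-ins. The names `SinkProvider` and `createSink` below are hypothetical; the real interfaces are `SinkFunctionProvider` and `ParallelismProvider` in flink-table, and `Runnable` merely stands in for `SinkFunction`:

```java
import java.util.Optional;

/** Simplified stand-in for org.apache.flink.table.connector.ParallelismProvider. */
interface ParallelismProvider {
    default Optional<Integer> getParallelism() {
        return Optional.empty(); // empty means: let the planner decide
    }
}

/** Simplified stand-in for SinkFunctionProvider. */
interface SinkProvider extends ParallelismProvider {
    Runnable createSink();

    /** Static provider without a configured parallelism. */
    static SinkProvider of(Runnable sink) {
        return of(sink, Optional.empty());
    }

    /** Static provider; a non-empty parallelism configures the sink operator. */
    static SinkProvider of(Runnable sink, Optional<Integer> parallelism) {
        return new SinkProvider() {
            @Override
            public Runnable createSink() {
                return sink;
            }

            @Override
            public Optional<Integer> getParallelism() {
                return parallelism;
            }
        };
    }
}

public class ProviderDemo {
    public static void main(String[] args) {
        SinkProvider p = SinkProvider.of(() -> {}, Optional.of(4));
        System.out.println(p.getParallelism().orElse(-1)); // 4
    }
}
```

This also illustrates JingsongLi's suggestion: instead of an `Optional`-typed factory parameter, a connector could simply implement the interface directly and override `getParallelism()`.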
[GitHub] [flink] flinkbot edited a comment on pull request #13307: [FLINK-19078][table-runtime] Import rowtime join temporal operator
flinkbot edited a comment on pull request #13307: URL: https://github.com/apache/flink/pull/13307#issuecomment-685763264 ## CI report: * 20fe41449f841cfbbb155c1f1c842c5d814ca934 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8447) * 85867d4ab920f571c1dcad56fea6a89c3b717b13 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] shouweikun commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…
shouweikun commented on a change in pull request #13789: URL: https://github.com/apache/flink/pull/13789#discussion_r513169070 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala ## @@ -99,11 +105,33 @@ class CommonPhysicalSink ( val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer) +val inputParallelism = inputTransformation.getParallelism +val taskParallelism = env.getParallelism +val parallelism = if (runtimeProvider.isInstanceOf[ParallelismProvider]) runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism.orElse(inputParallelism).intValue() +else inputParallelism + +if (implicitly[Ordering[Int]].lteq(parallelism, 0)) throw new RuntimeException(s"the configured sink parallelism: $parallelism should not be less than zero or equal to zero") +if (implicitly[Ordering[Int]].gt(parallelism, taskParallelism)) throw new RuntimeException(s"the configured sink parallelism: $parallelism is larger than the task max parallelism: $taskParallelism") + +val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema) +val containedRowKinds = changelogMode.getContainedKinds.toSet +val theFinalInputTransformation = if(inputParallelism == parallelism) inputTransformation //if the parallelism is not changed, do nothing +else (containedRowKinds, primaryKeys.toList) match { +// fixme : if rowKinds only contains delete, is there somethinng to do with? Currently do nothing. Review comment: @JingsongLi I only do `keyBy` when the ChangelogMode contains `update_after`, while for other changelog modes I just keep the transformation unchanged. Is that proper or not? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #13300: [FLINK-19077][table-runtime] Import process time temporal join operator.
flinkbot edited a comment on pull request #13300: URL: https://github.com/apache/flink/pull/13300#issuecomment-684957567 ## CI report: * Unknown: [CANCELED](TBD) * c777052640e6c3005314d8dd284cf0c4785fdf8a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8445) * 6bf5dc2e71b14086d5f908cc300060595adc46ea UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] JingsongLi commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…
JingsongLi commented on a change in pull request #13789: URL: https://github.com/apache/flink/pull/13789#discussion_r513168676 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala ## @@ -99,11 +105,33 @@ class CommonPhysicalSink ( val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer) +val inputParallelism = inputTransformation.getParallelism +val taskParallelism = env.getParallelism +val parallelism = if (runtimeProvider.isInstanceOf[ParallelismProvider]) runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism.orElse(inputParallelism).intValue() Review comment: Adding a Scala `assert` is enough. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] JingsongLi commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…
JingsongLi commented on a change in pull request #13789: URL: https://github.com/apache/flink/pull/13789#discussion_r513168776 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala ## @@ -99,11 +105,33 @@ class CommonPhysicalSink ( val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer) +val inputParallelism = inputTransformation.getParallelism +val taskParallelism = env.getParallelism +val parallelism = if (runtimeProvider.isInstanceOf[ParallelismProvider]) runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism.orElse(inputParallelism).intValue() +else inputParallelism + +if (implicitly[Ordering[Int]].lteq(parallelism, 0)) throw new RuntimeException(s"the configured sink parallelism: $parallelism should not be less than zero or equal to zero") Review comment: Yes, it should always throw a `TableException`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #13296: [FLINK-18774][format][debezium] Support debezium-avro format
flinkbot edited a comment on pull request #13296: URL: https://github.com/apache/flink/pull/13296#issuecomment-684611193 ## CI report: * c266341c63cc4cdc28a6585a82294710fd8ebbc5 UNKNOWN * 441a0e088ad94379c71b49121444ca5ea533a55b Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8449) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] shouweikun commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…
shouweikun commented on a change in pull request #13789: URL: https://github.com/apache/flink/pull/13789#discussion_r513167212 ## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/sink/SinkFunctionProvider.java ## @@ -20,19 +20,39 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.table.connector.ParallelismProvider; import org.apache.flink.table.data.RowData; +import java.util.Optional; + /** * Provider of a {@link SinkFunction} instance as a runtime implementation for {@link DynamicTableSink}. */ @PublicEvolving -public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider { +public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider { /** * Helper method for creating a static provider. */ static SinkFunctionProvider of(SinkFunction sinkFunction) { - return () -> sinkFunction; + return of(sinkFunction, Optional.empty()); + } + + /** +* Helper method for creating a static provider, sink parallelism will be configured if non-empty parallelism is passed in. +*/ + static SinkFunctionProvider of(SinkFunction sinkFunction, Optional parallelism) { Review comment: @JingsongLi > it is recommended to use the Optional only in method return values Copy that. > I think we don't need provide method. Well, since `SinkFunctionProvider` implements `ParallelismProvider` by default, as far as I'm concerned, there should be a method for passing the parallelism in. Or is there a better alternative? It would be kind of you to tell me~ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] shouweikun commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…
shouweikun commented on a change in pull request #13789: URL: https://github.com/apache/flink/pull/13789#discussion_r513167212 ## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/sink/SinkFunctionProvider.java ## @@ -20,19 +20,39 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.table.connector.ParallelismProvider; import org.apache.flink.table.data.RowData; +import java.util.Optional; + /** * Provider of a {@link SinkFunction} instance as a runtime implementation for {@link DynamicTableSink}. */ @PublicEvolving -public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider { +public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider { /** * Helper method for creating a static provider. */ static SinkFunctionProvider of(SinkFunction sinkFunction) { - return () -> sinkFunction; + return of(sinkFunction, Optional.empty()); + } + + /** +* Helper method for creating a static provider, sink parallelism will be configured if non-empty parallelism is passed in. +*/ + static SinkFunctionProvider of(SinkFunction sinkFunction, Optional parallelism) { Review comment: @JingsongLi **it is recommended to use the Optional only in method return values ** copy that. ** I think we don't need provide method.** Well, since `SinkFunctionProvider ` implements `ParallelismProvider` as default, as far as I’m concerned, there should be a method passing the parallelism in. Or Is there an better alternative? It‘s kind of u to tell me that~ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] shouweikun commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…
shouweikun commented on a change in pull request #13789: URL: https://github.com/apache/flink/pull/13789#discussion_r513167212 ## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/sink/SinkFunctionProvider.java ## @@ -20,19 +20,39 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.table.connector.ParallelismProvider; import org.apache.flink.table.data.RowData; +import java.util.Optional; + /** * Provider of a {@link SinkFunction} instance as a runtime implementation for {@link DynamicTableSink}. */ @PublicEvolving -public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider { +public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider { /** * Helper method for creating a static provider. */ static SinkFunctionProvider of(SinkFunction sinkFunction) { - return () -> sinkFunction; + return of(sinkFunction, Optional.empty()); + } + + /** +* Helper method for creating a static provider, sink parallelism will be configured if non-empty parallelism is passed in. +*/ + static SinkFunctionProvider of(SinkFunction sinkFunction, Optional parallelism) { Review comment: @JingsongLi **it is recommended to use the Optional only in method return values ** copy that. ** I think we don't need provide method.** Well, since `SinkFunctionProvider ` implements `ParallelismProvider` as default, as far as I’m concerned, there should be a method passing the parallelism in. Or Is there an better alternative? It‘s kind of u to tell me that~ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] shouweikun commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…
shouweikun commented on a change in pull request #13789: URL: https://github.com/apache/flink/pull/13789#discussion_r513167212 ## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/sink/SinkFunctionProvider.java ## @@ -20,19 +20,39 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.table.connector.ParallelismProvider; import org.apache.flink.table.data.RowData; +import java.util.Optional; + /** * Provider of a {@link SinkFunction} instance as a runtime implementation for {@link DynamicTableSink}. */ @PublicEvolving -public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider { +public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider { /** * Helper method for creating a static provider. */ static SinkFunctionProvider of(SinkFunction sinkFunction) { - return () -> sinkFunction; + return of(sinkFunction, Optional.empty()); + } + + /** +* Helper method for creating a static provider, sink parallelism will be configured if non-empty parallelism is passed in. +*/ + static SinkFunctionProvider of(SinkFunction sinkFunction, Optional parallelism) { Review comment: @JingsongLi **it is recommended to use the Optional only in method return values ** copy that. ** I think we don't need provide method.** Well, since `SinkFunctionProvider ` implements `ParallelismProvider` as default, as far as I’m concerned, there should be a method passing the parallelism in. Or Is there an better alternative? It‘s kind of u to tell me that~ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] shouweikun commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…
shouweikun commented on a change in pull request #13789: URL: https://github.com/apache/flink/pull/13789#discussion_r513167212 ## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/sink/SinkFunctionProvider.java ## @@ -20,19 +20,39 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.table.connector.ParallelismProvider; import org.apache.flink.table.data.RowData; +import java.util.Optional; + /** * Provider of a {@link SinkFunction} instance as a runtime implementation for {@link DynamicTableSink}. */ @PublicEvolving -public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider { +public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider { /** * Helper method for creating a static provider. */ static SinkFunctionProvider of(SinkFunction sinkFunction) { - return () -> sinkFunction; + return of(sinkFunction, Optional.empty()); + } + + /** +* Helper method for creating a static provider, sink parallelism will be configured if non-empty parallelism is passed in. +*/ + static SinkFunctionProvider of(SinkFunction sinkFunction, Optional parallelism) { Review comment: @JingsongLi > it is recommended to use the Optional only in method return values Copy that. > I think we don't need provide method. Well, since `SinkFunctionProvider` implements `ParallelismProvider` by default, as far as I'm concerned, there should be a method for passing the parallelism in. Or is there a better alternative? It would be kind of you to let me know. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
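To make the shape of the two `of` overloads under discussion concrete, here is a minimal, self-contained sketch. The interfaces below are simplified stand-ins, not the actual Flink API: `Runnable` replaces `SinkFunction`, and the real `SinkFunctionProvider`/`ParallelismProvider` live in flink-table with different signatures.

```java
import java.util.Optional;

// Simplified stand-ins for the interfaces discussed in the PR; illustrative only.
interface ParallelismProvider {
    default Optional<Integer> getParallelism() {
        return Optional.empty();
    }
}

interface SinkFunctionProvider extends ParallelismProvider {
    Runnable createSinkFunction(); // stand-in for SinkFunction creation

    // Helper without parallelism: delegates to the two-argument overload.
    static SinkFunctionProvider of(Runnable sinkFunction) {
        return of(sinkFunction, Optional.empty());
    }

    // Helper with an optional parallelism, mirroring the overload proposed in the diff above.
    static SinkFunctionProvider of(Runnable sinkFunction, Optional<Integer> parallelism) {
        return new SinkFunctionProvider() {
            @Override
            public Runnable createSinkFunction() {
                return sinkFunction;
            }

            @Override
            public Optional<Integer> getParallelism() {
                return parallelism;
            }
        };
    }
}
```

The design question in the thread is whether the second overload should exist at all, given the advice to use `Optional` only as a return type; the sketch simply shows the variant being debated.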
[GitHub] [flink] shouweikun commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…
shouweikun commented on a change in pull request #13789: URL: https://github.com/apache/flink/pull/13789#discussion_r513165608 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala ## @@ -99,11 +105,33 @@ class CommonPhysicalSink ( val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer) +val inputParallelism = inputTransformation.getParallelism +val taskParallelism = env.getParallelism Review comment: Since we don't need to do this according to the conversation below, it is fine to remove this line.
[jira] [Commented] (FLINK-12884) FLIP-144: Native Kubernetes HA Service
[ https://issues.apache.org/jira/browse/FLINK-12884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17221944#comment-17221944 ] shravan commented on FLINK-12884: - [~fly_in_gis] We have just migrated to Kubernetes (EKS) and are setting up the Flink cluster/operator on K8s at the moment. We need to enable HA for the Flink job manager, and since we already have AWS MSK (AWS managed Kafka, which runs on ZooKeeper) we may not want to set up another ZooKeeper cluster on EKS (Kubernetes). Just wanted to check if the native Kubernetes HA service is available to implement now? If yes, is it a stable version? Please share any documentation/runbook steps to follow through. Also, if you have any other thoughts on setting up HA kindly share. Thanks, Shravan > FLIP-144: Native Kubernetes HA Service > -- > > Key: FLINK-12884 > URL: https://issues.apache.org/jira/browse/FLINK-12884 > Project: Flink > Issue Type: New Feature > Components: Deployment / Kubernetes, Runtime / Coordination >Reporter: MalcolmSanders >Assignee: Yang Wang >Priority: Major > Fix For: 1.12.0 > > > Currently flink only supports HighAvailabilityService using zookeeper. As a > result, it requires a zookeeper cluster to be deployed on k8s cluster if our > customers needs high availability for flink. If we support > HighAvailabilityService based on native k8s APIs, it will save the efforts of > zookeeper deployment as well as the resources used by zookeeper cluster. It > might be especially helpful for customers who run small-scale k8s clusters so > that flink HighAvailabilityService may not cause too much overhead on k8s > clusters. > Previously [FLINK-11105|https://issues.apache.org/jira/browse/FLINK-11105] > has proposed a HighAvailabilityService using etcd. 
As [~NathanHowell] > suggested in FLINK-11105, since k8s doesn't expose its own etcd cluster by > design (see [Securing etcd > clusters|https://kubernetes.io/docs/tasks/administer-cluster/configure-upgrade-etcd/#securing-etcd-clusters]), > it also requires the deployment of etcd cluster if flink uses etcd to > achieve HA. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] leonardBang commented on pull request #13300: [FLINK-19077][table-runtime] Import process time temporal join operator.
leonardBang commented on pull request #13300: URL: https://github.com/apache/flink/pull/13300#issuecomment-717679762 @flinkbot run azure
[GitHub] [flink] flinkbot commented on pull request #13812: [FLINK-19674][docs-zh] Translate "Docker" of "Clusters & Depolyment" page into Chinese
flinkbot commented on pull request #13812: URL: https://github.com/apache/flink/pull/13812#issuecomment-717678848 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 4f66617c4540a4987a5852cf0168f7e3a552c91a (Wed Oct 28 03:53:19 UTC 2020) ✅ no warnings Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] flinkbot edited a comment on pull request #13810: [FLINK-18811][network] Pick another tmpDir if an IOException occurs w…
flinkbot edited a comment on pull request #13810: URL: https://github.com/apache/flink/pull/13810#issuecomment-717666478 ## CI report: * 2c4d8bab8fac4ca8dea20c7cb326f62ca3dd7817 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8448) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on pull request #13797: [FLINK-19465][runtime / statebackends] Add CheckpointStorage interface and wire through runtime
flinkbot edited a comment on pull request #13797: URL: https://github.com/apache/flink/pull/13797#issuecomment-716854695 ## CI report: * c06b3dcd4eb4045f66352c8b65c14a7b60163ecd UNKNOWN * 185e46fd046c5f39a87ac3583ad559b065a275e1 UNKNOWN * 346e78df221bb7fae66979bed5d8e088c6f1c4e1 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8433) * f9196a71051202e88294076b7a514f895fb9f443 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] shouweikun commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…
shouweikun commented on a change in pull request #13789: URL: https://github.com/apache/flink/pull/13789#discussion_r513165407 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala ## @@ -99,11 +105,33 @@ class CommonPhysicalSink ( val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer) +val inputParallelism = inputTransformation.getParallelism +val taskParallelism = env.getParallelism Review comment: Well, correct me if I am wrong: `getMaxParallelism` is the upper bound of parallelism that the task can apply, while `getParallelism` is the actual parallelism that the task applies.
[GitHub] [flink] shouweikun commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…
shouweikun commented on a change in pull request #13789: URL: https://github.com/apache/flink/pull/13789#discussion_r513164185 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala ## @@ -99,11 +105,33 @@ class CommonPhysicalSink ( val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer) +val inputParallelism = inputTransformation.getParallelism +val taskParallelism = env.getParallelism +val parallelism = if (runtimeProvider.isInstanceOf[ParallelismProvider]) runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism.orElse(inputParallelism).intValue() Review comment: My concern is that once a NEW provider that does not implement `ParallelismProvider` is used, an exception will be thrown unless we check the type first.
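The defensive pattern the comment argues for can be sketched as follows. The types here are illustrative stand-ins for the planner's `SinkRuntimeProvider`/`ParallelismProvider`, not the real Flink classes; the point is only the `instanceof` check before the cast.

```java
import java.util.Optional;

// Illustrative stand-ins, not the real Flink planner types.
interface SinkRuntimeProvider {}

interface ParallelismProvider {
    Optional<Integer> getParallelism();
}

class SinkParallelismResolver {
    // Check the runtime provider's type before casting, so a new provider
    // that does not implement ParallelismProvider falls back to the input
    // parallelism instead of failing with a ClassCastException.
    static int resolve(SinkRuntimeProvider provider, int inputParallelism) {
        if (provider instanceof ParallelismProvider) {
            return ((ParallelismProvider) provider)
                    .getParallelism()
                    .orElse(inputParallelism);
        }
        return inputParallelism;
    }
}
```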
[jira] [Updated] (FLINK-19674) Translate "Docker" of "Clusters & Depolyment" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-19674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-19674: --- Labels: pull-request-available (was: ) > Translate "Docker" of "Clusters & Depolyment" page into Chinese > --- > > Key: FLINK-19674 > URL: https://issues.apache.org/jira/browse/FLINK-19674 > Project: Flink > Issue Type: Improvement > Components: chinese-translation, Documentation >Reporter: Shubin Ruan >Assignee: Shubin Ruan >Priority: Major > Labels: pull-request-available > > The page url is > [https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/deployment/docker.html] > The markdown file is located in {{flink/docs/ops/deployment/docker.zh.md}}
[GitHub] [flink] ShubinRuan opened a new pull request #13812: [FLINK-19674][docs-zh] Translate "Docker" of "Clusters & Depolyment" page into Chinese
ShubinRuan opened a new pull request #13812: URL: https://github.com/apache/flink/pull/13812 ## What is the purpose of the change Translate "Docker" of "Clusters & Depolyment" page into Chinese The page url is https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/docker.html The markdown file is located in flink/docs/ops/deployment/docker.zh.md ## Brief change log - translate `flink/docs/ops/deployment/docker.zh.md` ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with @Public(Evolving): no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? no
[GitHub] [flink] shouweikun commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…
shouweikun commented on a change in pull request #13789: URL: https://github.com/apache/flink/pull/13789#discussion_r513163662 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala ## @@ -99,11 +105,33 @@ class CommonPhysicalSink ( val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer) +val inputParallelism = inputTransformation.getParallelism +val taskParallelism = env.getParallelism +val parallelism = if (runtimeProvider.isInstanceOf[ParallelismProvider]) runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism.orElse(inputParallelism).intValue() +else inputParallelism + +if (implicitly[Ordering[Int]].lteq(parallelism, 0)) throw new RuntimeException(s"the configured sink parallelism: $parallelism should not be less than zero or equal to zero") Review comment: Sure, btw should `TableException` also be thrown here instead of `RuntimeException`? @JingsongLi
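The bounds check being reviewed can be sketched in isolation. The exception type below is a deliberate placeholder (`IllegalArgumentException`), since whether to throw `RuntimeException` or Flink's `TableException` is exactly the open question in this thread; the class and method names are hypothetical.

```java
class SinkParallelismValidator {
    // Validation from the diff above: the configured sink parallelism must be
    // positive and must not exceed the task parallelism. The exception type is
    // a placeholder for whatever the planner settles on.
    static int validate(int parallelism, int taskParallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException(
                    "The configured sink parallelism " + parallelism
                            + " must be greater than zero.");
        }
        if (parallelism > taskParallelism) {
            throw new IllegalArgumentException(
                    "The configured sink parallelism " + parallelism
                            + " is larger than the task parallelism " + taskParallelism + ".");
        }
        return parallelism;
    }
}
```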
[GitHub] [flink] guoweiM commented on a change in pull request #13808: [FLINK-19834] Make the TestSink reusable in all the sink related tests.
guoweiM commented on a change in pull request #13808: URL: https://github.com/apache/flink/pull/13808#discussion_r513163601 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalStreamingCommitterOperatorTest.java ## @@ -121,13 +119,18 @@ public void restoredFromMergedState() throws Exception { testHarness.open(); final List expectedOutput = new ArrayList<>(); - expectedOutput.add(TestSink.TestGlobalCommitter.COMBINER.apply(input1)); - expectedOutput.add(TestSink.TestGlobalCommitter.COMBINER.apply(input2)); + expectedOutput.add(TestSink.DefaultGlobalCommitter.COMBINER.apply(input1)); + expectedOutput.add(TestSink.DefaultGlobalCommitter.COMBINER.apply(input2)); testHarness.snapshot(1L, 1L); testHarness.notifyOfCompletedCheckpoint(1L); testHarness.close(); + // TODO:: maybe there is no output at all Review comment: Thanks @aljoscha and @kl0u. It will be fixed in the next PR.
[GitHub] [flink] flinkbot commented on pull request #13811: [FLINK-19836] Serialize the committable by the user provided serializer during network shuffle
flinkbot commented on pull request #13811: URL: https://github.com/apache/flink/pull/13811#issuecomment-717677509 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 1f87c59d4ccdac3e91b28aaefebde4cc0e479bdf (Wed Oct 28 03:47:48 UTC 2020) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-19836).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work. Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. 
For consensus, approval by a Flink committer or PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Updated] (FLINK-19836) Serialize the committable by the user provided serializer during network shuffle
[ https://issues.apache.org/jira/browse/FLINK-19836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-19836: --- Labels: pull-request-available (was: ) > Serialize the committable by the user provided serializer during network > shuffle > > > Key: FLINK-19836 > URL: https://issues.apache.org/jira/browse/FLINK-19836 > Project: Flink > Issue Type: Sub-task > Components: API / DataStream >Reporter: Guowei Ma >Priority: Major > Labels: pull-request-available >
[GitHub] [flink] guoweiM opened a new pull request #13811: [FLINK-19836] Serialize the committable by the user provided serializer during network shuffle
guoweiM opened a new pull request #13811: URL: https://github.com/apache/flink/pull/13811 ## What is the purpose of the change The behavior of the committable's serializer that is extracted by the framework might be different from the serializer provided by the user. As a result the downstream operator might get a different committable if we use the serializer extracted by the framework during network shuffle. This leads to some unexpected results. So this patch makes the shuffle between the `writer/committer/globalcommitter` use the serializer provided by the user. ## Brief change log 1. Change the input/output type of writer operator from `CommT` to `byte[]` 2. Change the input/output type of committer operator from `CommT` to `byte[]` 3. Change the input/output type of global committer operator from `CommT` to `byte[]` 4. All the sink operators would not send any committables if there is no downstream operator. ## Verifying this change This change is already covered by existing tests ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable)
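The idea behind the change — both ends of the shuffle use the user's serializer so the committable survives the `byte[]` hop intact — can be sketched with a minimal stand-in. The real Flink interface here is `SimpleVersionedSerializer<CommT>`; the simplified interface and class names below are illustrative, not the PR's actual code.

```java
import java.nio.charset.StandardCharsets;

// Minimal stand-in for the user-provided committable serializer; the real
// Flink interface is SimpleVersionedSerializer<CommT>, simplified here.
interface CommittableSerializer<CommT> {
    byte[] serialize(CommT committable);
    CommT deserialize(byte[] bytes);
}

class StringCommittableSerializer implements CommittableSerializer<String> {
    @Override
    public byte[] serialize(String committable) {
        return committable.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

class CommittableShuffle {
    // The writer side emits byte[] produced by the user's serializer; the
    // committer side restores the committable from those bytes, so both ends
    // agree on the wire format regardless of what the framework could extract.
    static <CommT> CommT roundTrip(CommT committable, CommittableSerializer<CommT> serializer) {
        byte[] onTheWire = serializer.serialize(committable);
        return serializer.deserialize(onTheWire);
    }
}
```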
[GitHub] [flink] danny0405 commented on a change in pull request #13763: [FLINK-19779][avro] Remove the record_ field name prefix for Confluen…
danny0405 commented on a change in pull request #13763: URL: https://github.com/apache/flink/pull/13763#discussion_r513162229 ## File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java ## @@ -297,32 +300,53 @@ private static DataType convertToDataType(Schema schema) { * @return Avro's {@link Schema} matching this logical type. */ public static Schema convertToSchema(LogicalType logicalType) { + // If it is parsing the root row type, switches from nullable true to false + // because a nullable row type is meaningless and would generate wrong schema. + if (logicalType.getTypeRoot() == LogicalTypeRoot.ROW Review comment: Although the `AvroSchemaConverter` is a tool class, it is still used as a public API, so I'm inclined to keep the signature unchanged. Another reason is that only the logical type is enough for the conversion. I have added more documentation to the methods.
[GitHub] [flink] flinkbot edited a comment on pull request #13307: [FLINK-19078][table-runtime] Import rowtime join temporal operator
flinkbot edited a comment on pull request #13307: URL: https://github.com/apache/flink/pull/13307#issuecomment-685763264 ## CI report: * 20fe41449f841cfbbb155c1f1c842c5d814ca934 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8447) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] JingsongLi commented on a change in pull request #13685: [FLINK-19702][hive] Avoid using HiveConf::hiveSiteURL
JingsongLi commented on a change in pull request #13685: URL: https://github.com/apache/flink/pull/13685#discussion_r513158356 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -214,7 +202,19 @@ private static HiveConf createHiveConf(@Nullable String hiveConfDir, @Nullable S } else { hadoopConf = getHadoopConfiguration(hadoopConfDir); } - return new HiveConf(hadoopConf == null ? new Configuration() : hadoopConf, HiveConf.class); + HiveConf hiveConf = new HiveConf(hadoopConf == null ? new Configuration() : hadoopConf, HiveConf.class); + + LOG.info("Setting hive conf dir as {}", hiveConfDir); + + if (hiveConfDir != null) { + Path hiveSite = new Path(hiveConfDir, "hive-site.xml"); Review comment: Maybe you should use `new File(..).toURI()`
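The suggestion above — build the `hive-site.xml` location through `File#toURI` rather than hand-concatenating a path string — can be sketched like this. The class and method names are illustrative, not from the Flink code base; only `java.io.File.toURI().toURL()` is the actual JDK API being recommended.

```java
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;

class HiveSiteLocator {
    // File(dir, name).toURI().toURL() yields a well-formed "file:" URL on every
    // platform, handling separators and special characters for you.
    static URL hiveSiteUrl(String hiveConfDir) {
        try {
            return new File(hiveConfDir, "hive-site.xml").toURI().toURL();
        } catch (MalformedURLException e) {
            throw new IllegalStateException(e);
        }
    }
}
```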
[jira] [Assigned] (FLINK-19628) Introduce multi-input operator for streaming
[ https://issues.apache.org/jira/browse/FLINK-19628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] godfrey he reassigned FLINK-19628: -- Assignee: godfrey he > Introduce multi-input operator for streaming > > > Key: FLINK-19628 > URL: https://issues.apache.org/jira/browse/FLINK-19628 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime >Reporter: Caizhi Weng >Assignee: godfrey he >Priority: Major > Fix For: 1.12.0 > > > After the planner is ready for multi-input, we should introduce multi-input > operator for streaming.
[GitHub] [flink] becketqin commented on pull request #13784: [FLINK-19698][connectors/common] API improvements to the Sources.
becketqin commented on pull request #13784: URL: https://github.com/apache/flink/pull/13784#issuecomment-717670076 @StephanEwen @stevenzwu I have updated the patch with the following changes: 1. Move `CheckpointListener` from `flink-runtime` to `flink-core`. Because this interface is a public API, I left the existing interface class in flink-runtime but marked it as deprecated. All the usages within Apache Flink have been migrated to use the new package. 2. Let `SplitEnumerator`/`SourceReader`/`OperatorCoordinator` implement `CheckpointListener`. 3. Moving the `waitUntil()` method to `CommonTestUtils`. Would you have time to take another look? Thanks.
[jira] [Updated] (FLINK-19698) Add close() method and onCheckpointComplete() to the Source.
[ https://issues.apache.org/jira/browse/FLINK-19698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-19698: --- Labels: pull-request-available (was: ) > Add close() method and onCheckpointComplete() to the Source. > > > Key: FLINK-19698 > URL: https://issues.apache.org/jira/browse/FLINK-19698 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Affects Versions: 1.11.2 >Reporter: Jiangjie Qin >Assignee: Jiangjie Qin >Priority: Major > Labels: pull-request-available > > Right now there are some caveats to the new Source API. From the > implementation of some connectors. We would like to make the following > improvements to the current Source API. > # Add the following method to the {{SplitReader}} API. > {{public void close() throws Exception;}} > This method allows the SplitReader implementations to be closed properly when > the split fetcher exits. > # Add the following method to the {{SourceReader}} API. > {{public void checkpointComplete(long checkpointId);}} > This method allows the {{SourceReader}} to take some cleanup / reporting > actions when a checkpoint has been successfully taken. > >
[GitHub] [flink] xiaoHoly commented on a change in pull request #13801: [FLINK-19213][docs-zh] Update the Chinese documentation
xiaoHoly commented on a change in pull request #13801: URL: https://github.com/apache/flink/pull/13801#discussion_r513155682 ## File path: docs/dev/table/connectors/formats/avro-confluent.zh.md ## @@ -29,27 +29,27 @@ under the License. * This will be replaced by the TOC {:toc} -The Avro Schema Registry (``avro-confluent``) format allows you to read records that were serialized by the ``io.confluent.kafka.serializers.KafkaAvroSerializer`` and to write records that can in turn be read by the ``io.confluent.kafka.serializers.KafkaAvroDeserializer``. +Avro Schema Registry (``avro-confluent``) 格式能让你读取被 ``io.confluent.kafka.serializers.KafkaAvroSerializer``序列化的记录并可以写入成 ``io.confluent.kafka.serializers.KafkaAvroDeserializer``反序列化的记录。 -When reading (deserializing) a record with this format the Avro writer schema is fetched from the configured Confluent Schema Registry based on the schema version id encoded in the record while the reader schema is inferred from table schema. +当以这种格式读取(反序列化)记录时,将根据记录中编码的 schema 版本 id 从配置的 Confluent Schema Registry 中获取 Avro writer schema ,而从 table schema 中推断出 reader schema。 -When writing (serializing) a record with this format the Avro schema is inferred from the table schema and used to retrieve a schema id to be encoded with the data. The lookup is performed with in the configured Confluent Schema Registry under the [subject](https://docs.confluent.io/current/schema-registry/index.html#schemas-subjects-and-topics) given in `avro-confluent.schema-registry.subject`. +当以这种格式写入(序列化)记录时,Avro schema 是从 table schema 中推断出来的,并用于检索要与数据一起编码的 schema id。检索是在 Confluent Schema Registry 配置中的 `avro-confluent.schema-registry.subject` 中指定的[subject](https://docs.confluent.io/current/schema-registry/index.html#schemas-subjects-and-topics)下执行的。 Review comment: Thanks, I will update my code based on your comments.
[jira] [Closed] (FLINK-19789) Migrate Hive connector to new table source sink interface
[ https://issues.apache.org/jira/browse/FLINK-19789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee closed FLINK-19789. Resolution: Fixed master (1.12): 0a14ad1cc47c4c6d4de0d5c90d3cd9578ca2536c > Migrate Hive connector to new table source sink interface > - > > Key: FLINK-19789 > URL: https://issues.apache.org/jira/browse/FLINK-19789 > Project: Flink > Issue Type: Sub-task >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Labels: pull-request-available > Fix For: 1.12.0 > >
[jira] [Updated] (FLINK-19727) Implement ParallelismProvider for sink in blink planner
[ https://issues.apache.org/jira/browse/FLINK-19727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee updated FLINK-19727: - Fix Version/s: 1.12.0 > Implement ParallelismProvider for sink in blink planner > --- > > Key: FLINK-19727 > URL: https://issues.apache.org/jira/browse/FLINK-19727 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: Jingsong Lee >Assignee: Lsw_aka_laplace >Priority: Major > Labels: pull-request-available > Fix For: 1.12.0 > >
[jira] [Commented] (FLINK-19727) Implement ParallelismProvider for sink in blink planner
[ https://issues.apache.org/jira/browse/FLINK-19727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17221927#comment-17221927 ] Jingsong Lee commented on FLINK-19727: -- Hi [~neighborhood], you have created a PR for this ticket, please click {{start-inprogress}} in Jira too. > Implement ParallelismProvider for sink in blink planner > --- > > Key: FLINK-19727 > URL: https://issues.apache.org/jira/browse/FLINK-19727 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: Jingsong Lee >Assignee: Lsw_aka_laplace >Priority: Major > Labels: pull-request-available > Fix For: 1.12.0
[jira] [Updated] (FLINK-19836) Serialize the committable by the user provided serializer during network shuffle
[ https://issues.apache.org/jira/browse/FLINK-19836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guowei Ma updated FLINK-19836: -- Summary: Serialize the committable by the user provided serializer during network shuffle (was: Serialize the committable by the serializer provided by the user during network shuffle) > Serialize the committable by the user provided serializer during network > shuffle > > > Key: FLINK-19836 > URL: https://issues.apache.org/jira/browse/FLINK-19836 > Project: Flink > Issue Type: Sub-task > Components: API / DataStream >Reporter: Guowei Ma >Priority: Major
[GitHub] [flink] JingsongLi commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…
JingsongLi commented on a change in pull request #13789: URL: https://github.com/apache/flink/pull/13789#discussion_r513153908 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala ## @@ -99,11 +105,33 @@ class CommonPhysicalSink ( val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer) +val inputParallelism = inputTransformation.getParallelism +val taskParallelism = env.getParallelism +val parallelism = if (runtimeProvider.isInstanceOf[ParallelismProvider]) runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism.orElse(inputParallelism).intValue() +else inputParallelism + +if (implicitly[Ordering[Int]].lteq(parallelism, 0)) throw new RuntimeException(s"the configured sink parallelism: $parallelism should not be less than zero or equal to zero") +if (implicitly[Ordering[Int]].gt(parallelism, taskParallelism)) throw new RuntimeException(s"the configured sink parallelism: $parallelism is larger than the task max parallelism: $taskParallelism") + +val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema) +val containedRowKinds = changelogMode.getContainedKinds.toSet +val theFinalInputTransformation = if(inputParallelism == parallelism) inputTransformation //if the parallelism is not changed, do nothing +else (containedRowKinds, primaryKeys.toList) match { +// fixme : if rowKinds only contains delete, is there somethinng to do with? Currently do nothing. Review comment: Yes, you can just `changelogMode == ChangelogMode.INSERT_ONLY` (scala, java should use equals), and do keyBy.
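The repartitioning logic under discussion can be summarized as a small decision table. The following is an illustrative sketch only, not Flink's actual code: the class and method names are hypothetical, and it merely models the choice the reviewer describes (reuse the input when parallelism is unchanged, freely redistribute insert-only data, and key by the primary keys for changelog data so all changes for a key land on the same sink subtask).

```java
import java.util.List;

/** Hypothetical sketch of the sink repartitioning decision discussed above. */
public class SinkPartitioningDecision {
    enum Strategy { REUSE_INPUT, RESCALE, KEY_BY_PRIMARY_KEYS }

    static Strategy decide(
            boolean insertOnly, List<Integer> primaryKeys,
            int inputParallelism, int sinkParallelism) {
        if (inputParallelism == sinkParallelism) {
            return Strategy.REUSE_INPUT; // parallelism unchanged: no shuffle needed
        }
        if (insertOnly) {
            return Strategy.RESCALE; // append-only rows can be redistributed freely
        }
        if (primaryKeys.isEmpty()) {
            throw new IllegalStateException(
                    "changelog input needs primary keys to repartition safely");
        }
        // updates/deletes for the same key must reach the same sink subtask
        return Strategy.KEY_BY_PRIMARY_KEYS;
    }
}
```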
[GitHub] [flink] JingsongLi commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…
JingsongLi commented on a change in pull request #13789: URL: https://github.com/apache/flink/pull/13789#discussion_r513153207 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala ## @@ -99,11 +105,33 @@ class CommonPhysicalSink ( val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer) +val inputParallelism = inputTransformation.getParallelism +val taskParallelism = env.getParallelism +val parallelism = if (runtimeProvider.isInstanceOf[ParallelismProvider]) runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism.orElse(inputParallelism).intValue() +else inputParallelism + +if (implicitly[Ordering[Int]].lteq(parallelism, 0)) throw new RuntimeException(s"the configured sink parallelism: $parallelism should not be less than zero or equal to zero") Review comment: Just `parallelism <= 0`?
[GitHub] [flink] JingsongLi commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…
JingsongLi commented on a change in pull request #13789: URL: https://github.com/apache/flink/pull/13789#discussion_r513152977 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala ## @@ -99,11 +105,33 @@ class CommonPhysicalSink ( val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer) +val inputParallelism = inputTransformation.getParallelism +val taskParallelism = env.getParallelism +val parallelism = if (runtimeProvider.isInstanceOf[ParallelismProvider]) runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism.orElse(inputParallelism).intValue() Review comment: Remove `isInstanceOf`, it must be
[GitHub] [flink] JingsongLi commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…
JingsongLi commented on a change in pull request #13789: URL: https://github.com/apache/flink/pull/13789#discussion_r513152799 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala ## @@ -99,11 +105,33 @@ class CommonPhysicalSink ( val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer) +val inputParallelism = inputTransformation.getParallelism +val taskParallelism = env.getParallelism Review comment: `env.getMaxParallelism`? ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala ## @@ -99,11 +105,33 @@ class CommonPhysicalSink ( val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer) +val inputParallelism = inputTransformation.getParallelism +val taskParallelism = env.getParallelism +val parallelism = if (runtimeProvider.isInstanceOf[ParallelismProvider]) runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism.orElse(inputParallelism).intValue() +else inputParallelism + +if (implicitly[Ordering[Int]].lteq(parallelism, 0)) throw new RuntimeException(s"the configured sink parallelism: $parallelism should not be less than zero or equal to zero") +if (implicitly[Ordering[Int]].gt(parallelism, taskParallelism)) throw new RuntimeException(s"the configured sink parallelism: $parallelism is larger than the task max parallelism: $taskParallelism") Review comment: We don't need to verify `env.getMaxParallelism`, the DataStream layer will verify it.
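Taken together, the review comments in this thread amount to: resolve the sink parallelism (configured value if present, else the input's), validate it with a plain comparison rather than `implicitly[Ordering[Int]]`, and skip the upper-bound check entirely. A minimal sketch of that shape, with hypothetical names, not the actual `CommonPhysicalSink` code:

```java
/** Hypothetical sketch of resolving and validating a configured sink parallelism. */
public class SinkParallelismResolver {
    static int resolve(Integer configuredParallelism, int inputParallelism) {
        // fall back to the input transformation's parallelism if none was configured
        int parallelism =
                configuredParallelism != null ? configuredParallelism : inputParallelism;
        // the reviewer's suggestion: a plain comparison instead of Ordering machinery
        if (parallelism <= 0) {
            throw new IllegalArgumentException(
                    "Configured sink parallelism must be positive, but was: " + parallelism);
        }
        // no upper-bound check here: the DataStream layer verifies max parallelism itself
        return parallelism;
    }
}
```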
[GitHub] [flink] flinkbot commented on pull request #13810: [FLINK-18811][network] Pick another tmpDir if an IOException occurs w…
flinkbot commented on pull request #13810: URL: https://github.com/apache/flink/pull/13810#issuecomment-717666478 ## CI report: * 2c4d8bab8fac4ca8dea20c7cb326f62ca3dd7817 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] JingsongLi commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…
JingsongLi commented on a change in pull request #13789: URL: https://github.com/apache/flink/pull/13789#discussion_r513152137 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala ## @@ -79,6 +84,7 @@ class CommonPhysicalSink ( val enforcer = new SinkNotNullEnforcer(notNullEnforcer, notNullFieldIndices, fieldNames) runtimeProvider match { + case _: DataStreamSinkProvider with ParallelismProvider => throw new RuntimeException("`DataStreamSinkProvider` is not allowed to work with `ParallelismProvider`, please see document of `ParallelismProvider`") Review comment: `TableException`
[GitHub] [flink] JingsongLi commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…
JingsongLi commented on a change in pull request #13789: URL: https://github.com/apache/flink/pull/13789#discussion_r513152300 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala ## @@ -79,6 +84,7 @@ class CommonPhysicalSink ( val enforcer = new SinkNotNullEnforcer(notNullEnforcer, notNullFieldIndices, fieldNames) runtimeProvider match { + case _: DataStreamSinkProvider with ParallelismProvider => throw new RuntimeException("`DataStreamSinkProvider` is not allowed to work with `ParallelismProvider`, please see document of `ParallelismProvider`") Review comment: not to have too long lines
[GitHub] [flink] JingsongLi commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…
JingsongLi commented on a change in pull request #13789: URL: https://github.com/apache/flink/pull/13789#discussion_r513151741 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/OutputFormatProvider.java ## @@ -20,19 +20,39 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.io.OutputFormat; +import org.apache.flink.table.connector.ParallelismProvider; import org.apache.flink.table.data.RowData; +import java.util.Optional; + /** * Provider of an {@link OutputFormat} instance as a runtime implementation for {@link DynamicTableSink}. */ @PublicEvolving -public interface OutputFormatProvider extends DynamicTableSink.SinkRuntimeProvider { +public interface OutputFormatProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider{ /** * Helper method for creating a static provider. */ static OutputFormatProvider of(OutputFormat outputFormat) { - return () -> outputFormat; + return of(outputFormat, Optional.empty()); + } + + /** +* Helper method for creating a static provider, sink parallelism will be configured if non-empty parallelism is passed in. +*/ + static OutputFormatProvider of(OutputFormat outputFormat, Optional parallelism) { Review comment: ditto
[GitHub] [flink] JingsongLi commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…
JingsongLi commented on a change in pull request #13789: URL: https://github.com/apache/flink/pull/13789#discussion_r513151711 ## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/sink/SinkFunctionProvider.java ## @@ -20,19 +20,39 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.table.connector.ParallelismProvider; import org.apache.flink.table.data.RowData; +import java.util.Optional; + /** * Provider of a {@link SinkFunction} instance as a runtime implementation for {@link DynamicTableSink}. */ @PublicEvolving -public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider { +public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider { /** * Helper method for creating a static provider. */ static SinkFunctionProvider of(SinkFunction sinkFunction) { - return () -> sinkFunction; + return of(sinkFunction, Optional.empty()); + } + + /** +* Helper method for creating a static provider, sink parallelism will be configured if non-empty parallelism is passed in. +*/ + static SinkFunctionProvider of(SinkFunction sinkFunction, Optional parallelism) { Review comment: I think we don't need to provide this method. https://flink.apache.org/contributing/code-style-and-quality-java.html#java-optional In Flink code style, it is recommended to use the Optional only in method return values.
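The style rule the reviewer links to — `Optional` in return types only, never as a method parameter — can be illustrated with a small sketch. This is not Flink's actual `SinkFunctionProvider`; the interface and factory names below are hypothetical and only demonstrate the pattern: instead of one factory taking `Optional<Integer>`, offer two overloads and keep `Optional` for the getter's return value.

```java
import java.util.Optional;

/** Illustrative sketch of the "Optional only in return values" guideline. */
interface ConfiguredParallelism {
    Optional<Integer> getParallelism(); // Optional as a return type is fine

    // Instead of of(Optional<Integer>), expose two overloads:
    static ConfiguredParallelism of() {
        return Optional::empty; // no explicit parallelism configured
    }

    static ConfiguredParallelism of(int parallelism) {
        return () -> Optional.of(parallelism); // explicit parallelism configured
    }
}
```

Callers then never build an `Optional` just to pass it in; they pick the overload that matches their situation.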
[GitHub] [flink] flinkbot edited a comment on pull request #13744: [FLINK-19766][table-runtime] Introduce File streaming compaction operators
flinkbot edited a comment on pull request #13744: URL: https://github.com/apache/flink/pull/13744#issuecomment-714369975 ## CI report: * c51d4e8bf5dba50e4cb501fc04a87c924fe27e36 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8390) * b4cbace2b682490ac53a4a8cb4438f2361f3cffd Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8446)
[GitHub] [flink] flinkbot edited a comment on pull request #13307: [FLINK-19078][table-runtime] Import rowtime join temporal operator
flinkbot edited a comment on pull request #13307: URL: https://github.com/apache/flink/pull/13307#issuecomment-685763264 ## CI report: * 0edbb09cea1b10a677b91c17b889ec74ef8dfbed Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8427) * 20fe41449f841cfbbb155c1f1c842c5d814ca934 UNKNOWN
[GitHub] [flink] flinkbot edited a comment on pull request #13300: [FLINK-19077][table-runtime] Import process time temporal join operator.
flinkbot edited a comment on pull request #13300: URL: https://github.com/apache/flink/pull/13300#issuecomment-684957567 ## CI report: * Unknown: [CANCELED](TBD) * c777052640e6c3005314d8dd284cf0c4785fdf8a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8445)
[GitHub] [flink] flinkbot edited a comment on pull request #13296: [FLINK-18774][format][debezium] Support debezium-avro format
flinkbot edited a comment on pull request #13296: URL: https://github.com/apache/flink/pull/13296#issuecomment-684611193 ## CI report: * c266341c63cc4cdc28a6585a82294710fd8ebbc5 UNKNOWN * d7d795f0085c86a8a170c80a47ef949d5d3dfeb7 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8380) * 441a0e088ad94379c71b49121444ca5ea533a55b UNKNOWN
[GitHub] [flink] yuchuanchen edited a comment on pull request #13718: [FLINK-18811] Pick another tmpDir if an IOException occurs when creating spill file
yuchuanchen edited a comment on pull request #13718: URL: https://github.com/apache/flink/pull/13718#issuecomment-717663344 Hi @pnowojski, I squashed the commits and opened a new pull request. https://github.com/apache/flink/pull/13810
[GitHub] [flink] flinkbot commented on pull request #13810: [FLINK-18811][network] Pick another tmpDir if an IOException occurs w…
flinkbot commented on pull request #13810: URL: https://github.com/apache/flink/pull/13810#issuecomment-717663534 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 2c4d8bab8fac4ca8dea20c7cb326f62ca3dd7817 (Wed Oct 28 02:57:59 UTC 2020) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-18811).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work. Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. 
For consensus, approval by a Flink committer or PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] yuchuanchen commented on pull request #13718: [FLINK-18811] Pick another tmpDir if an IOException occurs when creating spill file
yuchuanchen commented on pull request #13718: URL: https://github.com/apache/flink/pull/13718#issuecomment-717663344 @pnowojski I squashed the commits and opened a new pull request. https://github.com/apache/flink/pull/13810
[GitHub] [flink] yuchuanchen opened a new pull request #13810: [FLINK-18811][network] Pick another tmpDir if an IOException occurs w…
yuchuanchen opened a new pull request #13810: URL: https://github.com/apache/flink/pull/13810 …hen creating spill file ## What is the purpose of the change Pick another disk (tmpDir) for the spilling channel, rather than throwing an IOException, which causes the Flink job to restart over and over again. ## Brief change log ## Verifying this change This change is already covered by existing tests, such as SpanningWrapperTest. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: ( no ) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
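The failover idea in this PR — try the next temporary directory instead of failing on the first IOException — can be sketched in a few lines. This is a hedged illustration only, not the actual FLINK-18811 patch: the class and method names are hypothetical, and it uses plain `java.io` rather than Flink's spilling-channel internals.

```java
import java.io.File;
import java.io.IOException;

/** Hypothetical sketch: try each candidate tmp dir in order; fail only if all of them fail. */
public class SpillFileCreator {
    static File createSpillFile(File[] tmpDirs, String prefix) throws IOException {
        IOException lastFailure = null;
        for (File dir : tmpDirs) {
            try {
                // createTempFile throws IOException on a missing, full, or broken directory
                return File.createTempFile(prefix, ".spill", dir);
            } catch (IOException e) {
                lastFailure = e; // remember the failure and fall through to the next directory
            }
        }
        throw new IOException("No usable tmp directory for spill files", lastFailure);
    }
}
```

The key design point is that a single bad disk no longer fails the whole task; only when every configured directory is unusable does the exception propagate (and with it, the job restart).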
[GitHub] [flink] shuiqiangchen commented on pull request #13756: [FLINK-19770][python][test] Changed the PythonProgramOptionTest to be an ITCase.
shuiqiangchen commented on pull request #13756: URL: https://github.com/apache/flink/pull/13756#issuecomment-717661702 Hi @anonymouscodeholic , I have updated the pr according to your suggestions, please have a look at it.
[GitHub] [flink] yuchuanchen closed pull request #13718: [FLINK-18811] Pick another tmpDir if an IOException occurs when creating spill file
yuchuanchen closed pull request #13718: URL: https://github.com/apache/flink/pull/13718
[GitHub] [flink] JingsongLi commented on a change in pull request #13697: [FLINK-19357][FLINK-19357][fs-connector] Introduce createBucketWriter to BucketsBuilder & Introduce FileLifeCycleListener to B
JingsongLi commented on a change in pull request #13697: URL: https://github.com/apache/flink/pull/13697#discussion_r513147361 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java ## @@ -251,14 +254,20 @@ T withBucketFactory(final BucketFactory factory) { return self(); } + @Internal + @Override Review comment: Sorry, let me explain the context: In the file compaction, we need to write a file with a specified name. We hope to use `BucketWriter` to perform atomic file-level writing. The `BucketWriter` can perform atomic writes while also shielding the implementation details of BulkWriter, RowWiseWriter, OutputStreamBasedWriting and PathBasedWriting. That's great. Like the `createBuckets` method, we cannot make it package-private, because it is used by another module (flink-table).
[GitHub] [flink] caozhen1937 commented on pull request #13296: [FLINK-18774][format][debezium] Support debezium-avro format
caozhen1937 commented on pull request #13296: URL: https://github.com/apache/flink/pull/13296#issuecomment-717658890 @HsbcJone The "Found null, Expecting row_1" exception appears because the Avro record name is repeated. I fixed it, you can test again.
[GitHub] [flink] flinkbot edited a comment on pull request #13763: [FLINK-19779][avro] Remove the record_ field name prefix for Confluen…
flinkbot edited a comment on pull request #13763: URL: https://github.com/apache/flink/pull/13763#issuecomment-715195599 ## CI report: * a28669bca9c36dac74f89da177825d73bea0ece0 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8408) * ba752274c9926115f65f5ecbef55b71b0b71cfa2 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8440)
[GitHub] [flink] flinkbot edited a comment on pull request #13760: [FLINK-19627][table-runtime] Introduce multiple input operator for batch
flinkbot edited a comment on pull request #13760: URL: https://github.com/apache/flink/pull/13760#issuecomment-715078238 ## CI report: * b1353f7422a706cd50502d62b988b06154d33ffc Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8439) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8407)
[GitHub] [flink] flinkbot edited a comment on pull request #13744: [FLINK-19766][table-runtime] Introduce File streaming compaction operators
flinkbot edited a comment on pull request #13744: URL: https://github.com/apache/flink/pull/13744#issuecomment-714369975 ## CI report: * c51d4e8bf5dba50e4cb501fc04a87c924fe27e36 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8390) * b4cbace2b682490ac53a4a8cb4438f2361f3cffd UNKNOWN
[GitHub] [flink] wuchong commented on a change in pull request #13801: [FLINK-19213][docs-zh] Update the Chinese documentation
wuchong commented on a change in pull request #13801: URL: https://github.com/apache/flink/pull/13801#discussion_r513141770 ## File path: docs/dev/table/connectors/formats/avro-confluent.zh.md ## @@ -72,50 +72,50 @@ CREATE TABLE user_behavior ( -Format Options +Format 参数 -Option -Required -Default -Type -Description +参数 +是否必选 +默认值 +类型 +描述 format - required + 必选 (none) String - Specify what format to use, here should be 'avro-confluent'. + 指定要使用的格式,这里应该是 'avro-confluent'. avro-confluent.schema-registry.url - required + 必选 (none) String - The URL of the Confluent Schema Registry to fetch/register schemas + 用于获取/注册 schemas 的 Confluent Schema Registry 的URL avro-confluent.schema-registry.subject - required by sink + sink 必选 (none) String - The Confluent Schema Registry subject under which to register the schema used by this format during serialization + Confluent Schema Registry主题,用于在序列化期间注册此格式使用的 schema -Data Type Mapping +数据类型映射 -Currently, Apache Flink always uses the table schema to derive the Avro reader schema during deserialization and Avro writer schema during serialization. Explicitly defining an Avro schema is not supported yet. -See the [Apache Avro Format]({% link dev/table/connectors/formats/avro.zh.md%}#data-type-mapping) for the mapping between Avro and Flink DataTypes. 
+目前 Apache Flink 都是从 table schema 去推断反序列化期间的 Avro reader schema 和序列化期间的 Avro writer schema。显式地定义 Avro schema 暂不支持。 +[Apache Avro Format]({% link dev/table/connectors/formats/avro.zh.md%}#data-type-mapping)中描述了flink数据和Avro数据的对应关系。 Review comment: ```suggestion [Apache Avro Format]({% link dev/table/connectors/formats/avro.zh.md%}#data-type-mapping)中描述了 Flink 数据类型和 Avro 类型的对应关系。 ``` ## File path: docs/dev/table/connectors/formats/avro-confluent.zh.md ## @@ -72,50 +72,50 @@ CREATE TABLE user_behavior ( -Format Options +Format 参数 -Option -Required -Default -Type -Description +参数 +是否必选 +默认值 +类型 +描述 format - required + 必选 (none) String - Specify what format to use, here should be 'avro-confluent'. + 指定要使用的格式,这里应该是 'avro-confluent'. avro-confluent.schema-registry.url - required + 必选 (none) String - The URL of the Confluent Schema Registry to fetch/register schemas + 用于获取/注册 schemas 的 Confluent Schema Registry 的URL avro-confluent.schema-registry.subject - required by sink + sink 必选 (none) String - The Confluent Schema Registry subject under which to register the schema used by this format during serialization + Confluent Schema Registry主题,用于在序列化期间注册此格式使用的 schema -Data Type Mapping +数据类型映射 -Currently, Apache Flink always uses the table schema to derive the Avro reader schema during deserialization and Avro writer schema during serialization. Explicitly defining an Avro schema is not supported yet. -See the [Apache Avro Format]({% link dev/table/connectors/formats/avro.zh.md%}#data-type-mapping) for the mapping between Avro and Flink DataTypes. +目前 Apache Flink 都是从 table schema 去推断反序列化期间的 Avro reader schema 和序列化期间的 Avro writer schema。显式地定义 Avro schema 暂不支持。 +[Apache Avro Format]({% link dev/table/connectors/formats/avro.zh.md%}#data-type-mapping)中描述了flink数据和Avro数据的对应关系。 -In addition to the types listed there, Flink supports reading/writing nullable types. Flink maps nullable types to Avro `union(something, null)`, where `something` is the Avro type converted from Flink type. 
+除了此处列出的类型之外,Flink 还支持读取/写入可为空的类型。 Flink 将可为空的类型映射到 Avro `union(something, null)`, 其中 `something` 是从 Flink 类型转换的 Avro 类型。 Review comment: ```suggestion 除了此处列出的类型之外,Flink 还支持读取/写入可为空(nullable)的类型。 Flink 将可为空的类型映射到 Avro `union(something, null)`, 其中 `something` 是从 Flink 类型转换的 Avro 类型。 ``` ## File path: docs/dev/table/connectors/formats/avro-confluent.zh.md ## @@ -29,27 +29,27 @@ under the License. * This will be replaced by the TOC {:toc} -The Avro Schema Registry (``avro-confluent``) format allows you to read records that were serialized by the ``io.confluent.kafka.serializers.KafkaAvroSerializer`` and to write records that can in turn be read by the ``io.confluent.kafka.serializers.KafkaAvroDeserializer``. +Avro Schema Registry (``avro-confluent``) 格式能让你读取被 ``io.confluent.kafka.serializers.KafkaAvroSerializer``序列化的记录并可以写入成 ``io.confluent.kafka.serializers.KafkaAvroDeserializer``反序列化的记录。
[GitHub] [flink] JingsongLi commented on a change in pull request #13697: [FLINK-19357][FLINK-19357][fs-connector] Introduce createBucketWriter to BucketsBuilder & Introduce FileLifeCycleListener to B
JingsongLi commented on a change in pull request #13697: URL: https://github.com/apache/flink/pull/13697#discussion_r513144820 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java ## @@ -37,6 +39,7 @@ final long initialPartCounter, final BucketWriter bucketWriter, final RollingPolicy rollingPolicy, + @Nullable final FileLifeCycleListener fileListener, Review comment: https://flink.apache.org/contributing/code-style-and-quality-java.html#java-optional In Flink code style, it is recommended to use the `Optional` only in method return values.