[GitHub] [flink] flinkbot commented on pull request #13459: [FLINK-18199] [chinese-translation]: translate FileSystem SQL Connector page into Chinese
flinkbot commented on pull request #13459: URL: https://github.com/apache/flink/pull/13459#issuecomment-697149813 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit a3e5590fcd9193b1699c4539cf93aa175b14d829 (Wed Sep 23 05:54:42 UTC 2020) ✅ no warnings Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #11359: [FLINK-16095] [docs-zh] Translate "Modules" page of "Table API & SQL" into Chinese
flinkbot edited a comment on pull request #11359: URL: https://github.com/apache/flink/pull/11359#issuecomment-596916485 ## CI report: * a42e8d680a2a0d344a0ffda29c2891627f7d1e2d Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/170635599) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=3123) * a6f603321cbeca978ad64118ce9379ff1cb57316 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-18199) Translate "Filesystem SQL Connector" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-18199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-18199: --- Labels: pull-request-available (was: ) > Translate "Filesystem SQL Connector" page into Chinese > -- > > Key: FLINK-18199 > URL: https://issues.apache.org/jira/browse/FLINK-18199 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Connectors / FileSystem, > Documentation, Table SQL / Ecosystem > Reporter: Jark Wu > Assignee: michaelli > Priority: Major > Labels: pull-request-available > > The page url is > https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/connectors/filesystem.html > The markdown file is located in > flink/docs/dev/table/connectors/filesystem.zh.md -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] michaelli916 opened a new pull request #13459: [FLINK-18199] [chinese-translation]: translate FileSystem SQL Connector page into Chinese
michaelli916 opened a new pull request #13459: URL: https://github.com/apache/flink/pull/13459 ## What is the purpose of the change *Translate "Filesystem SQL Connector" page into Chinese* ## Brief change log *Translate "Filesystem SQL Connector" page into Chinese* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable
[GitHub] [flink] danny0405 commented on pull request #13033: [FLINK-18777][catalog] Supports schema registry catalog
danny0405 commented on pull request #13033: URL: https://github.com/apache/flink/pull/13033#issuecomment-697103132 @maver1ck The code seems straightforward. @dawidwys said that he would like to review this PR, but I'm not sure if he has time.
[GitHub] [flink] flinkbot edited a comment on pull request #13451: [FLINK-19333][python] Introduce BatchArrowPythonOverWindowAggregateFunctionOperator
flinkbot edited a comment on pull request #13451: URL: https://github.com/apache/flink/pull/13451#issuecomment-696565311 ## CI report: * e682b70e2b68e60ed3950290b1ebe53a6ecd40d8 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6757) * cb9329281ad618abbd1c75a1922e038a0869944e Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6811) * fc7702fc7d994c1fc449b36c365ade2f43b3e7e6 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6819) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] wuchong closed pull request #9356: [FLINK-13340][kafka][table] Add 'topics' and 'subscriptionPattern' option for Flink Kafka connector
wuchong closed pull request #9356: URL: https://github.com/apache/flink/pull/9356
[GitHub] [flink] HuangXingBo commented on pull request #13451: [FLINK-19333][python] Introduce BatchArrowPythonOverWindowAggregateFunctionOperator
HuangXingBo commented on pull request #13451: URL: https://github.com/apache/flink/pull/13451#issuecomment-697073756 @dianfu Thanks a lot for the update. I have addressed the comments in the latest commit.
[GitHub] [flink] lirui-apache commented on pull request #13455: [FLINK-19361][hive] Create a synchronized metastore client to talk to…
lirui-apache commented on pull request #13455: URL: https://github.com/apache/flink/pull/13455#issuecomment-697115923 @JingsongLi Thanks for the review. I have verified the tests with Hive 1.0.1.
[GitHub] [flink] wuchong commented on pull request #10354: [FLINK-14729][connectors] Multi-topics consuming from KafkaTableSource
wuchong commented on pull request #10354: URL: https://github.com/apache/flink/pull/10354#issuecomment-697100110 Sorry for the late reply and for not noticing this PR. Multi-topic support (topic list and topic pattern) for the SQL connector has been added in FLINK-18449. How about closing this PR and FLINK-14729?
[GitHub] [flink] leonardBang commented on pull request #13289: [FLINK-18548][table-planner] support flexible syntax for Temporal table join
leonardBang commented on pull request #13289: URL: https://github.com/apache/flink/pull/13289#issuecomment-697081273 > The compile is failed, please fix it first. > > ``` > FlinkPlannerImpl.scala message=File line length exceeds 100 characters line=168 > ``` I've fixed and rebased.
[GitHub] [flink] Shawn-Hx commented on pull request #13410: [FLINK-19247][docs-zh] Update Chinese documentation after removal of Kafka 0.10 and 0.11
Shawn-Hx commented on pull request #13410: URL: https://github.com/apache/flink/pull/13410#issuecomment-697080083 Hi @klion26, this PR is ready to be reviewed. Could you help review it at your convenience? Thanks~
[GitHub] [flink] wsry commented on a change in pull request #13447: [FLINK-19297][network] Make ResultPartitionWriter record-oriented
wsry commented on a change in pull request #13447: URL: https://github.com/apache/flink/pull/13447#discussion_r493159836 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingResultPartition.java ## @@ -63,6 +63,22 @@ public BoundedBlockingResultPartition( bufferPoolFactory); } + @Override + public void flush(int targetSubpartition) { + finishBroadcastBufferBuilder(); Review comment: This logic is already in BoundedBlockingResultPartition and should have no impact on streaming/pipelined cases. I guess the source file name is misread?
[GitHub] [flink-playgrounds] shuiqiangchen commented on pull request #16: [FLINK-19145][walkthroughs] Add PyFlink-walkthrough to Flink playground.
shuiqiangchen commented on pull request #16: URL: https://github.com/apache/flink-playgrounds/pull/16#issuecomment-697071304 @alpinegizmo Thank you for polishing the doc. I have updated the PR; please have a look.
[GitHub] [flink] flinkbot edited a comment on pull request #11896: [FLINK-14356] [single-value] Introduce "single-value" format to (de)serialize message to a single field
flinkbot edited a comment on pull request #11896: URL: https://github.com/apache/flink/pull/11896#issuecomment-618808279
[GitHub] [flink] wuchong merged pull request #13289: [FLINK-18548][table-planner] support flexible syntax for Temporal table join
wuchong merged pull request #13289: URL: https://github.com/apache/flink/pull/13289
[GitHub] [flink-statefun] galenwarren commented on pull request #152: [FLINK-19176] Add pluggable statefun payload serializer
galenwarren commented on pull request #152: URL: https://github.com/apache/flink-statefun/pull/152#issuecomment-696388005 @igalshilman Thanks! I'll take a look at the others shortly, but wanted to quickly reply regarding the question about Optional and the serialized snapshot format. (I can't reply to the comment itself for some reason ...) What you propose is absolutely doable. I just wanted to make sure you're aware that the suggested code is essentially identical to the implementation of `StringUtils.writeNullableString`, which is what is currently used to write the version 2 snapshot in `MessageTypeSerializer.Snapshot.writeSnapshot`, i.e.: ``` @Override public void writeSnapshot(DataOutputView dataOutputView) throws IOException { // version 1 dataOutputView.writeUTF(messageFactoryKey.getType().name()); // added in version 2 StringUtils.writeNullableString( messageFactoryKey.getCustomPayloadSerializerClassName(), dataOutputView); } ``` ... and ... ``` public static void writeNullableString(@Nullable String str, DataOutputView out) throws IOException { if (str != null) { out.writeBoolean(true); writeString(str, out); } else { out.writeBoolean(false); } } ``` So the serialized format would already seem to be what you've requested! But if you'd prefer it be written without using `StringUtils.writeNullableString`, I'm happy to make that change.
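For readers following the discussion, the boolean-marker pattern that `writeNullableString` implements can be shown in a tiny standalone sketch. This uses only the JDK's `DataOutput`/`DataInput` rather than Flink's `DataOutputView`, so the class and method names here are illustrative, not StateFun's actual API:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

public class NullableStringDemo {

    // Mirrors the null-marker pattern: a boolean flag first,
    // then the UTF-encoded payload only if the string is non-null.
    static void writeNullableString(String s, DataOutput out) throws IOException {
        if (s != null) {
            out.writeBoolean(true);
            out.writeUTF(s);
        } else {
            out.writeBoolean(false);
        }
    }

    // Read side: consume the flag, then read the payload only if present.
    static String readNullableString(DataInput in) throws IOException {
        return in.readBoolean() ? in.readUTF() : null;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        writeNullableString("custom.Serializer", out); // present value
        writeNullableString(null, out);                // absent value

        DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        System.out.println(readNullableString(in)); // custom.Serializer
        System.out.println(readNullableString(in)); // null
    }
}
```

The point of the exchange above is that this flag-plus-payload layout is exactly what the snapshot already writes, which is why the reviewer's requested format needs no change.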
[GitHub] [flink] flinkbot edited a comment on pull request #13447: [FLINK-19297][network] Make ResultPartitionWriter record-oriented
flinkbot edited a comment on pull request #13447: URL: https://github.com/apache/flink/pull/13447#issuecomment-696488834
[GitHub] [flink] JingsongLi merged pull request #13455: [FLINK-19361][hive] Create a synchronized metastore client to talk to…
JingsongLi merged pull request #13455: URL: https://github.com/apache/flink/pull/13455
[GitHub] [flink] JingsongLi merged pull request #13414: [FLINK-19227][Table SQL / API] The catalog is still created after opening failed in catalog registering
JingsongLi merged pull request #13414: URL: https://github.com/apache/flink/pull/13414
[GitHub] [flink] wuchong commented on pull request #9356: [FLINK-13340][kafka][table] Add 'topics' and 'subscriptionPattern' option for Flink Kafka connector
wuchong commented on pull request #9356: URL: https://github.com/apache/flink/pull/9356#issuecomment-697100549 Closing this as it has been supported in FLINK-18449.
[GitHub] [flink] flinkbot edited a comment on pull request #13419: [FLINK-18842][e2e] Added 10min timeout to building the container.
flinkbot edited a comment on pull request #13419: URL: https://github.com/apache/flink/pull/13419#issuecomment-694745717
[GitHub] [flink] flinkbot edited a comment on pull request #13406: [FLINK-19243][elasticsearch] Bump snakeyaml to 1.27
flinkbot edited a comment on pull request #13406: URL: https://github.com/apache/flink/pull/13406#issuecomment-694116951
[GitHub] [flink] xintongsong commented on pull request #13453: [FLINK-19311][coordination] Add ResourceRequirement(s)
xintongsong commented on pull request #13453: URL: https://github.com/apache/flink/pull/13453#issuecomment-696625955
[GitHub] [flink] zentol commented on a change in pull request #13419: [FLINK-18842][e2e] Added 10min timeout to building the container.
zentol commented on a change in pull request #13419: URL: https://github.com/apache/flink/pull/13419#discussion_r492588052 ## File path: flink-end-to-end-tests/test-scripts/common.sh ## @@ -830,3 +830,30 @@ run_test_with_timeout() { $TEST_COMMAND ) } + +run_with_timeout() { + local timeout="$1" + shift + local pidFile="$1" + shift + local command="$@" + + # invoke command + (eval "$command"; rm $pidFile; ) & + pid=$! + echo $pid > $pidFile + + # invoke timeout guard + ( +sleep $timeout +if [[ -e $pidFile ]]; then Review comment: We should be able to re-use `run_test_with_timeout`. All we basically need to do is move this ``` echo "Printing Flink logs and killing it:" cat ${FLINK_DIR}/log/* ``` into a separate function, and add a new `run_with_timeout` that accepts a timeout, run-command, and on-failure-command. ``` run_test_with_timeout() { local TEST_TIMEOUT_SECONDS=$1 shift local TEST_COMMAND=$@ internal_run_with_timeout $TEST_TIMEOUT_SECONDS "$TEST_COMMAND" print_logs } run_with_timeout() { internal_run_with_timeout $1 "$2" "" } internal_run_with_timeout() { local timeout=$1 local runCommand=$2 local onFailureCommand=$3 // do stuff } print_logs() { echo "Printing Flink logs and killing it:" cat ${FLINK_DIR}/log/* } ```
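The pid-file timeout guard sketched in the diff above can be fleshed out into a runnable form. This is a hedged, self-contained sketch of the pattern under discussion, not the actual `common.sh` helpers; the function and variable names are illustrative.

```shell
#!/usr/bin/env bash
# Sketch of a pid-file based timeout guard: the command removes its pid file
# on normal completion; if the file still exists after the timeout, the
# guard assumes the command hung and kills it.
run_with_timeout() {
  local timeout="$1"; shift
  local pid_file
  pid_file="$(mktemp)"

  # Run the command; remove the pid file on normal completion.
  ( "$@"; rm -f "$pid_file" ) &
  local pid=$!
  echo "$pid" > "$pid_file"

  # Timeout guard, detached from stdout so callers can capture output cleanly.
  ( sleep "$timeout"
    if [[ -e "$pid_file" ]]; then
      echo "timeout after ${timeout}s, killing $pid" >&2
      kill "$pid" 2>/dev/null
      rm -f "$pid_file"
    fi
  ) >/dev/null 2>&1 &
  local guard=$!

  wait "$pid" 2>/dev/null
  kill "$guard" 2>/dev/null
  rm -f "$pid_file"
}

run_with_timeout 5 echo "fast command finished"
```

Note the design trade-off raised in the review: rather than duplicating this guard, it can be factored into an `internal_run_with_timeout` shared by the test and non-test variants.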
[GitHub] [flink-statefun] tzulitai closed pull request #159: [FLINK-19330][core] Move initialization logic to open() instead of initializeState
tzulitai closed pull request #159: URL: https://github.com/apache/flink-statefun/pull/159
[GitHub] [flink-statefun] tzulitai closed pull request #157: [FLINK-19327][k8s] Bump JobManager heap size to 1 GB
tzulitai closed pull request #157: URL: https://github.com/apache/flink-statefun/pull/157
[GitHub] [flink] flinkbot edited a comment on pull request #13228: [FLINK-19026][network] Improve threading model of CheckpointBarrierUnaligner
flinkbot edited a comment on pull request #13228: URL: https://github.com/apache/flink/pull/13228#issuecomment-679099456
[GitHub] [flink] flinkbot edited a comment on pull request #13420: [FLINK-19229][python] Introduce the PythonStreamGroupAggregateOperator for Python UDAF.
flinkbot edited a comment on pull request #13420: URL: https://github.com/apache/flink/pull/13420#issuecomment-694793326
[GitHub] [flink] xintongsong edited a comment on pull request #13453: [FLINK-19311][coordination] Add ResourceRequirement(s)
xintongsong edited a comment on pull request #13453: URL: https://github.com/apache/flink/pull/13453#issuecomment-696625955 Chiming in with a minor comment: > The Serial Version UID for new classes should start at 1. Suggested by the code style guidelines: https://flink.apache.org/contributing/code-style-and-quality-java.html#java-serialization
[GitHub] [flink] dawidwys edited a comment on pull request #13450: [FLINK-19339] Support unions with logical types in Avro >= 1.9.x
dawidwys edited a comment on pull request #13450: URL: https://github.com/apache/flink/pull/13450#issuecomment-696655915 It is relevant for `AvroSerializer` (which I updated through `AvroFactory#fromSpecific` and added a test for it: `AvroUnionLogicalSerializerTest.java`). I checked all places where we instantiate the `SpecificData` manually and updated those. In the `AvroInputFormat` we use the `SpecificDatumReader`, which should just work (it has that logic of acquiring `SpecificData` built in).
[GitHub] [flink] flinkbot commented on pull request #13456: Single task add partial flag in buffer
flinkbot commented on pull request #13456: URL: https://github.com/apache/flink/pull/13456#issuecomment-696739793
[GitHub] [flink] kl0u commented on a change in pull request #13423: [FLINK-19269] Make the PushingAsyncDataInput.DataOutput aware of endOfInput
kl0u commented on a change in pull request #13423: URL: https://github.com/apache/flink/pull/13423#discussion_r492589663 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java ## @@ -68,4 +68,13 @@ public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception { public void setKeyContextElement(StreamRecord record) throws Exception { owner.internalSetKeyContextElement(record, stateKeySelector); } + + @Override + public void endInput() throws Exception { + if (owner instanceof BoundedOneInput && inputId == 1) { Review comment: Here, can it be any other inputId apart from 1? Review comment: The reason I am asking is that this check did not exist in any of the methods removed by this PR which were handling the input-closing logic. This makes me wonder if we are correct on this. In my opinion, although the check may be safe judging from the code, to be on the safe side it may make sense to remove it so that the logic stays the same. But feel free to disagree and leave it as is.
[GitHub] [flink-statefun] tzulitai closed pull request #158: [FLINK-19329] FunctionGroupOperator#dispose() might throw NPE during an unclean shutdown.
tzulitai closed pull request #158: URL: https://github.com/apache/flink-statefun/pull/158
[GitHub] [flink] twalthr commented on pull request #13430: [BP-1.11][FLINK-19140] Fix UDTF documentation which uses wrong alias
twalthr commented on pull request #13430: URL: https://github.com/apache/flink/pull/13430#issuecomment-696769329 Merged into 1.11 branch.
[GitHub] [flink] aljoscha commented on pull request #8207: [FLINK-12250] Rewrite assembleNewPartPath to let it return a new PartPath
aljoscha commented on pull request #8207: URL: https://github.com/apache/flink/pull/8207#issuecomment-696772477 This actually looks like a good fix. I rebased, will run CI and then merge.
[GitHub] [flink] kottmann commented on pull request #13457: [FLINK-8357] Use rolling logs as default
kottmann commented on pull request #13457: URL: https://github.com/apache/flink/pull/13457#issuecomment-697013989 Things to be considered: - Any opinions about the file size? What is a good size? - The maximum number of files could be made configurable. - OnStartupTriggeringPolicy can be used to replace the log-rolling-on-startup code in the shell scripts.
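As a rough illustration of what these open questions amount to in Log4j 2 terms, here is a hypothetical `log4j.properties` fragment combining a size-based policy, a configurable file cap, and `OnStartupTriggeringPolicy`. The size, file count, and file pattern below are placeholder assumptions for discussion, not values taken from the PR.

```properties
# Hypothetical Log4j 2 rolling-file setup (values are assumptions, not the PR's)
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.append = true
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.policies.type = Policies
# Roll when the file reaches a fixed size (open question: what is a good size?)
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size = 100MB
# Roll on every startup, replacing the rotation logic in the shell scripts
appender.rolling.policies.startup.type = OnStartupTriggeringPolicy
# Cap the number of retained files (open question: should this be configurable?)
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 10
```

With `DefaultRolloverStrategy.max`, the oldest rolled file is deleted once the cap is reached, which bounds disk usage regardless of the chosen file size.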
[GitHub] [flink-statefun] tzulitai commented on pull request #158: [FLINK-19329] FunctionGroupOperator#dispose() might throw NPE during an unclean shutdown.
tzulitai commented on pull request #158: URL: https://github.com/apache/flink-statefun/pull/158#issuecomment-696548606 LGTM, +1
[GitHub] [flink-statefun] tzulitai closed pull request #160: [hotfix][dockerfile] Add newline to end of flink-conf
tzulitai closed pull request #160: URL: https://github.com/apache/flink-statefun/pull/160
[GitHub] [flink] StephanEwen commented on a change in pull request #13447: [FLINK-19297][network] Make ResultPartitionWriter record-oriented
StephanEwen commented on a change in pull request #13447: URL: https://github.com/apache/flink/pull/13447#discussion_r492720319 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ## @@ -109,89 +94,58 @@ } } - protected void emit(T record, int targetChannel) throws IOException, InterruptedException { + protected void emit(T record, int targetSubpartition) throws IOException { checkErroneous(); - serializer.serializeRecord(record); - - // Make sure we don't hold onto the large intermediate serialization buffer for too long - copyFromSerializerToTargetChannel(targetChannel); - } - - /** -* @param targetChannel -* @return true if the intermediate serialization buffer should be pruned -*/ - protected boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException { - // We should reset the initial position of the intermediate serialization buffer before - // copying, so the serialization results can be copied to multiple target buffers. - serializer.reset(); - - boolean pruneTriggered = false; - BufferBuilder bufferBuilder = getBufferBuilder(targetChannel); - SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder); - while (result.isFullBuffer()) { - finishBufferBuilder(bufferBuilder); - - // If this was a full record, we are done. Not breaking out of the loop at this point - // will lead to another buffer request before breaking out (that would not be a - // problem per se, but it can lead to stalls in the pipeline). 
- if (result.isFullRecord()) { - pruneTriggered = true; - emptyCurrentBufferBuilder(targetChannel); - break; - } - - bufferBuilder = requestNewBufferBuilder(targetChannel); - result = serializer.copyToBufferBuilder(bufferBuilder); - } - checkState(!serializer.hasSerializedData(), "All data should be written at once"); + targetPartition.emitRecord(serializeRecord(serializer, record), targetSubpartition); if (flushAlways) { - flushTargetPartition(targetChannel); + targetPartition.flush(targetSubpartition); } - return pruneTriggered; } public void broadcastEvent(AbstractEvent event) throws IOException { broadcastEvent(event, false); } public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException { - try (BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer(event)) { - for (int targetChannel = 0; targetChannel < numberOfChannels; targetChannel++) { - tryFinishCurrentBufferBuilder(targetChannel); - - // Retain the buffer so that it can be recycled by each channel of targetPartition - targetPartition.addBufferConsumer(eventBufferConsumer.copy(), targetChannel, isPriorityEvent); - } + targetPartition.broadcastEvent(event, isPriorityEvent); - if (flushAlways) { - flushAll(); - } + if (flushAlways) { + flushAll(); } } - public void flushAll() { - targetPartition.flushAll(); + @VisibleForTesting + public static ByteBuffer serializeRecord( Review comment: It would be really great if this method were not public. Ideally we can remove this completely, because all tests that use this bypass some crucial logic of this class and may result in meaningless tests. This method is used in three places: - The occurrence in `SingleInputGateTest` can be replaced with emitting a record. - The occurrence in `TestPartitionProducer` could be removed by adjusting `TestProducerSource` to produce `ByteBuffer` instead of `BufferConsumer`, which looks like a nice change that might even simplify things. 
- The occurrence in `PartitionTestUtils` could in theory be kept, with the visibility of the method reduced to package-private. ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingResultPartition.java ## @@ -63,6 +63,22 @@ public BoundedBlockingResultPartition( bufferPoolFactory); } +
[GitHub] [flink] flinkbot commented on pull request #13448: [FLINK-19289][k8s] Remove pods terminated during JM failover.
flinkbot commented on pull request #13448: URL: https://github.com/apache/flink/pull/13448#issuecomment-696493031
[GitHub] [flink] flinkbot edited a comment on pull request #13128: [FLINK-18795][hbase] Support for HBase 2
flinkbot edited a comment on pull request #13128: URL: https://github.com/apache/flink/pull/13128#issuecomment-672766836
[GitHub] [flink-statefun] galenwarren commented on a change in pull request #152: [FLINK-19176] Add pluggable statefun payload serializer
galenwarren commented on a change in pull request #152: URL: https://github.com/apache/flink-statefun/pull/152#discussion_r492357429 ## File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java ## @@ -61,4 +64,26 @@ private static void validateParentFirstClassloaderPatterns(Configuration configu } return parentFirstClassloaderPatterns; } + + private static void validateCustomPayloadSerializerClassName(Configuration configuration) { + +MessageFactoryType factoryType = +configuration.get(StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER); +String customPayloadSerializerClassName = + configuration.get(StatefulFunctionsConfig.USER_MESSAGE_CUSTOM_PAYLOAD_SERIALIZER_CLASS); + +if (factoryType == MessageFactoryType.WITH_CUSTOM_PAYLOADS) { + if (StringUtils.isNullOrWhitespaceOnly(customPayloadSerializerClassName)) { Review comment: OK will do ## File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryKey.java ## @@ -0,0 +1,43 @@ +package org.apache.flink.statefun.flink.core.message; + +import java.io.Serializable; +import java.util.Objects; + +public class MessageFactoryKey implements Serializable { Review comment: Yes, and it will be included in the next push ## File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryKey.java ## @@ -0,0 +1,43 @@ +package org.apache.flink.statefun.flink.core.message; + +import java.io.Serializable; +import java.util.Objects; + +public class MessageFactoryKey implements Serializable { + private static final long serialVersionUID = 1L; + + private final MessageFactoryType type; + private final String customPayloadSerializerClassName; + + private MessageFactoryKey(MessageFactoryType type, String customPayloadSerializerClassName) { +this.type = type; +this.customPayloadSerializerClassName = customPayloadSerializerClassName; + } + + public static 
MessageFactoryKey forType( + MessageFactoryType type, String customPayloadSerializerClassName) { +return new MessageFactoryKey(type, customPayloadSerializerClassName); + } + + public MessageFactoryType getType() { +return this.type; + } + + public String getCustomPayloadSerializerClassName() { Review comment: For some reason, before I couldn't comment here, but now I can. Weird glitch. Anyway, I left a top-level comment/question for you on this one, just FYI. ## File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryKey.java ## @@ -0,0 +1,43 @@ +package org.apache.flink.statefun.flink.core.message; + +import java.io.Serializable; +import java.util.Objects; + +public class MessageFactoryKey implements Serializable { + private static final long serialVersionUID = 1L; + + private final MessageFactoryType type; + private final String customPayloadSerializerClassName; + + private MessageFactoryKey(MessageFactoryType type, String customPayloadSerializerClassName) { +this.type = type; Review comment: Yes, will be fixed in next push
[GitHub] [flink] zentol commented on pull request #13453: [FLINK-19311][coordination] Add ResourceRequirement(s)
zentol commented on pull request #13453: URL: https://github.com/apache/flink/pull/13453#issuecomment-696681873 @xintongsong Do you know by chance how to set up IntelliJ to use 1 as the default UID?
[GitHub] [flink] rmetzger merged pull request #13409: [FLINK-17910][e2e] Fix debug log output to investigate rare test failure
rmetzger merged pull request #13409: URL: https://github.com/apache/flink/pull/13409
[GitHub] [flink] echauchot commented on a change in pull request #13040: [FLINK-17073] [checkpointing] checkpointing backpressure if there are too many checkpoints to clean
echauchot commented on a change in pull request #13040: URL: https://github.com/apache/flink/pull/13040#discussion_r492597386 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java ## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.CheckpointStorageLocation; +import org.apache.flink.runtime.state.StateUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Delegate class responsible for checkpoints cleaning and counting the number of checkpoints yet + * to clean. 
+ */ +public class CheckpointsCleaner implements Serializable{ + private final AtomicInteger numberOfCheckpointsToClean; + private static final Logger LOG = LoggerFactory.getLogger(CheckpointsCleaner.class); + + public CheckpointsCleaner() { + this.numberOfCheckpointsToClean = new AtomicInteger(0); + } + + int getNumberOfCheckpointsToClean() { + return numberOfCheckpointsToClean.get(); + } + + public void cleanCheckpoint(Runnable cleanAction, Runnable postCleanAction, Executor executor) { + numberOfCheckpointsToClean.incrementAndGet(); + executor.execute(() -> { + try { + cleanAction.run(); + } finally { + numberOfCheckpointsToClean.decrementAndGet(); + postCleanAction.run(); + } + }); + } + + public void cleanStates(Runnable postCleanAction, Map operatorStates, PendingCheckpoint pendingCheckpoint, CheckpointStorageLocation targetLocation, Executor executor){ + numberOfCheckpointsToClean.incrementAndGet(); + executor.execute(() -> { + // discard the private states. + // unregistered shared states are still considered private at this point. + try { + StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values()); + targetLocation.disposeOnFailure(); Review comment: @rkhachatryan can you confirm that this is indeed the refactoring that you want before I code it?
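The `cleanCheckpoint` pattern quoted above (increment an in-flight counter, run the clean action on an executor, decrement in a `finally` block) can be sketched in isolation. The class and method names below are illustrative, not Flink's actual API; the point is only that the counter never leaks, even when the clean action throws:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch of the cleanup-counting pattern used by CheckpointsCleaner:
// the counter tracks how many clean-ups are still in flight, so callers can
// apply backpressure when too many checkpoints are waiting to be cleaned.
public class CleanupCounter {
    private final AtomicInteger inFlight = new AtomicInteger(0);

    public int getInFlight() {
        return inFlight.get();
    }

    public void clean(Runnable cleanAction, Runnable postCleanAction, Executor executor) {
        inFlight.incrementAndGet();
        executor.execute(() -> {
            try {
                cleanAction.run();
            } finally {
                // decrement even if cleanAction throws, so the count never leaks
                inFlight.decrementAndGet();
                postCleanAction.run();
            }
        });
    }

    public static void main(String[] args) {
        CleanupCounter cleaner = new CleanupCounter();
        // a direct (same-thread) executor keeps the example deterministic
        cleaner.clean(
            () -> System.out.println("cleaning"),
            () -> System.out.println("done"),
            Runnable::run);
        System.out.println(cleaner.getInFlight()); // 0 after the synchronous run
    }
}
```

Because `Executor` is a functional interface, `Runnable::run` serves as a same-thread executor here; a real deployment would pass an I/O thread pool instead.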
[GitHub] [flink-statefun] sjwiesman commented on pull request #159: [FLINK-19330][core] Move initialization logic to open() instead of initializeState
sjwiesman commented on pull request #159: URL: https://github.com/apache/flink-statefun/pull/159#issuecomment-696412736 Out of scope for this PR, but based on our offline conversation today it seems like there should be a remote e2e test with TM failures.
[GitHub] [flink] twalthr closed pull request #13425: [FLINK-19272][table-common] Add metadata interfaces for FLIP-107
twalthr closed pull request #13425: URL: https://github.com/apache/flink/pull/13425
[GitHub] [flink] flinkbot commented on pull request #13447: [FLINK-19297][network] Make ResultPartitionWriter record-oriented
flinkbot commented on pull request #13447: URL: https://github.com/apache/flink/pull/13447#issuecomment-696485547
[GitHub] [flink-statefun] igalshilman commented on a change in pull request #152: [FLINK-19176] Add pluggable statefun payload serializer
igalshilman commented on a change in pull request #152: URL: https://github.com/apache/flink-statefun/pull/152#discussion_r492318196 ## File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryKey.java ## @@ -0,0 +1,43 @@ +package org.apache.flink.statefun.flink.core.message; + +import java.io.Serializable; +import java.util.Objects; + +public class MessageFactoryKey implements Serializable { Review comment: Can this be final? ## File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryKey.java ## @@ -0,0 +1,43 @@ +package org.apache.flink.statefun.flink.core.message; + +import java.io.Serializable; +import java.util.Objects; + +public class MessageFactoryKey implements Serializable { + private static final long serialVersionUID = 1L; + + private final MessageFactoryType type; + private final String customPayloadSerializerClassName; + + private MessageFactoryKey(MessageFactoryType type, String customPayloadSerializerClassName) { +this.type = type; Review comment: Can this be: `this.type = Objects.requireNonNull(type);` ## File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java ## @@ -61,4 +64,26 @@ private static void validateParentFirstClassloaderPatterns(Configuration configu } return parentFirstClassloaderPatterns; } + + private static void validateCustomPayloadSerializerClassName(Configuration configuration) { + +MessageFactoryType factoryType = +configuration.get(StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER); +String customPayloadSerializerClassName = + configuration.get(StatefulFunctionsConfig.USER_MESSAGE_CUSTOM_PAYLOAD_SERIALIZER_CLASS); + +if (factoryType == MessageFactoryType.WITH_CUSTOM_PAYLOADS) { + if (StringUtils.isNullOrWhitespaceOnly(customPayloadSerializerClassName)) { Review comment: I think that you are right, let's leave that as it is ## File 
path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryKey.java ## @@ -0,0 +1,43 @@ +package org.apache.flink.statefun.flink.core.message; + +import java.io.Serializable; +import java.util.Objects; + +public class MessageFactoryKey implements Serializable { + private static final long serialVersionUID = 1L; + + private final MessageFactoryType type; + private final String customPayloadSerializerClassName; + + private MessageFactoryKey(MessageFactoryType type, String customPayloadSerializerClassName) { +this.type = type; +this.customPayloadSerializerClassName = customPayloadSerializerClassName; + } + + public static MessageFactoryKey forType( + MessageFactoryType type, String customPayloadSerializerClassName) { +return new MessageFactoryKey(type, customPayloadSerializerClassName); + } + + public MessageFactoryType getType() { +return this.type; + } + + public String getCustomPayloadSerializerClassName() { Review comment: That is a good point! In that case, what do you think about making this method return an `Optional`, and version 2 would write an extra byte to indicate if the optional is present or not. 
Something along the lines of:

```
if (className.isPresent()) {
    out.writeBoolean(true);
    out.writeUtf8String(className.get());
} else {
    out.writeBoolean(false);
}
```

## File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryKey.java ## @@ -0,0 +1,43 @@ +package org.apache.flink.statefun.flink.core.message; + +import java.io.Serializable; +import java.util.Objects; + +public class MessageFactoryKey implements Serializable { + private static final long serialVersionUID = 1L; + + private final MessageFactoryType type; + private final String customPayloadSerializerClassName; + + private MessageFactoryKey(MessageFactoryType type, String customPayloadSerializerClassName) { +this.type = type; +this.customPayloadSerializerClassName = customPayloadSerializerClassName; + } + + public static MessageFactoryKey forType( + MessageFactoryType type, String customPayloadSerializerClassName) { +return new MessageFactoryKey(type, customPayloadSerializerClassName); + } + + public MessageFactoryType getType() { +return this.type; + } + + public String getCustomPayloadSerializerClassName() { Review comment: > > So the serialized format would already seem to be what you've requested! But if you'd prefer it get written without using `StringUtils.writeNullableString`, I'm happy to make that change. Oh I see. My original thinking here was to prevent a future NPE by someone who would expect (mistakenly) that
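The presence-byte encoding suggested in the review above — write a boolean flag, and the string only when it is present — can be sketched with the JDK's `DataOutputStream`/`DataInputStream`. This is an assumption-laden illustration: `writeUTF`/`readUTF` stand in for Flink's own string helpers, and the class name is hypothetical:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Optional;

// Sketch of serializing an Optional<String> with a leading presence flag.
// The real statefun code would go through Flink's serialization utilities;
// plain data streams are used here only to demonstrate the wire format.
public class OptionalStringCodec {
    static void write(DataOutputStream out, Optional<String> value) throws IOException {
        if (value.isPresent()) {
            out.writeBoolean(true);       // presence byte
            out.writeUTF(value.get());    // payload only when present
        } else {
            out.writeBoolean(false);
        }
    }

    static Optional<String> read(DataInputStream in) throws IOException {
        return in.readBoolean() ? Optional.of(in.readUTF()) : Optional.empty();
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        write(out, Optional.of("com.example.MySerializer")); // hypothetical class name
        write(out, Optional.empty());

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(read(in)); // Optional[com.example.MySerializer]
        System.out.println(read(in)); // Optional.empty
    }
}
```

The leading boolean makes the absent case explicit on the wire, which is why the reviewers prefer it over a nullable-string helper: a reader can never confuse "no serializer configured" with an empty class name.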
[GitHub] [flink] wuchong commented on pull request #13289: [FLINK-18548][table-planner] support flexible syntax for Temporal table join
wuchong commented on pull request #13289: URL: https://github.com/apache/flink/pull/13289#issuecomment-696662161
[GitHub] [flink] dawidwys commented on pull request #13423: [FLINK-19269] Make the PushingAsyncDataInput.DataOutput aware of endOfInput
dawidwys commented on pull request #13423: URL: https://github.com/apache/flink/pull/13423#issuecomment-696628108 Hi @pnowojski, could you take a look at the changes, as they touch some of the streaming runtime parts? Or maybe you could ask somebody else who works on the streaming runtime part?
[GitHub] [flink] wuchong merged pull request #13442: [FLINK-19321][Table SQL / Runtime]CollectSinkFunction does not define serialVersionUID
wuchong merged pull request #13442: URL: https://github.com/apache/flink/pull/13442
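For context on the merged FLINK-19321 fix: a `Serializable` class without an explicit `serialVersionUID` gets one derived from its class structure, so any refactoring silently breaks deserialization of previously written instances. A generic sketch (the class name is illustrative, not Flink's actual `CollectSinkFunction`):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Illustrative only. Pinning serialVersionUID = 1L keeps the serialized form
// stable across refactorings; without it, the JVM computes an ID from fields
// and methods, and a harmless code change makes old bytes unreadable.
public class ExampleFunction implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String name;

    public ExampleFunction(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public static void main(String[] args) throws Exception {
        // round-trip through Java serialization
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new ExampleFunction("collect-sink"));
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            ExampleFunction copy = (ExampleFunction) in.readObject();
            System.out.println(copy.getName()); // collect-sink
        }
    }
}
```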
[GitHub] [flink] zentol commented on a change in pull request #13438: [FLINK-19014][e2e] Increase startup timeout
zentol commented on a change in pull request #13438: URL: https://github.com/apache/flink/pull/13438#discussion_r492515807 ## File path: flink-dist/src/main/flink-bin/bin/flink-daemon.sh ## @@ -88,15 +88,6 @@ out="${FLINK_LOG_PREFIX}.out" log_setting=("-Dlog.file=${log}" "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j.properties" "-Dlog4j.configurationFile=file:${FLINK_CONF_DIR}/log4j.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback.xml") -JAVA_VERSION=$(${JAVA_RUN} -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q') Review comment: Why is this being removed? The same code exists in flink-console.sh.
[GitHub] [flink] flinkbot commented on pull request #13453: [FLINK-19311][coordination] Add ResourceRequirement(s)
flinkbot commented on pull request #13453: URL: https://github.com/apache/flink/pull/13453#issuecomment-696608065
[GitHub] [flink] twalthr commented on pull request #10354: [FLINK-14729][connectors] Multi-topics consuming from KafkaTableSource
twalthr commented on pull request #10354: URL: https://github.com/apache/flink/pull/10354#issuecomment-696785659 @fangpengcheng95 Sorry for replying so late to this PR. I think the changes make sense. However, we have a new table source architecture after FLIP-95. Do you think you have time to update this PR?
[GitHub] [flink] wuchong commented on pull request #13332: [FLINK-19128][sql-client] Remove the runtime execution configuration in sql-client-defaults.yaml
wuchong commented on pull request #13332: URL: https://github.com/apache/flink/pull/13332#issuecomment-696672677 Thanks @twalthr, will merge this then.
[GitHub] [flink] azagrebin commented on a change in pull request #13316: [FLINK-14422][runtime] Expose network memory usage to TaskManagerDetailsHandler's endpoint
azagrebin commented on a change in pull request #13316: URL: https://github.com/apache/flink/pull/13316#discussion_r492749100 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerMetricsInfo.java ## @@ -206,9 +301,9 @@ public int hashCode() { @JsonCreator public GarbageCollectorInfo( - @JsonProperty(FIELD_NAME_NAME) String name, - @JsonProperty(FIELD_NAME_COUNT) long count, - @JsonProperty(FIELD_NAME_TIME) long time) { + @JsonProperty(FIELD_NAME_NAME) String name, Review comment: formatting is off ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java ## @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.taskmanager; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.instance.HardwareDescription; +import org.apache.flink.runtime.metrics.dump.MetricDump; +import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; +import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.HandlerRequestException; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsInfo; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerFileMessageParameters; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMetricsInfo; +import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import 
java.util.Random; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +/** + * Tests the {@link TaskManagerDetailsHandler} implementation. + */ +public class TaskManagerDetailsHandlerTest extends TestLogger { + + private static final Random random = new Random(); + + private ResourceID taskManagerId = ResourceID.generate(); + + private TestingResourceManagerGateway resourceManagerGateway; + private TaskManagerDetailsHandler testInstance; + + private HandlerRequest handlerRequest; + + public void initializeMetricStore(MetricStore metricStore) { + QueryScopeInfo.TaskManagerQueryScopeInfo tmScope = new QueryScopeInfo.TaskManagerQueryScopeInfo(taskManagerId.toString(), "Status"); Review comment: ```suggestion private static void initializeMetricStore(MetricStore metricStore) { QueryScopeInfo.TaskManagerQueryScopeInfo tmScope = new QueryScopeInfo.TaskManagerQueryScopeInfo(TASK_MANAGER_ID.toString(), "Status"); ``` ## File path:
[GitHub] [flink] rkhachatryan commented on a change in pull request #13040: [FLINK-17073] [checkpointing] checkpointing backpressure if there are too many checkpoints to clean
rkhachatryan commented on a change in pull request #13040: URL: https://github.com/apache/flink/pull/13040#discussion_r492606259 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java ## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.CheckpointStorageLocation; +import org.apache.flink.runtime.state.StateUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Delegate class responsible for checkpoints cleaning and counting the number of checkpoints yet + * to clean. 
+ */ +public class CheckpointsCleaner implements Serializable{ + private final AtomicInteger numberOfCheckpointsToClean; + private static final Logger LOG = LoggerFactory.getLogger(CheckpointsCleaner.class); + + public CheckpointsCleaner() { + this.numberOfCheckpointsToClean = new AtomicInteger(0); + } + + int getNumberOfCheckpointsToClean() { + return numberOfCheckpointsToClean.get(); + } + + public void cleanCheckpoint(Runnable cleanAction, Runnable postCleanAction, Executor executor) { + numberOfCheckpointsToClean.incrementAndGet(); + executor.execute(() -> { + try { + cleanAction.run(); + } finally { + numberOfCheckpointsToClean.decrementAndGet(); + postCleanAction.run(); + } + }); + } + + public void cleanStates(Runnable postCleanAction, Map operatorStates, PendingCheckpoint pendingCheckpoint, CheckpointStorageLocation targetLocation, Executor executor){ + numberOfCheckpointsToClean.incrementAndGet(); + executor.execute(() -> { + // discard the private states. + // unregistered shared states are still considered private at this point. + try { + StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values()); + targetLocation.disposeOnFailure(); Review comment: Sorry, my reply got lost for some reason. I think that `operatorStates.clear();` should remain in `PendingCheckpoint` too lest break encapsulation. So the original Runnable can be submitted as `cleanAction` to `CheckpointCleaner.cleanCheckpoint`.: ``` try { StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values()); targetLocation.disposeOnFailure(); } catch (Throwable t) { LOG.warn("Could not properly dispose the private states in the pending checkpoint {} of job {}.", checkpointId, jobId, t); } finally { operatorStates.clear(); } ``` ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java ## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.runtime.jobgraph.OperatorID; +import
[GitHub] [flink] lirui-apache commented on a change in pull request #13434: [FLINK-19292][hive] HiveCatalog should support specifying Hadoop conf dir with configuration
lirui-apache commented on a change in pull request #13434: URL: https://github.com/apache/flink/pull/13434#discussion_r492720329 ## File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java ## @@ -57,10 +59,10 @@ public ExpectedException expectedException = ExpectedException.none(); @Test - public void test() { + public void test() throws IOException { Review comment: Besides, we need to add a test case for the hadoop conf dir configuration. ## File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java ## @@ -57,10 +59,10 @@ public ExpectedException expectedException = ExpectedException.none(); @Test - public void test() { + public void test() throws IOException { Review comment: Create a new test case for this ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -199,7 +207,7 @@ private static HiveConf createHiveConf(@Nullable String hiveConfDir) { Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(new org.apache.flink.configuration.Configuration()); // Add mapred-site.xml. We need to read configurations like compression codec. - for (String possibleHadoopConfPath : HadoopUtils.possibleHadoopConfPaths(new org.apache.flink.configuration.Configuration())) { + for (String possibleHadoopConfPath : HadoopUtils.possibleHadoopConfPaths(new org.apache.flink.configuration.Configuration(), hadoopConfDir)) { Review comment: Let's not modify `HadoopUtils`. Instead, if `hadoopConfDir` is not null, set `ConfigConstants.PATH_HADOOP_CONFIG` in the `Configuration` instance.
[GitHub] [flink] flinkbot edited a comment on pull request #13445: [FLINK-19331][state-processor-api] Native resource leak when working with RocksDB
flinkbot edited a comment on pull request #13445: URL: https://github.com/apache/flink/pull/13445#issuecomment-696389960
[GitHub] [flink-statefun] tzulitai commented on pull request #157: [FLINK-19327][k8s] Bump JobManager heap size to 1 GB
tzulitai commented on pull request #157: URL: https://github.com/apache/flink-statefun/pull/157#issuecomment-696560584 Merging ...
[GitHub] [flink] carp84 commented on a change in pull request #13405: [FLINK-19270] Extract an inteface from AbstractKeyedStateBackend
carp84 commented on a change in pull request #13405: URL: https://github.com/apache/flink/pull/13405#discussion_r492641148 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java ## @@ -282,8 +283,8 @@ public void setCurrentKey(Object key) { if (keyedStateBackend != null) { try { // need to work around type restrictions - @SuppressWarnings("unchecked,rawtypes") - AbstractKeyedStateBackend rawBackend = (AbstractKeyedStateBackend) keyedStateBackend; + @SuppressWarnings("rawtypes") + CheckpointableKeyedStateBackend rawBackend = keyedStateBackend; rawBackend.setCurrentKey(key); Review comment: The suggested change looks ok in my local IntelliJ, could you double check? Thanks. :-) ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java ## @@ -282,8 +283,8 @@ public void setCurrentKey(Object key) { if (keyedStateBackend != null) { try { // need to work around type restrictions - @SuppressWarnings("unchecked,rawtypes") - AbstractKeyedStateBackend rawBackend = (AbstractKeyedStateBackend) keyedStateBackend; + @SuppressWarnings("rawtypes") + CheckpointableKeyedStateBackend rawBackend = keyedStateBackend; rawBackend.setCurrentKey(key); Review comment: True, but that's why we need to suppress the "unchecked" warning, rather than "rawtypes"? (In my local env it will also show the warning if I remove the "unchecked" suppression from the method signature) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
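The distinction being debated — which suppression actually silences which compiler warning — can be reproduced with a stand-alone example. This is not the Flink code under review, just an illustration: without the annotation, the first marked line triggers a "rawtypes" warning and the second an "unchecked" warning, which is why both may need to be suppressed.

```java
import java.util.ArrayList;
import java.util.List;

// Demonstrates the two warning categories discussed above:
// - "rawtypes": using a generic type without its type arguments
// - "unchecked": invoking a method on a raw type, bypassing generic checks
class SuppressDemo {

    @SuppressWarnings({"rawtypes", "unchecked"})
    static List fillRaw(Object value) {
        List raw = new ArrayList();   // rawtypes warning without suppression
        raw.add(value);               // unchecked warning without suppression
        return raw;
    }

    public static void main(String[] args) {
        System.out.println(fillRaw("key").size());
    }
}
```

Note that simply assigning a raw-typed variable produces only the "rawtypes" warning; the "unchecked" one appears once a method taking the erased type parameter is called, matching what the reviewer observed locally.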
[GitHub] [flink] flinkbot edited a comment on pull request #13450: [FLINK-19339] Support unions with logical types in Avro >= 1.9.x
flinkbot edited a comment on pull request #13450: URL: https://github.com/apache/flink/pull/13450#issuecomment-696552930
[GitHub] [flink] flinkbot edited a comment on pull request #13452: [FLINK-19273][sql-parser] Support METADATA syntax in SQL parser
flinkbot edited a comment on pull request #13452: URL: https://github.com/apache/flink/pull/13452#issuecomment-696611770
[GitHub] [flink] XComp commented on a change in pull request #13453: [FLINK-19311][coordination] Add ResourceRequirement(s)
XComp commented on a change in pull request #13453: URL: https://github.com/apache/flink/pull/13453#discussion_r492673612 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/slots/ResourceRequirement.java ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.slots; + +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.Objects; + +/** + * Represents the number of required resource for a specific {@link ResourceProfile}. Review comment: Being picky here since it's the initial commit. 8) ```suggestion * Represents the number of required resources for a specific {@link ResourceProfile}. ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rmetzger commented on a change in pull request #13438: [FLINK-19014][e2e] Increase startup timeout
rmetzger commented on a change in pull request #13438: URL: https://github.com/apache/flink/pull/13438#discussion_r492646461 ## File path: flink-dist/src/main/flink-bin/bin/flink-daemon.sh ## @@ -88,15 +88,6 @@ out="${FLINK_LOG_PREFIX}.out" log_setting=("-Dlog.file=${log}" "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j.properties" "-Dlog4j.configurationFile=file:${FLINK_CONF_DIR}/log4j.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback.xml") -JAVA_VERSION=$(${JAVA_RUN} -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q') Review comment: Damn, I need to work on my `git grep` skills. I removed it because the Java version was only used to check whether it is less than 1.8. Since it can never be less than 1.8, we don't need this check (anymore).
[GitHub] [flink] flinkbot commented on pull request #13455: [FLINK-19361][hive] Create a synchronized metastore client to talk to…
flinkbot commented on pull request #13455: URL: https://github.com/apache/flink/pull/13455#issuecomment-696719238
[GitHub] [flink] flinkbot edited a comment on pull request #13457: [FLINK-8357] Use rolling logs as default
flinkbot edited a comment on pull request #13457: URL: https://github.com/apache/flink/pull/13457#issuecomment-697012704
[GitHub] [flink] flinkbot edited a comment on pull request #13289: [FLINK-18548][table-planner] support flexible syntax for Temporal table join
flinkbot edited a comment on pull request #13289: URL: https://github.com/apache/flink/pull/13289#issuecomment-683894616
[GitHub] [flink] leonardBang commented on pull request #13128: [FLINK-18795][hbase] Support for HBase 2
leonardBang commented on pull request #13128: URL: https://github.com/apache/flink/pull/13128#issuecomment-696498920 One more tip: it would be great to add an e2e test for `hbase 2`, just like the Kafka e2e tests that cover multiple versions; that way we can keep the `hbase 2` test once we decide to deprecate `hbase 1`. https://github.com/apache/flink/blob/607919c6ea6983ae5ad3f82d63b7d6455c73d225/flink-end-to-end-tests/flink-end-to-end-tests-hbase/src/test/resources/hbase_e2e.sql#L22 https://github.com/apache/flink/blob/607919c6ea6983ae5ad3f82d63b7d6455c73d225/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/resources/kafka_e2e.sql#L23
[GitHub] [flink] flinkbot commented on pull request #13446: [FLINK-18779]Support the SupportsFilterPushDown for LookupTableSource
flinkbot commented on pull request #13446: URL: https://github.com/apache/flink/pull/13446#issuecomment-696484481
[GitHub] [flink] dianfu merged pull request #13420: [FLINK-19229][python] Introduce the PythonStreamGroupAggregateOperator for Python UDAF.
dianfu merged pull request #13420: URL: https://github.com/apache/flink/pull/13420
[GitHub] [flink] wangyang0918 commented on pull request #13440: [FLINK-18725][e2e] Use ClusterIP instead of NodePort and remove query port in internal jobmanager service
wangyang0918 commented on pull request #13440: URL: https://github.com/apache/flink/pull/13440#issuecomment-696452209 cc @rmetzger, could you please have a look?
[GitHub] [flink] dianfu commented on a change in pull request #13451: [FLINK-19333][python] Introduce BatchArrowPythonOverWindowAggregateFunctionOperator
dianfu commented on a change in pull request #13451: URL: https://github.com/apache/flink/pull/13451#discussion_r492600107 ## File path: flink-python/pyflink/proto/flink-fn-execution.proto ## @@ -44,12 +44,33 @@ message UserDefinedFunction { // 2. The result of another user-defined function // 3. The constant value of the column repeated Input inputs = 2; + + // Used in pandas batch over window aggregation + int32 windowIndex = 3; } // A list of user-defined functions to be executed in a batch. message UserDefinedFunctions { repeated UserDefinedFunction udfs = 1; bool metric_enabled = 2; + repeated Window windows = 3; +} + +// Used to describe the info of over window in pandas batch over window aggregation +message Window { Review comment: What about renamed as OverWindow? ## File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java ## @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.fnexecution.v1.FlinkFnApi; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.functions.AggregateFunction; +import org.apache.flink.table.functions.python.PythonFunctionInfo; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; +import org.apache.flink.table.types.logical.RowType; + +import java.util.ArrayList; +import java.util.List; + +/** + * The Batch Arrow Python {@link AggregateFunction} Operator for Over Window Aggregation. + */ +@Internal +public class BatchArrowPythonOverWindowAggregateFunctionOperator + extends AbstractBatchArrowPythonAggregateFunctionOperator { + + private static final long serialVersionUID = 1L; + + private static final String SCHEMA_OVER_WINDOW_ARROW_CODER_URN = "flink:coder:schema:over_window_arrow:v1"; + + private static final String PANDAS_BATCH_OVER_WINDOW_AGG_FUNCTION_URN = "flink:transform:batch_over_window_aggregate_function:arrow:v1"; + + /** +* Used to serialize the boundary of range window. +*/ + private static final IntSerializer windowBoundarySerializer = IntSerializer.INSTANCE; + + /** +* Window lower boundary. e.g. Long.MIN_VALUE means unbounded preceding. +*/ + private final long[] lowerBoundary; + + /** +* Window upper boundary. e.g. Long.MAX_VALUE means unbounded following. +*/ + private final long[] upperBoundary; + + /** +* Whether the specified position window is a range window. 
+*/ + private final boolean[] isRangeWindows; + + /** +* The window index of the specified aggregate function belongs to. +*/ + private final int[] aggWindowIndex; + + /** +* The row time index of the input data. +*/ + private final int inputTimeFieldIndex; + + /** +* The order of row time. True for ascending. +*/ + private final boolean order; + + /** +* The type serializer for the forwarded fields. +*/ + private transient RowDataSerializer forwardedInputSerializer; + + /** +* Stores the start position of the last key data in forwardedInputQueue. +*/ + private transient int lastKeyDataStartPos; + + /** +* Reusable OutputStream used to holding the window boundary with input elements. +*/ + private transient ByteArrayOutputStreamWithPos windowBoundaryWithDataBaos; + + /** +* OutputStream Wrapper. +*/ + private transient DataOutputViewStreamWrapper
[GitHub] [flink] dawidwys commented on a change in pull request #13423: [FLINK-19269] Make the PushingAsyncDataInput.DataOutput aware of endOfInput
dawidwys commented on a change in pull request #13423: URL: https://github.com/apache/flink/pull/13423#discussion_r492618028 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java ## @@ -68,4 +68,13 @@ public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception { public void setKeyContextElement(StreamRecord record) throws Exception { owner.internalSetKeyContextElement(record, stateKeySelector); } + + @Override + public void endInput() throws Exception { + if (owner instanceof BoundedOneInput && inputId == 1) { Review comment: It shouldn't. Nevertheless I think it is not safe to just assume so. ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java ## @@ -107,7 +106,11 @@ public void run(final Object lockingObject, // in theory, the subclasses of StreamSource may implement the BoundedOneInput interface, // so we still need the following call to end the input synchronized (lockingObject) { - operatorChain.endMainOperatorInput(1); + if (this instanceof BoundedOneInput) { + ((BoundedOneInput) this).endInput(); + } else if (this instanceof BoundedMultiInput) { + ((BoundedMultiInput) this).endInput(1); Review comment: Sorry, I am not sure if I understand the comment. Does a StreamSource even have inputs? Of course I might be missing something here, but isn't the input id the id of an input that has finished? Honestly, I find this part of the code quite shady (as there are actually no inputs that can finish, imo), but I think the logic pre and post this change is the same. Again I might be wrong.
[GitHub] [flink] aljoscha commented on a change in pull request #13452: [FLINK-19273][sql-parser] Support METADATA syntax in SQL parser
aljoscha commented on a change in pull request #13452: URL: https://github.com/apache/flink/pull/13452#discussion_r492584757 ## File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableLike.java ## @@ -115,9 +115,10 @@ * ALL - a shortcut to change the default merging strategy if none provided * CONSTRAINTS - constraints such as primary and unique keys * GENERATED - computed columns +* METADATA - metadata columns * WATERMARKS - watermark declarations * PARTITIONS - partition of the tables -* OPTIONS - connector options that decribed connector and format properties +* OPTIONS - connector options that described connector and format properties Review comment: ```suggestion * OPTIONS - connector options that describe connector and format properties ```
[GitHub] [flink] dianfu closed pull request #13230: [FLINK-18950][python][docs] Add documentation for Operations in Python DataStream API.
dianfu closed pull request #13230: URL: https://github.com/apache/flink/pull/13230
[GitHub] [flink] wuchong commented on a change in pull request #13289: [FLINK-18548][table-planner] support flexible syntax for Temporal table join
wuchong commented on a change in pull request #13289: URL: https://github.com/apache/flink/pull/13289#discussion_r492517117 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala ## @@ -163,7 +163,35 @@ class FlinkPlannerImpl( sqlValidator.getCatalogReader.unwrap(classOf[CalciteCatalogReader]), cluster, convertletTable, -sqlToRelConverterConfig) +sqlToRelConverterConfig) { +// override convertFrom() to support flexible Temporal Table Syntax, +// this can be revert once FLINK-16579(Upgrade Calcite version to 1.23) resolved. +val relBuilder = config.getRelBuilderFactory.create(cluster, null) + +override def convertFrom(bb: SqlToRelConverter#Blackboard, from: SqlNode): Unit = { Review comment: Create a JIRA issue to remove this overriding once we bump up Calcite version. And add comment above this method with the JIRA id.
[GitHub] [flink] flinkbot commented on pull request #13445: [FLINK-19331][state-processor-api] Native resource leak when working with RocksDB
flinkbot commented on pull request #13445: URL: https://github.com/apache/flink/pull/13445#issuecomment-696385131
[GitHub] [flink] gaoyunhaii commented on a change in pull request #13357: [FLINK-19165] Refactor the UnilateralSortMerger
gaoyunhaii commented on a change in pull request #13357: URL: https://github.com/apache/flink/pull/13357#discussion_r492529818 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CircularQueues.java ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.sort; + +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.MutableObjectIterator; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * Collection of queues that are used for the communication between the threads. 
+ */ +final class CircularQueues implements StageRunner.StageMessageDispatcher { + private final BlockingQueue> empty; + private final BlockingQueue> sort; + private final BlockingQueue> spill; + + private boolean isFinished = false; Review comment: Very sorry, one issue was missed earlier, but it just came to me that we might need to make `isFinished` `volatile`: `CircularQueues#close()` might be called from `SpilledThread` or `SortThread`, and we might need to read this variable in the task main thread. Since `ExternalSorter#close()` guarantees that there is no concurrent writer, `volatile` suffices here instead of an atomic variable.
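The single-writer pattern described in this review thread can be sketched in isolation. The hypothetical `VolatileFlagDemo` below is not the Flink class, just an illustration of why a `volatile` flag is sufficient when only one thread ever writes it: the write becomes visible to readers on other threads without any atomic read-modify-write.

```java
// One thread (standing in for the spill/sort thread) sets the flag; the
// task main thread reads it. With a guaranteed single writer, `volatile`
// gives the required visibility; AtomicBoolean would add nothing here.
class VolatileFlagDemo {

    private volatile boolean isFinished = false;

    void close() {          // called from a worker thread
        isFinished = true;  // volatile write happens-before subsequent reads
    }

    boolean isFinished() {  // read from the task main thread
        return isFinished;
    }

    public static void main(String[] args) throws InterruptedException {
        VolatileFlagDemo queues = new VolatileFlagDemo();
        Thread spiller = new Thread(queues::close);
        spiller.start();
        spiller.join();     // after join, the write is guaranteed visible
        System.out.println(queues.isFinished());
    }
}
```

An atomic variable would only be necessary if two threads could race on a read-modify-write of the flag, which `ExternalSorter#close()` is said to rule out.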
[GitHub] [flink] flinkbot commented on pull request #13449: [FLINK-19282][table sql/planner]Supports watermark push down with Wat…
flinkbot commented on pull request #13449: URL: https://github.com/apache/flink/pull/13449#issuecomment-696505414
[GitHub] [flink] StephanEwen commented on pull request #13412: [FLINK-19069] execute finalizeOnMaster with io executor to avoid hea…
StephanEwen commented on pull request #13412: URL: https://github.com/apache/flink/pull/13412#issuecomment-696662169 Just leaving a quick comment here that the new sinks that @aljoscha and @guoweiM are working on will be different and not affected by this. So this here is more of a temporary fix.
[GitHub] [flink] dianfu closed pull request #13421: [FLINK-19186][python] Add Python building blocks to make sure the basic functionality of Pandas Batch Group Aggregation could work
dianfu closed pull request #13421: URL: https://github.com/apache/flink/pull/13421
[GitHub] [flink] flinkbot edited a comment on pull request #13216: [FLINK-18999][table-planner-blink][hive] Temporary generic table does…
flinkbot edited a comment on pull request #13216: URL: https://github.com/apache/flink/pull/13216#issuecomment-678268420
[GitHub] [flink] dawidwys commented on pull request #13405: [FLINK-19270] Extract an inteface from AbstractKeyedStateBackend
dawidwys commented on pull request #13405: URL: https://github.com/apache/flink/pull/13405#issuecomment-696625513 Could you have another look @carp84 @aljoscha ?
[GitHub] [flink] klion26 commented on a change in pull request #13003: [FLINK-18737][docs]translate jdbc connector
klion26 commented on a change in pull request #13003: URL: https://github.com/apache/flink/pull/13003#discussion_r492695512 ## File path: docs/dev/connectors/jdbc.zh.md ## @@ -38,12 +38,12 @@ To use it, add the following dependency to your project (along with your JDBC-dr {% endhighlight %} -Note that the streaming connectors are currently __NOT__ part of the binary distribution. See how to link with them for cluster execution [here]({{ site.baseurl}}/dev/project-configuration.html). +注意连接器目前还 __不是__ 二进制发行版的一部分,如何在集群中运行请参考 [这里]({% link dev/project-configuration.zh.md %})。 Review comment: Would `注意连接器` -> `注意该连接器` read a bit better? The current wording is slightly awkward. ## File path: docs/dev/connectors/jdbc.zh.md ## @@ -64,4 +66,4 @@ env env.execute(); {% endhighlight %} -Please refer to the [API documentation]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/connector/jdbc/JdbcSink.html) for more details. +更多细节请查看 [API documentation]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/connector/jdbc/JdbcSink.html) 。 Review comment: Perhaps you could try `({% link /api/java/org/apache/flink/connector/jdbc/JdbcSink.html %})`
[GitHub] [flink] gaoyunhaii commented on a change in pull request #13377: [FLINK-18592] bugfix for StreamingFileSink
gaoyunhaii commented on a change in pull request #13377: URL: https://github.com/apache/flink/pull/13377#discussion_r492593704 ## File path: flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java ## @@ -357,6 +364,126 @@ private static boolean waitUntilLeaseIsRevoked(final FileSystem fs, final Path p } isClosed = dfs.isFileClosed(path); } + // [FLINK-18592] recover lease after the lease timeout passed but file was still not closed + if(!isClosed && !deadline.hasTimeLeft()){ + recoverLease(path, dfs); Review comment: I think we might merge this process with the original lease recovering logic. ## File path: flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java ## @@ -357,6 +364,126 @@ private static boolean waitUntilLeaseIsRevoked(final FileSystem fs, final Path p } isClosed = dfs.isFileClosed(path); } + // [FLINK-18592] recover lease after the lease timeout passed but file was still not closed + if(!isClosed && !deadline.hasTimeLeft()){ + recoverLease(path, dfs); + } return isClosed; } + + + /* +* Run the dfs recover lease. recoverLease is asynchronous. It returns: -false when it starts the lease recovery (i.e. lease recovery not *yet* done) - true when the lease recovery has +* succeeded or the file is closed. +* +* But, we have to be careful. Each time we call recoverLease, it starts the recover lease process over from the beginning. We could put ourselves in a situation +* where we are doing nothing but starting a recovery, interrupting it to start again, and so on. +* +* The namenode will try to recover the lease on the file's primary node. If all is well, it should return near immediately. +* But, as is common, it is the very primary node that has crashed and so the namenode will be stuck waiting on a socket timeout before it will ask another datanode to start the recovery. 
+* It does not help if we call recoverLease in the meantime and in particular, subsequent to the socket timeout, a recoverLease invocation will cause us to start over from square one +* (possibly waiting on socket timeout against primary node). +* So, in the below, we do the following: +* 1. Call recoverLease. +* 2. If it returns true, break. +* 3. If it returns false, wait a few seconds and then call it again. +* 4. If it returns true, break. +* 5. If it returns false, wait for what we think the datanode socket timeout is (configurable) and then try again. +* 6. If it returns true, break. +* 7. If it returns false, repeat starting at step 5. above. If HDFS-4525 is available, call it every second and we might be able to exit early. +*/ + private static boolean recoverLease(Path path, DistributedFileSystem dfs) throws IOException { + LOG.info("Recover lease on dfs file " + path); + long startWaiting = System.currentTimeMillis(); + // Default is 15 minutes. It's huge, but the idea is that if we have a major issue, HDFS + // usually needs 10 minutes before marking the nodes as dead. So we're putting ourselves + // beyond that limit 'to be safe'. + //Configuration conf = dfs.getConf(); + long recoveryTimeout = HdfsConstants.LEASE_HARDLIMIT_PERIOD / 4; + long recoveryTargetTimeout = recoveryTimeout + startWaiting; + // This setting should be a little bit above what the cluster dfs heartbeat is set to. + long firstPause = 4000L; + long pause = 1000L; + // This should be set to how long it'll take for us to timeout against primary datanode if it + // is dead. We set it to 64 seconds, 4 second than the default READ_TIMEOUT in HDFS, the + // default value for DFS_CLIENT_SOCKET_TIMEOUT_KEY. If recovery is still failing after this + // timeout, then further recovery will take liner backoff with this base, to avoid endless + // preemptions when this value is not properly configured. 
+ long subsequentPauseBase = HdfsConstants.LEASE_SOFTLIMIT_PERIOD; + + Method isFileClosedMeth = null; + // whether we need to look for the isFileClosed method + boolean findIsFileClosedMeth = true; + boolean recovered = false; + // We break out of the loop if the lease recovery succeeds, we time out, or an exception is thrown. + for
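The seven retry steps described in the comment above can be sketched as a generic backoff loop. This is a minimal illustration only, not the actual Flink patch: `attemptRecovery` stands in for `DistributedFileSystem#recoverLease`, the pause values and attempt cap are placeholder assumptions, and the sleeper is injected so the pauses can be observed instead of actually blocking.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;

public class LeaseRecoverySketch {

    // Mirrors steps 1-7 above: try recovery, take a short first pause, then
    // linearly growing pauses sized to the datanode socket timeout so an
    // in-flight recovery is not endlessly preempted by a fresh attempt.
    static boolean recoverWithBackoff(BooleanSupplier attemptRecovery,
                                      long firstPauseMs,
                                      long subsequentPauseBaseMs,
                                      int maxAttempts,
                                      LongConsumer sleeper) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (attemptRecovery.getAsBoolean()) {
                return true;                  // steps 2/4/6: lease recovered
            }
            if (attempt == 1) {
                sleeper.accept(firstPauseMs); // step 3: brief first wait
            } else {
                // steps 5/7: linear backoff on the socket-timeout-sized base
                sleeper.accept(subsequentPauseBaseMs * (attempt - 1));
            }
        }
        return false;                         // gave up; caller may escalate
    }

    public static void main(String[] args) {
        List<Long> pauses = new ArrayList<>();
        int[] calls = {0};
        // simulated recoverLease that only succeeds on its third invocation
        boolean recovered = recoverWithBackoff(
                () -> ++calls[0] >= 3, 4000L, 61_000L, 10, pauses::add);
        System.out.println(recovered + " " + pauses); // prints: true [4000, 61000]
    }
}
```

Injecting the sleeper keeps the pause schedule testable; the real method would instead compare against a deadline derived from `LEASE_HARDLIMIT_PERIOD` as in the quoted diff.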
[GitHub] [flink-statefun] tzulitai commented on pull request #159: [FLINK-19330][core] Move initialization logic to open() instead of initializeState
tzulitai commented on pull request #159: URL: https://github.com/apache/flink-statefun/pull/159#issuecomment-696546718 I think we can easily combine the exactly-once and remote module E2E tests. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] wuchong merged pull request #13426: [FLINK-19244] CsvRowDataDeserializationSchema throws cast exception : Row length mismatch.
wuchong merged pull request #13426: URL: https://github.com/apache/flink/pull/13426
[GitHub] [flink] wangxlong commented on pull request #11125: [FLINK-16147][Flink-Table-Common] WatermarkSepc#toString contain watermarkExprOutputType field
wangxlong commented on pull request #11125: URL: https://github.com/apache/flink/pull/11125#issuecomment-697035565 @twalthr Thanks
[GitHub] [flink] flinkbot edited a comment on pull request #13416: [FLINK-19179] Extend managed memory fraction calculation for various use cases.
flinkbot edited a comment on pull request #13416: URL: https://github.com/apache/flink/pull/13416#issuecomment-694678006
[GitHub] [flink] StephanEwen commented on pull request #13450: [FLINK-19339] Support unions with logical types in Avro >= 1.9.x
StephanEwen commented on pull request #13450: URL: https://github.com/apache/flink/pull/13450#issuecomment-696649675 Looks good to me. Is this also relevant for other Avro parts, like AvroInputFormat, AvroSerializer, ... ?
[GitHub] [flink] twalthr commented on pull request #11125: [FLINK-16147][Flink-Table-Common] WatermarkSepc#toString contain watermarkExprOutputType field
twalthr commented on pull request #11125: URL: https://github.com/apache/flink/pull/11125#issuecomment-696793203 I will merge this for now. But we might rework this part soon again when touching `TableSchema`.
[GitHub] [flink] flinkbot edited a comment on pull request #13448: [FLINK-19289][k8s] Remove pods terminated during JM failover.
flinkbot edited a comment on pull request #13448: URL: https://github.com/apache/flink/pull/13448#issuecomment-696496421
[GitHub] [flink-playgrounds] alpinegizmo commented on a change in pull request #16: [FLINK-19145][walkthroughs] Add PyFlink-walkthrough to Flink playground.
alpinegizmo commented on a change in pull request #16: URL: https://github.com/apache/flink-playgrounds/pull/16#discussion_r492952901 ## File path: pyflink-walkthrough/README.md ## @@ -102,6 +106,24 @@ $ docker-compose exec jobmanager ./bin/flink run -py /opt/pyflink-walkthrough/pa 3. Stop the PyFlink job: -Visit the Flink Web UI at [http://localhost:8081/#/overview](http://localhost:8081/#/overview) , select the job and click `Cancle` on the upper right side. +Visit the Flink Web UI at [http://localhost:8081/#/overview](http://localhost:8081/#/overview) , select the job and click `Cancel` on the upper right side. ![image](pic/cancel.png) + +## Extension + +You are able to edit the [payment_msg_processing.py](payment_msg_proccessing.py) or create new PyFlink +projects to perform more complex processing logic locally on your operating system under the `pyflink-walkthrough` +directory since it is mounted on the `jobmanager` docker container. Such as: +* Creating a new Kafka source table; +* Creating a new index for the Elasticsearch sink; +* Calculating the amount of transactions grouped by a 1 minute tumble window and payPlatforms. + +After the modification, you can submit the new job by executing the same command mentioned at +[Running the PyFlink Job](#running-the-pyflink-job) +```shell script +$ docker-compose exec jobmanager ./bin/flink run -py /opt/pyflink-walkthrough/payment_msg_proccessing.py -d +``` + +Furthermore, you can also [create new kibana dashboards](https://www.elastic.co/guide/en/kibana/7.8/dashboard-create-new-dashboard.html) +to visualize more charts of various dimension based on the persistent indexes in Elasticsearch. Review comment: ```suggestion ## Further Explorations If you would like to explore this example more deeply, you can edit [payment_msg_processing.py](payment_msg_proccessing.py) or create new PyFlink projects that perform more complex processing. 
You can do this locally under the `pyflink-walkthrough` directory, since it is mounted on the `jobmanager` docker container. Ideas: * Add your own Kafka source table; * Create a new index for the Elasticsearch sink; * Count the number of transactions, grouped by a 1 minute tumbling window and by payPlatform. After making a modification, you can submit the new job by executing the same command mentioned in [Running the PyFlink Job](#running-the-pyflink-job) ```bash $ docker-compose exec jobmanager ./bin/flink run -py /opt/pyflink-walkthrough/payment_msg_proccessing.py -d ``` Furthermore, you can also [create new kibana dashboards](https://www.elastic.co/guide/en/kibana/7.8/dashboard-create-new-dashboard.html) that visualize other aspects of the data in Elasticsearch. ```
[GitHub] [flink] flinkbot commented on pull request #13454: [FLINK-19304][coordination] Add FLIP-138 feature toggle
flinkbot commented on pull request #13454: URL: https://github.com/apache/flink/pull/13454#issuecomment-696661067
[GitHub] [flink] xintongsong commented on pull request #13448: [FLINK-19289][k8s] Remove pods terminated during JM failover.
xintongsong commented on pull request #13448: URL: https://github.com/apache/flink/pull/13448#issuecomment-696541373
[GitHub] [flink] sjwiesman commented on pull request #13445: [FLINK-19331][state-processor-api] Native resource leak when working with RocksDB
sjwiesman commented on pull request #13445: URL: https://github.com/apache/flink/pull/13445#issuecomment-696707924 Thanks for pointing that out.
[GitHub] [flink] flinkbot edited a comment on pull request #13453: [FLINK-19311][coordination] Add ResourceRequirement(s)
flinkbot edited a comment on pull request #13453: URL: https://github.com/apache/flink/pull/13453#issuecomment-696623367