[GitHub] [flink] flinkbot commented on pull request #22702: [FLINK-32159] AbstractColumnReader throws NPE
flinkbot commented on PR #22702: URL: https://github.com/apache/flink/pull/22702#issuecomment-1574604531

## CI report:
* 840fae1396f01bc55a231f2dacd9597cd7e31c25 UNKNOWN

Bot commands: The @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-32159) Hudi Source throws NPE
[ https://issues.apache.org/jira/browse/FLINK-32159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32159: --- Labels: pull-request-available (was: ) > Hudi Source throws NPE > -- > > Key: FLINK-32159 > URL: https://issues.apache.org/jira/browse/FLINK-32159 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.15.0, 1.16.0, 1.17.0, 1.18.0 >Reporter: Bo Cui >Priority: Major > Labels: pull-request-available > Attachments: image-2023-05-23-14-45-29-151.png > > > Spark/Hive writes a Hudi table, and a Flink job reading that table fails with an NPE: > !image-2023-05-23-14-45-29-151.png! > > A null check should be added to AbstractColumnReader#readToVector: > https://github.com/apache/flink/blob/119b8c584dc865ee8a40a5c6410dddf8b36bac5a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/AbstractColumnReader.java#LL155C19-L155C20 -- This message was sent by Atlassian Jira (v8.20.10#820010)
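The fix described in the issue amounts to guarding the dictionary reference before dereferencing it. Below is a minimal, self-contained sketch of that null-guard pattern; `NullGuardSketch` and `decode` are hypothetical names for illustration and stand in for the dictionary lookup inside `readToVector`, not the actual Flink code:

```java
import java.util.Arrays;

/** Sketch of the null-guard pattern proposed for FLINK-32159 (hypothetical names). */
public class NullGuardSketch {

    /**
     * Resolves dictionary ids to values. The dictionary can be null when another
     * engine (e.g. Spark/Hive) wrote the Parquet file without a dictionary page,
     * so we guard before dereferencing instead of letting an NPE propagate.
     */
    static Integer[] decode(Integer[] dictionary, int[] ids) {
        Integer[] out = new Integer[ids.length];
        for (int i = 0; i < ids.length; i++) {
            out[i] = (dictionary == null) ? null : dictionary[ids[i]];
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(decode(new Integer[] {10, 20}, new int[] {1, 0}))); // prints [20, 10]
        System.out.println(Arrays.toString(decode(null, new int[] {0, 0})));                   // prints [null, null]
    }
}
```

In the real reader the guard would presumably skip dictionary decoding (or fall back to plain decoding) rather than emit nulls; the sketch only shows where the check belongs.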
[GitHub] [flink] cuibo01 opened a new pull request, #22702: [FLINK-32159] AbstractColumnReader throws NPE
cuibo01 opened a new pull request, #22702: URL: https://github.com/apache/flink/pull/22702

## What is the purpose of the change
*(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)*

## Brief change log
*(for example:)*
- *The TaskInfo is stored in the blob store on job creation time as a persistent artifact*
- *Deployments RPC transmits only the blob storage reference*
- *TaskManagers retrieve the TaskInfo from the blob cache*

## Verifying this change
Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
*(Please pick either of the following options)*
This change is a trivial rework / code cleanup without any test coverage.
*(or)* This change is already covered by existing tests, such as *(please describe tests)*.
*(or)* This change added tests and can be verified as follows: *(example:)*
- *Added integration tests for end-to-end deployment with large payloads (100MB)*
- *Extended integration test for recovery after master (JobManager) failure*
- *Added test that validates that TaskInfo is transferred only once across recoveries*
- *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.*

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no)
- The serializers: (yes / no / don't know)
- The runtime per-record code paths (performance sensitive): (yes / no / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
- The S3 file system connector: (yes / no / don't know)

## Documentation
- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[jira] [Updated] (FLINK-32243) Bump okhttp version to 4.11.0
[ https://issues.apache.org/jira/browse/FLINK-32243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32243: --- Labels: pull-request-available (was: ) > Bump okhttp version to 4.11.0 > - > > Key: FLINK-32243 > URL: https://issues.apache.org/jira/browse/FLINK-32243 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: ConradJam >Priority: Major > Labels: pull-request-available > > Bump kubernetes operator okhttp version to 4.11.0
[GitHub] [flink-kubernetes-operator] czy006 opened a new pull request, #610: [FLINK-32243] Bump okhttp version to 4.11.0
czy006 opened a new pull request, #610: URL: https://github.com/apache/flink-kubernetes-operator/pull/610 Bump okhttp version to 4.11.0
[GitHub] [flink-kubernetes-operator] xccui commented on a diff in pull request #609: [FLINK-32171] Add PostStart hook to flink k8s operator helm
xccui commented on code in PR #609: URL: https://github.com/apache/flink-kubernetes-operator/pull/609#discussion_r1214659153

## helm/flink-kubernetes-operator/values.yaml:
@@ -183,3 +183,7 @@ operatorHealth:
   startupProbe:
     failureThreshold: 30
     periodSeconds: 10
+
+# Set exec command for the postStart hook of the main container
+postStart:
+  execCommand: []

Review Comment: Sounds good. I'll make an update.
[jira] [Updated] (FLINK-32171) Add PostStart hook to flink k8s operator helm
[ https://issues.apache.org/jira/browse/FLINK-32171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32171: --- Labels: pull-request-available (was: ) > Add PostStart hook to flink k8s operator helm > - > > Key: FLINK-32171 > URL: https://issues.apache.org/jira/browse/FLINK-32171 > Project: Flink > Issue Type: New Feature > Components: Kubernetes Operator >Reporter: Xingcan Cui >Assignee: Xingcan Cui >Priority: Minor > Labels: pull-request-available > Fix For: kubernetes-operator-1.6.0, kubernetes-operator-1.5.1 > > > I feel it will be convenient to add a PostStart hook optional config to flink > k8s operator helm (e.g. when users need to download some Flink plugins).
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #609: [FLINK-32171] Add PostStart hook to flink k8s operator helm
gyfora commented on code in PR #609: URL: https://github.com/apache/flink-kubernetes-operator/pull/609#discussion_r1214506467

## helm/flink-kubernetes-operator/values.yaml:
@@ -183,3 +183,7 @@ operatorHealth:
   startupProbe:
     failureThreshold: 30
     periodSeconds: 10
+
+# Set exec command for the postStart hook of the main container
+postStart:
+  execCommand: []

Review Comment: I think it would be more intuitive to provide the entire yaml content of the `postStart` section instead of only `execCommand`, which would require extra documentation and would limit the usage.
[GitHub] [flink-connector-jdbc] matriv commented on pull request #29: [FLINK-31551] Add support for CrateDB
matriv commented on PR #29: URL: https://github.com/apache/flink-connector-jdbc/pull/29#issuecomment-1573891656 Thank you all involved in the PR and for all the iterations!
[GitHub] [flink-connector-jdbc] snuyanzin commented on pull request #29: [FLINK-31551] Add support for CrateDB
snuyanzin commented on PR #29: URL: https://github.com/apache/flink-connector-jdbc/pull/29#issuecomment-1573890436 Thanks for the contribution @matriv, and thanks for the review @libenchao, @eskabetxe. I'm about to merge it this weekend.
[jira] [Commented] (FLINK-32245) NonDeterministicTests #testTemporalFunctionsInBatchMode failure masked due to incorrect test initialization
[ https://issues.apache.org/jira/browse/FLINK-32245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17728734#comment-17728734 ] lincoln lee commented on FLINK-32245: - fixed in master: bfe49b2973d4ffc8f7404a376cab1e419b53406a > NonDeterministicTests #testTemporalFunctionsInBatchMode failure masked due to > incorrect test initialization > --- > > Key: FLINK-32245 > URL: https://issues.apache.org/jira/browse/FLINK-32245 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.18.0, 1.17.1 >Reporter: Jane Chan >Assignee: Jane Chan >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > The test case NonDeterministicTests #testTemporalFunctionsInBatchMode has > been consistently failing due to incorrect test initialization. > > However, this failure has been masked because the test class name ends with > "Tests", causing the CI to skip the test case, which has been further > validated by searching through the historical logs of the CI. > This issue needs to be addressed, and the test case should be executed to > ensure proper testing.
[jira] [Closed] (FLINK-32245) NonDeterministicTests #testTemporalFunctionsInBatchMode failure masked due to incorrect test initialization
[ https://issues.apache.org/jira/browse/FLINK-32245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee closed FLINK-32245. --- Resolution: Fixed > NonDeterministicTests #testTemporalFunctionsInBatchMode failure masked due to > incorrect test initialization > --- > > Key: FLINK-32245 > URL: https://issues.apache.org/jira/browse/FLINK-32245 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.18.0, 1.17.1 >Reporter: Jane Chan >Assignee: Jane Chan >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > The test case NonDeterministicTests #testTemporalFunctionsInBatchMode has > been consistently failing due to incorrect test initialization. > > However, this failure has been masked because the test class name ends with > "Tests", causing the CI to skip the test case, which has been further > validated by searching through the historical logs of the CI. > This issue needs to be addressed, and the test case should be executed to > ensure proper testing.
[GitHub] [flink] lincoln-lil merged pull request #22701: [FLINK-32245][table-planner] Rename NonDeterministicTests to NonDeterministicTest and fix incorrect test initialization
lincoln-lil merged PR #22701: URL: https://github.com/apache/flink/pull/22701
[GitHub] [flink-kubernetes-operator] ottomata commented on pull request #604: [FLINK-32041] - Allow operator to manage leases when using watchNamespaces
ottomata commented on PR #604: URL: https://github.com/apache/flink-kubernetes-operator/pull/604#issuecomment-1573642377 Okay, thank you!
[GitHub] [flink] luoyuxia commented on a diff in pull request #22593: [FLINK-32053][table-planner] Introduce StateMetadata to ExecNode to support configure operator-level state TTL via CompiledPlan
luoyuxia commented on code in PR #22593: URL: https://github.com/apache/flink/pull/22593#discussion_r1214112329

## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/StateMetadata.java:
@@ -84,22 +84,23 @@ public StateMetadata(
                 stateIndex,
                 TimeUtils.parseDuration(
                         Preconditions.checkNotNull(stateTtl, "state ttl should not be null")),
-                Preconditions.checkNotNull(stateName, "state name should not be null"));
+                stateName);
     }

-    public StateMetadata(int stateIndex, @Nonnull Duration stateTtl, @Nonnull String stateName) {
+    public StateMetadata(int stateIndex, Duration stateTtl, String stateName) {
         Preconditions.checkArgument(stateIndex >= 0, "state index should start from 0");
         this.stateIndex = stateIndex;
-        this.stateTtl = stateTtl;
-        this.stateName = stateName;
+        this.stateTtl = Preconditions.checkNotNull(stateTtl, "state ttl should not be null");
+        this.stateName = Preconditions.checkNotNull(stateName, "state name should not be null");
     }

     public int getStateIndex() {
         return stateIndex;
     }

-    public long getStateTtl() {
-        return stateTtl.toMillis();
+    @JsonGetter(value = "ttl")
+    public String getStateTtl() {
+        return TimeUtils.formatWithHighestUnit(stateTtl);
     }

     public String getStateName() {

Review Comment: Can this method be removed?

## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java:
@@ -256,12 +276,17 @@ private static void checkUidModification(
     // Helper methods
     //

-    private static CompiledPlan minimalPlan(TableEnvironment env) {
+    private static CompiledPlan planFromFlink1_18(TableEnvironment env) {

Review Comment: Rename to `planFromCurrentFlink`? As releases move on, the name `planFromFlink1_18` may become out of date.
## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java:
@@ -332,7 +319,7 @@ protected Transformation createJoinTransformation(
         }
     }

-    private Transformation createSyncLookupJoinWithState(
+    protected Transformation createSyncLookupJoinWithState(

Review Comment: Is there any strong reason to make `createSyncLookupJoinWithState` protected? To me, it does not look like a base method to be overridden by child classes, at least not for `BatchExecLookupJoin`, since there won't be a lookup join with state in `BatchExecLookupJoin`. If we do want to make it a base method, I think we should throw an exception in the default implementation to avoid the method being called by mistake without a specific implementation.

## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ConfigureOperatorLevelStateTtlJsonITCase.java:
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.table.planner.runtime.stream.jsonplan; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.table.planner.utils.JsonPlanTestBase; +import org.apache.flink.table.planner.utils.JsonTestUtils; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; + +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Tests for configuring operator-level state TTL via {@link + * org.apache.flink.table.api.CompiledPlan}. + */ +public class ConfigureOperatorLevelStateTtlJsonITCase extends JsonPlanTestBase { + +@Test +public void testDifferentStateTtlForDifferentOneInputOperator() throws Exception { +String dataId =
[jira] [Commented] (FLINK-30629) ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat is unstable
[ https://issues.apache.org/jira/browse/FLINK-30629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17728714#comment-17728714 ] Liu commented on FLINK-30629: - [~Sergey Nuyanzin] Thanks. From the log, we can see the events in time order: # The dispatcher shuts down because the client's heartbeat timed out. # The client begins to report its heartbeat. The reason is that the client only starts reporting its heartbeat after the method waitUntilJobInitializationFinished returns. In this method, we try to get the job's status with exponentially increasing wait intervals, which may take a while. There are two ways to fix the test: # Increase the client's timeout from 500 ms to 1 second or more. # In waitUntilJobInitializationFinished, try to get the job's status more frequently. What do you think? cc [~xtsong] > ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat is unstable > - > > Key: FLINK-30629 > URL: https://issues.apache.org/jira/browse/FLINK-30629 > Project: Flink > Issue Type: Bug > Components: Client / Job Submission >Affects Versions: 1.17.0, 1.18.0 >Reporter: Xintong Song >Assignee: Weijie Guo >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.17.0 > > Attachments: ClientHeartbeatTestLog.txt, > logs-cron_azure-test_cron_azure_core-1685497478.zip > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44690=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=10819 > {code:java} > Jan 11 04:32:39 [ERROR] Tests run: 3, Failures: 0, Errors: 1, Skipped: 0, > Time elapsed: 21.02 s <<< FAILURE! - in > org.apache.flink.client.ClientHeartbeatTest > Jan 11 04:32:39 [ERROR] > org.apache.flink.client.ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat > Time elapsed: 9.157 s <<< ERROR! > Jan 11 04:32:39 java.lang.IllegalStateException: MiniCluster is not yet > running or has already been shut down. 
> Jan 11 04:32:39 at > org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) > Jan 11 04:32:39 at > org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:1044) > Jan 11 04:32:39 at > org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:917) > Jan 11 04:32:39 at > org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:841) > Jan 11 04:32:39 at > org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobStatus(MiniClusterJobClient.java:91) > Jan 11 04:32:39 at > org.apache.flink.client.ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat(ClientHeartbeatTest.java:79) > {code}
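The timing race behind this flaky test can be made concrete with simple arithmetic. A hedged sketch, assuming an illustrative 50 ms initial poll interval that doubles on each attempt (the actual values used by `waitUntilJobInitializationFinished` may differ): after only four unsuccessful status polls, the client has already waited longer than the 500 ms heartbeat timeout, so the dispatcher can shut down before the first heartbeat is ever sent.

```java
/**
 * Illustrative arithmetic for the race in FLINK-30629: cumulative exponential-backoff
 * delay vs. the 500 ms client heartbeat timeout. The 50 ms seed and the doubling
 * factor are assumptions for illustration, not Flink's actual polling parameters.
 */
public class BackoffTimeoutSketch {

    /** Total time spent waiting after the given number of unsuccessful polls. */
    static long totalDelayMs(long seedMs, int attempts) {
        long total = 0;
        long wait = seedMs;
        for (int i = 0; i < attempts; i++) {
            total += wait;
            wait *= 2; // exponential backoff doubles the wait each round
        }
        return total;
    }

    public static void main(String[] args) {
        // 50 + 100 + 200 + 400 = 750 ms, already past a 500 ms heartbeat timeout.
        System.out.println(totalDelayMs(50, 4)); // prints 750
    }
}
```

This is why both proposed fixes work: raising the timeout moves the 500 ms bound above the cumulative delay, while polling more frequently shrinks the cumulative delay below the bound.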
[GitHub] [flink] yuchen-ecnu commented on pull request #22659: [FLINK-32186][runtime-web] Support subtask stack auto-search when red…
yuchen-ecnu commented on PR #22659: URL: https://github.com/apache/flink/pull/22659#issuecomment-1573601846 Hi @Myasuka, can you help me to review this PR? Thanks.
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier
TanYuxin-tyx commented on code in PR #22652: URL: https://github.com/apache/flink/pull/22652#discussion_r1214225246 ## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClientTest.java: ## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.TestingBufferAccumulator; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.TestingTierProducerAgent; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.TieredStorageTestUtils.generateRandomData; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; + +/** Tests for {@link TieredStorageProducerClient}. 
*/ +@ExtendWith(ParameterizedTestExtension.class) +public class TieredStorageProducerClientTest { + +private static final int NUM_TOTAL_BUFFERS = 1000; + +private static final int NETWORK_BUFFER_SIZE = 1024; + +@Parameter public boolean isBroadcast; + +private NetworkBufferPool globalPool; + +@Parameters(name = "isBroadcast={0}") +public static Collection parameters() { +return Arrays.asList(false, true); +} + +@BeforeEach +void before() { +globalPool = new NetworkBufferPool(NUM_TOTAL_BUFFERS, NETWORK_BUFFER_SIZE); +} + +@AfterEach +void after() { +globalPool.destroy(); +} + +@TestTemplate +void testWriteRecordsToEmptyStorageTiers() { +int numSubpartitions = 10; +int bufferSize = 1024; +Random random = new Random(); + +TieredStorageProducerClient tieredStorageProducerClient = +createTieredStorageProducerClient(numSubpartitions, Collections.emptyList()); +assertThatThrownBy( +() -> +tieredStorageProducerClient.write( +generateRandomData(bufferSize, random), +new TieredStorageSubpartitionId(0), +Buffer.DataType.DATA_BUFFER, +isBroadcast)) +.isInstanceOf(RuntimeException.class) +.hasMessageContaining("Failed to choose a storage tier"); +} + +@TestTemplate +void testWriteRecords() throws IOException { +int numSubpartitions = 10; +int bufferSize = 1024; +int maxNumToWriteRecordsPerSubpartition = 1000; +Random random = new Random(); + +TestingTierProducerAgent tierProducerAgent = new TestingTierProducerAgent(); +tierProducerAgent.setTryStartNewSegmentReturnValueSupplier(() -> true); +tierProducerAgent.setTryWriteReturnValueSupplier(() -> new Boolean[] {false, true}); + +TieredStorageProducerClient tieredStorageProducerClient = +createTieredStorageProducerClient( +numSubpartitions, Collections.singletonList(tierProducerAgent)); + +int numWriteRecords = 0; +for (int j = 0; j < numSubpartitions; j++) { +for
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier
TanYuxin-tyx commented on code in PR #22652: URL: https://github.com/apache/flink/pull/22652#discussion_r1214224183 ## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClientTest.java:
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier
TanYuxin-tyx commented on code in PR #22652: URL: https://github.com/apache/flink/pull/22652#discussion_r1214223911 ## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTierProducerAgent.java: ## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent; + +import java.util.function.Supplier; + +/** Test implementation for {@link TierProducerAgent}. 
*/
+public class TestingTierProducerAgent implements TierProducerAgent {
+
+    private Supplier tryStartNewSegmentReturnValueSupplier;
+
+    private Supplier tryWriteReturnValueSupplier;
+
+    private int numTotalReceivedBuffers;
+
+    private int tryWriteCounter;
+
+    private boolean isClosed;
+
+    public void setTryStartNewSegmentReturnValueSupplier(
+            Supplier tryStartNewSegmentReturnValueSupplier) {
+        this.tryStartNewSegmentReturnValueSupplier = tryStartNewSegmentReturnValueSupplier;
+    }
+
+    public void setTryWriteReturnValueSupplier(Supplier tryWriteReturnValueSupplier) {
+        this.tryWriteReturnValueSupplier = tryWriteReturnValueSupplier;
+    }
+
+    public int numTotalReceivedBuffers() {
+        return numTotalReceivedBuffers;
+    }
+
+    public boolean isClosed() {
+        return isClosed;
+    }
+
+    @Override
+    public boolean tryStartNewSegment(TieredStorageSubpartitionId subpartitionId, int segmentId) {
+        return tryStartNewSegmentReturnValueSupplier.get();
+    }
+
+    @Override
+    public boolean tryWrite(TieredStorageSubpartitionId subpartitionId, Buffer finishedBuffer) {
+        boolean isSuccess = tryWriteReturnValueSupplier.get()[(tryWriteCounter++) % 2];
+        if (isSuccess) {
+            numTotalReceivedBuffers++;
+        }
+        return isSuccess;

Review Comment: Fixed.

## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTierProducerAgent.java: ## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent; + +import java.util.function.Supplier; + +/** Test implementation for {@link TierProducerAgent}. */ +public class TestingTierProducerAgent implements TierProducerAgent { Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier
TanYuxin-tyx commented on code in PR #22652: URL: https://github.com/apache/flink/pull/22652#discussion_r1214223698 ## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTierProducerAgent.java: ## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent; + +import java.util.function.Supplier; + +/** Test implementation for {@link TierProducerAgent}. */ +public class TestingTierProducerAgent implements TierProducerAgent { + +private Supplier tryStartNewSegmentReturnValueSupplier; + +private Supplier tryWriteReturnValueSupplier; Review Comment: Fixed. ## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTierProducerAgent.java: ## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent; + +import java.util.function.Supplier; + +/** Test implementation for {@link TierProducerAgent}. 
*/
+public class TestingTierProducerAgent implements TierProducerAgent {
+
+    private Supplier tryStartNewSegmentReturnValueSupplier;
+
+    private Supplier tryWriteReturnValueSupplier;
+
+    private int numTotalReceivedBuffers;
+
+    private int tryWriteCounter;
+
+    private boolean isClosed;
+
+    public void setTryStartNewSegmentReturnValueSupplier(
+            Supplier tryStartNewSegmentReturnValueSupplier) {
+        this.tryStartNewSegmentReturnValueSupplier = tryStartNewSegmentReturnValueSupplier;
+    }
+
+    public void setTryWriteReturnValueSupplier(Supplier tryWriteReturnValueSupplier) {
+        this.tryWriteReturnValueSupplier = tryWriteReturnValueSupplier;
+    }
+
+    public int numTotalReceivedBuffers() {
+        return numTotalReceivedBuffers;
+    }
+
+    public boolean isClosed() {
+        return isClosed;
+    }
+
+    @Override
+    public boolean tryStartNewSegment(TieredStorageSubpartitionId subpartitionId, int segmentId) {
+        return tryStartNewSegmentReturnValueSupplier.get();
+    }
+
+    @Override
+    public boolean tryWrite(TieredStorageSubpartitionId subpartitionId, Buffer finishedBuffer) {
+        boolean isSuccess = tryWriteReturnValueSupplier.get()[(tryWriteCounter++) % 2];
+        if (isSuccess) {
+            numTotalReceivedBuffers++;
+        }
+        return isSuccess;
+    }
+
+    @Override
+    public void close() {
+        this.isClosed = true;

Review Comment: Fixed.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier
TanYuxin-tyx commented on code in PR #22652: URL: https://github.com/apache/flink/pull/22652#discussion_r1214223224 ## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTierProducerAgent.java: ## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent; + +import java.util.function.Supplier; + +/** Test implementation for {@link TierProducerAgent}. */ +public class TestingTierProducerAgent implements TierProducerAgent { + +private Supplier tryStartNewSegmentReturnValueSupplier; Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
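The testing agent quoted above alternates between failed and successful writes by indexing a supplier-provided `Boolean[]` with a call counter, which lets the test exercise the producer client's retry path. A standalone sketch of that stubbing pattern, outside Flink (class and method names here are simplified stand-ins, not Flink's actual APIs):

```java
import java.util.function.Supplier;

public class TryWriteStubDemo {

    private static int tryWriteCounter = 0;
    private static int numReceivedBuffers = 0;

    // Mimics the test agent's pattern: the supplier yields a Boolean[] and
    // successive calls cycle through it, so every other write "fails" and
    // forces the caller down its retry path.
    static boolean tryWrite(Supplier<Boolean[]> returnValues) {
        boolean isSuccess = returnValues.get()[(tryWriteCounter++) % 2];
        if (isSuccess) {
            numReceivedBuffers++;
        }
        return isSuccess;
    }

    public static void main(String[] args) {
        Supplier<Boolean[]> returnValues = () -> new Boolean[] {false, true};
        for (int i = 0; i < 6; i++) {
            tryWrite(returnValues);
        }
        // 6 attempts alternating fail/success: 3 buffers counted as received
        System.out.println(numReceivedBuffers);
    }
}
```

Injecting behavior through a `Supplier` rather than hard-coding it keeps the stub reusable: each test configures its own success/failure sequence without subclassing.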
[jira] [Updated] (FLINK-32247) Normal group by with time attributes after a window group by is interpreted as GlobalWindowAggregate
[ https://issues.apache.org/jira/browse/FLINK-32247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Qingsheng Ren updated FLINK-32247: -- Description: Considering a SQL statement below: {code:java} CREATE TABLE source1 ( `id` BIGINT, `item` STRING, `price` DOUBLE, `ts` STRING, `rowtime` AS TO_TIMESTAMP(`ts`), WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND) WITH ( 'connector' = 'datagen' ); SELECT `window_start`, `window_end`, `window_time`, COUNT(*) FROM ( SELECT `window_start`, `window_end`, `window_time`, `item`, SUM(`price`) AS `price_sum` FROM TABLE (TUMBLE(TABLE source1, DESCRIPTOR(`rowtime`), INTERVAL '1' MINUTES)) GROUP BY `window_start`, `window_end`, `window_time`, `item`) GROUP BY `window_start`, `window_end`, `window_time`; /* Use time attributes defined in the previous window aggregation as grouping keys */{code} which should be a group aggregation after a window aggregation, but the planner is interpreting the latter aggregation as a GroupWindowAggregation: {code:java} == Optimized Physical Plan == Calc(select=[window_start, window_end, window_time, EXPR$3]) +- GlobalWindowAggregate(window=[TUMBLE(win_end=[$window_end], size=[1 min])], select=[COUNT(count1$0) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time]) +- Exchange(distribution=[single]) +- LocalWindowAggregate(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 min])], select=[COUNT(*) AS count1$0, slice_end('w$) AS $window_end]) +- Calc(select=[window_start, window_end, window_time]) +- GlobalWindowAggregate(groupBy=[item], window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[item, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time]) +- Exchange(distribution=[hash[item]]) +- LocalWindowAggregate(groupBy=[item], window=[TUMBLE(time_col=[rowtime], size=[1 min])], select=[item, slice_end('w$) AS $slice_end]) +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
5000:INTERVAL SECOND)]) +- Calc(select=[item, price, TO_TIMESTAMP(ts) AS rowtime]) +- TableSourceScan(table=[[default_catalog, default_database, source1]], fields=[id, item, price, ts]) {code} The trick is that the latter group aggregation uses time attributes defined in the previous window aggregation as grouping keys. was: Considering a SQL statement below: {code:java} CREATE TABLE source1 ( `id` BIGINT, `item` STRING, `price` DOUBLE, `ts` STRING, `rowtime` AS TO_TIMESTAMP(`ts`), WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND) WITH ( 'connector' = 'datagen' ); SELECT `window_start`, `window_end`, `window_time`, COUNT(*) FROM ( SELECT `window_start`, `window_end`, `window_time`, `item`, SUM(`price`) AS `price_sum` FROM TABLE (TUMBLE(TABLE source1, DESCRIPTOR(`rowtime`), INTERVAL '1' MINUTES)) GROUP BY `window_start`, `window_end`, `window_time`, `item`) GROUP BY `window_start`, `window_end`, `window_time`; /* Use time attributes defined in the previous window aggregation as grouping keys */{code} which should be a group aggregation after a windowed aggregation, but the planner is interpreting the latter aggregation as a GroupWindowAggregation: {code:java} == Optimized Physical Plan == Calc(select=[window_start, window_end, window_time, EXPR$3]) +- GlobalWindowAggregate(window=[TUMBLE(win_end=[$window_end], size=[1 min])], select=[COUNT(count1$0) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time]) +- Exchange(distribution=[single]) +- LocalWindowAggregate(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 min])], select=[COUNT(*) AS count1$0, slice_end('w$) AS $window_end]) +- Calc(select=[window_start, window_end, window_time]) +- GlobalWindowAggregate(groupBy=[item], window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[item, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time]) +- Exchange(distribution=[hash[item]]) +- LocalWindowAggregate(groupBy=[item], 
window=[TUMBLE(time_col=[rowtime], size=[1 min])], select=[item, slice_end('w$) AS $slice_end]) +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 5000:INTERVAL SECOND)]) +- Calc(select=[item, price, TO_TIMESTAMP(ts) AS rowtime]) +- TableSourceScan(table=[[default_catalog, default_database, source1]], fields=[id, item, price, ts]) {code} The trick is that the latter group aggregation uses time attributes defined in the previous window aggregation as grouping keys. > Normal group by with time attributes after a window group by
[jira] [Commented] (FLINK-32247) Normal group by with time attributes after a window group by is interpreted as GlobalWindowAggregate
[ https://issues.apache.org/jira/browse/FLINK-32247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17728674#comment-17728674 ] Qingsheng Ren commented on FLINK-32247: --- Not sure if this is by-design or a bug in planner > Normal group by with time attributes after a window group by is interpreted > as GlobalWindowAggregate > > > Key: FLINK-32247 > URL: https://issues.apache.org/jira/browse/FLINK-32247 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.2, 1.18.0, 1.17.1 >Reporter: Qingsheng Ren >Priority: Major > > Considering a SQL statement below: > {code:java} > CREATE TABLE source1 ( > `id` BIGINT, > `item` STRING, > `price` DOUBLE, > `ts` STRING, > `rowtime` AS TO_TIMESTAMP(`ts`), > WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND) > WITH ( > 'connector' = 'datagen' > ); > SELECT `window_start`, `window_end`, `window_time`, COUNT(*) FROM ( > SELECT `window_start`, `window_end`, `window_time`, `item`, SUM(`price`) > AS `price_sum` FROM > TABLE (TUMBLE(TABLE source1, DESCRIPTOR(`rowtime`), INTERVAL '1' > MINUTES)) > GROUP BY `window_start`, `window_end`, `window_time`, `item`) > GROUP BY `window_start`, `window_end`, `window_time`; /* Use time attributes > defined in the previous window aggregation as grouping keys */{code} > which should be a group aggregation after a window aggregation, but the > planner is interpreting the latter aggregation as a GroupWindowAggregation: > {code:java} > == Optimized Physical Plan == > Calc(select=[window_start, window_end, window_time, EXPR$3]) > +- GlobalWindowAggregate(window=[TUMBLE(win_end=[$window_end], size=[1 > min])], select=[COUNT(count1$0) AS EXPR$3, start('w$) AS window_start, > end('w$) AS window_end, rowtime('w$) AS window_time]) > +- Exchange(distribution=[single]) > +- LocalWindowAggregate(window=[TUMBLE(win_start=[window_start], > win_end=[window_end], size=[1 min])], select=[COUNT(*) AS count1$0, > slice_end('w$) AS $window_end]) > +- 
Calc(select=[window_start, window_end, window_time]) > +- GlobalWindowAggregate(groupBy=[item], > window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[item, > start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS > window_time]) > +- Exchange(distribution=[hash[item]]) > +- LocalWindowAggregate(groupBy=[item], > window=[TUMBLE(time_col=[rowtime], size=[1 min])], select=[item, > slice_end('w$) AS $slice_end]) > +- WatermarkAssigner(rowtime=[rowtime], > watermark=[-(rowtime, 5000:INTERVAL SECOND)]) > +- Calc(select=[item, price, TO_TIMESTAMP(ts) AS > rowtime]) > +- TableSourceScan(table=[[default_catalog, > default_database, source1]], fields=[id, item, price, ts]) {code} > The trick is that the latter group aggregation uses time attributes defined > in the previous window aggregation as grouping keys. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32247) Normal group by with time attributes after a window group by is interpreted as GlobalWindowAggregate
[ https://issues.apache.org/jira/browse/FLINK-32247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Qingsheng Ren updated FLINK-32247: -- Description: Considering a SQL statement below: {code:java} CREATE TABLE source1 ( `id` BIGINT, `item` STRING, `price` DOUBLE, `ts` STRING, `rowtime` AS TO_TIMESTAMP(`ts`), WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND) WITH ( 'connector' = 'datagen' ); SELECT `window_start`, `window_end`, `window_time`, COUNT(*) FROM ( SELECT `window_start`, `window_end`, `window_time`, `item`, SUM(`price`) AS `price_sum` FROM TABLE (TUMBLE(TABLE source1, DESCRIPTOR(`rowtime`), INTERVAL '1' MINUTES)) GROUP BY `window_start`, `window_end`, `window_time`, `item`) GROUP BY `window_start`, `window_end`, `window_time`; /* Use time attributes defined in the previous window aggregation as grouping keys */{code} which should be a group aggregation after a windowed aggregation, but the planner is interpreting the latter aggregation as a GroupWindowAggregation: {code:java} == Optimized Physical Plan == Calc(select=[window_start, window_end, window_time, EXPR$3]) +- GlobalWindowAggregate(window=[TUMBLE(win_end=[$window_end], size=[1 min])], select=[COUNT(count1$0) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time]) +- Exchange(distribution=[single]) +- LocalWindowAggregate(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 min])], select=[COUNT(*) AS count1$0, slice_end('w$) AS $window_end]) +- Calc(select=[window_start, window_end, window_time]) +- GlobalWindowAggregate(groupBy=[item], window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[item, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time]) +- Exchange(distribution=[hash[item]]) +- LocalWindowAggregate(groupBy=[item], window=[TUMBLE(time_col=[rowtime], size=[1 min])], select=[item, slice_end('w$) AS $slice_end]) +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
5000:INTERVAL SECOND)]) +- Calc(select=[item, price, TO_TIMESTAMP(ts) AS rowtime]) +- TableSourceScan(table=[[default_catalog, default_database, source1]], fields=[id, item, price, ts]) {code} The trick is that the latter group aggregation uses time attributes defined in the previous window aggregation as grouping keys. was: Considering a SQL statement below: {code:java} SELECT `window_start`, `window_end`, `window_time`, COUNT(*) FROM ( SELECT `window_start`, `window_end`, `window_time`, `item`, SUM(`price`) AS `price_sum` FROM TABLE (TUMBLE(TABLE source1, DESCRIPTOR(`rowtime`), INTERVAL '1' MINUTES)) GROUP BY `window_start`, `window_end`, `window_time`, `item`) GROUP BY `window_start`, `window_end`, `window_time`; {code} which should be a group aggregation after a windowed aggregation, but the planner is interpreting the latter aggregation as a GroupWindowAggregation: {code:java} == Optimized Physical Plan == Calc(select=[window_start, window_end, window_time, EXPR$3]) +- GlobalWindowAggregate(window=[TUMBLE(win_end=[$window_end], size=[1 min])], select=[COUNT(count1$0) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time]) +- Exchange(distribution=[single]) +- LocalWindowAggregate(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 min])], select=[COUNT(*) AS count1$0, slice_end('w$) AS $window_end]) +- Calc(select=[window_start, window_end, window_time]) +- GlobalWindowAggregate(groupBy=[item], window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[item, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time]) +- Exchange(distribution=[hash[item]]) +- LocalWindowAggregate(groupBy=[item], window=[TUMBLE(time_col=[rowtime], size=[1 min])], select=[item, slice_end('w$) AS $slice_end]) +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 5000:INTERVAL SECOND)]) +- Calc(select=[item, price, TO_TIMESTAMP(ts) AS rowtime]) +- TableSourceScan(table=[[default_catalog, 
default_database, source1]], fields=[id, item, price, ts]) {code} > Normal group by with time attributes after a window group by is interpreted > as GlobalWindowAggregate > > > Key: FLINK-32247 > URL: https://issues.apache.org/jira/browse/FLINK-32247 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.2, 1.18.0, 1.17.1 >Reporter: Qingsheng Ren >
[jira] [Updated] (FLINK-32247) Normal group by with time attributes after a window group by is interpreted as GlobalWindowAggregate
[ https://issues.apache.org/jira/browse/FLINK-32247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Qingsheng Ren updated FLINK-32247: -- Description: Considering a SQL statement below: {code:java} SELECT `window_start`, `window_end`, `window_time`, COUNT(*) FROM ( SELECT `window_start`, `window_end`, `window_time`, `item`, SUM(`price`) AS `price_sum` FROM TABLE (TUMBLE(TABLE source1, DESCRIPTOR(`rowtime`), INTERVAL '1' MINUTES)) GROUP BY `window_start`, `window_end`, `window_time`, `item`) GROUP BY `window_start`, `window_end`, `window_time`; {code} which should be a group aggregation after a windowed aggregation, but the planner is interpreting the latter aggregation as a GroupWindowAggregation: {code:java} == Optimized Physical Plan == Calc(select=[window_start, window_end, window_time, EXPR$3]) +- GlobalWindowAggregate(window=[TUMBLE(win_end=[$window_end], size=[1 min])], select=[COUNT(count1$0) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time]) +- Exchange(distribution=[single]) +- LocalWindowAggregate(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 min])], select=[COUNT(*) AS count1$0, slice_end('w$) AS $window_end]) +- Calc(select=[window_start, window_end, window_time]) +- GlobalWindowAggregate(groupBy=[item], window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[item, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time]) +- Exchange(distribution=[hash[item]]) +- LocalWindowAggregate(groupBy=[item], window=[TUMBLE(time_col=[rowtime], size=[1 min])], select=[item, slice_end('w$) AS $slice_end]) +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 5000:INTERVAL SECOND)]) +- Calc(select=[item, price, TO_TIMESTAMP(ts) AS rowtime]) +- TableSourceScan(table=[[default_catalog, default_database, source1]], fields=[id, item, price, ts]) {code} was: Considering a SQL statement below: {code:java} SELECT `window_start`, `window_end`, 
`window_time`, COUNT(*) FROM (SELECT `window_start`, `window_end`, `window_time`, `item`, SUM(`price`) AS `price_sum` FROM TABLE (TUMBLE(TABLE source1, DESCRIPTOR(`rowtime`), INTERVAL '1' MINUTES)) GROUP BY `window_start`, `window_end`, `window_time`, `item`) GROUP BY `window_start`, `window_end`, `window_time`; {code} which should be a group aggregation after a windowed aggregation, but the planner is interpreting the latter aggregation as a GroupWindowAggregation: {code:java} == Optimized Physical Plan == Calc(select=[window_start, window_end, window_time, EXPR$3]) +- GlobalWindowAggregate(window=[TUMBLE(win_end=[$window_end], size=[1 min])], select=[COUNT(count1$0) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time]) +- Exchange(distribution=[single]) +- LocalWindowAggregate(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 min])], select=[COUNT(*) AS count1$0, slice_end('w$) AS $window_end]) +- Calc(select=[window_start, window_end, window_time]) +- GlobalWindowAggregate(groupBy=[item], window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[item, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time]) +- Exchange(distribution=[hash[item]]) +- LocalWindowAggregate(groupBy=[item], window=[TUMBLE(time_col=[rowtime], size=[1 min])], select=[item, slice_end('w$) AS $slice_end]) +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 5000:INTERVAL SECOND)]) +- Calc(select=[item, price, TO_TIMESTAMP(ts) AS rowtime]) +- TableSourceScan(table=[[default_catalog, default_database, source1]], fields=[id, item, price, ts]) {code} > Normal group by with time attributes after a window group by is interpreted > as GlobalWindowAggregate > > > Key: FLINK-32247 > URL: https://issues.apache.org/jira/browse/FLINK-32247 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.2, 1.18.0, 1.17.1 >Reporter: Qingsheng Ren >Priority: Major > > 
Considering a SQL statement below: > > {code:java} > SELECT `window_start`, `window_end`, `window_time`, COUNT(*) FROM ( > SELECT `window_start`, `window_end`, `window_time`, `item`, SUM(`price`) > AS `price_sum` FROM > TABLE (TUMBLE(TABLE source1, DESCRIPTOR(`rowtime`), INTERVAL '1' MINUTES)) > GROUP BY `window_start`, `window_end`, `window_time`, `item`) > GROUP BY `window_start`, `window_end`, `window_time`;
[jira] [Created] (FLINK-32247) Normal group by with time attributes after a window group by is interpreted as GlobalWindowAggregate
Qingsheng Ren created FLINK-32247: - Summary: Normal group by with time attributes after a window group by is interpreted as GlobalWindowAggregate Key: FLINK-32247 URL: https://issues.apache.org/jira/browse/FLINK-32247 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.17.1, 1.16.2, 1.18.0 Reporter: Qingsheng Ren Considering a SQL statement below: {code:java} SELECT `window_start`, `window_end`, `window_time`, COUNT(*) FROM (SELECT `window_start`, `window_end`, `window_time`, `item`, SUM(`price`) AS `price_sum` FROM TABLE (TUMBLE(TABLE source1, DESCRIPTOR(`rowtime`), INTERVAL '1' MINUTES)) GROUP BY `window_start`, `window_end`, `window_time`, `item`) GROUP BY `window_start`, `window_end`, `window_time`; {code} which should be a group aggregation after a windowed aggregation, but the planner is interpreting the latter aggregation as a GroupWindowAggregation: {code:java} == Optimized Physical Plan == Calc(select=[window_start, window_end, window_time, EXPR$3]) +- GlobalWindowAggregate(window=[TUMBLE(win_end=[$window_end], size=[1 min])], select=[COUNT(count1$0) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time]) +- Exchange(distribution=[single]) +- LocalWindowAggregate(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 min])], select=[COUNT(*) AS count1$0, slice_end('w$) AS $window_end]) +- Calc(select=[window_start, window_end, window_time]) +- GlobalWindowAggregate(groupBy=[item], window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[item, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time]) +- Exchange(distribution=[hash[item]]) +- LocalWindowAggregate(groupBy=[item], window=[TUMBLE(time_col=[rowtime], size=[1 min])], select=[item, slice_end('w$) AS $slice_end]) +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 5000:INTERVAL SECOND)]) +- Calc(select=[item, price, TO_TIMESTAMP(ts) AS rowtime]) +- 
TableSourceScan(table=[[default_catalog, default_database, source1]], fields=[id, item, price, ts]) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32246) javax.management.InstanceAlreadyExistsException
[ https://issues.apache.org/jira/browse/FLINK-32246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17728658#comment-17728658 ] Aitozi commented on FLINK-32246: where do you run this sql ? sql-client ? Can you provide a demo case to reproduce this ? > javax.management.InstanceAlreadyExistsException > --- > > Key: FLINK-32246 > URL: https://issues.apache.org/jira/browse/FLINK-32246 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.15.2 >Reporter: jeff-zou >Priority: Major > > Flink SQL throws an > exception(javax.management.InstanceAlreadyExistsException) when trying to > perform multiple sink operations on the same kafka source . > > sql example: > {code:java} > create table kafka_source() with ('connector'='kafka'); > insert into sink_table1 select * from kafka_source; > insert into sink_table2 select * from kafka_source; {code} > The Exception as below: > {code:java} > javax.management.InstanceAlreadyExistsException: > kafka.admin.client:type=app-info,id=* > > java.management/com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436) > > java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855) > > java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955) > > java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890) > > java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320) > > java.management/com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) > > org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64) > > org.apache.kafka.clients.admin.KafkaAdminClient.(KafkaAdminClient.java:500) > > 
org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:444) > org.apache.kafka.clients.admin.Admin.create(Admin.java:59) > org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:39) > > org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getKafkaAdminClient(KafkaSourceEnumerator.java:410) > > org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.start(KafkaSourceEnumerator.java:151) > > org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$start$1(SourceCoordinator.java:209) > > org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$9(SourceCoordinator.java:406) > > org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) > > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) > java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > > java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) > > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > java.base/java.lang.Thread.run(Thread.java:829) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
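The stack trace above bottoms out in JMX MBean registration: when two Kafka admin clients are created with the same `client.id`, the second attempt to register the `app-info` MBean under an already-taken `ObjectName` fails. The JMX behavior itself is easy to reproduce in isolation (the `ObjectName` and classes below are illustrative stand-ins, not Kafka's internals):

```java
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.lang.management.ManagementFactory;

public class DuplicateMBeanDemo {

    // A trivial standard MBean: the interface name must be the class name + "MBean".
    public interface DemoMBean {}

    public static class Demo implements DemoMBean {}

    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName("kafka.admin.client:type=app-info,id=shared-client-id");

        // First registration succeeds.
        server.registerMBean(new Demo(), name);
        try {
            // Second registration under the same ObjectName -- analogous to two
            // admin clients sharing one client.id on the same JVM.
            server.registerMBean(new Demo(), name);
        } catch (InstanceAlreadyExistsException e) {
            System.out.println("InstanceAlreadyExistsException");
        }
    }
}
```

In Kafka this registration failure is typically logged as a warning rather than failing the client, which is why jobs can keep running despite the noisy stack trace.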
[GitHub] [flink] swuferhong commented on a diff in pull request #22684: [FLINK-32220][table-runtime] Improving the adaptive local hash agg code to avoid get value from RowData repeatedly
swuferhong commented on code in PR #22684: URL: https://github.com/apache/flink/pull/22684#discussion_r1214139725 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala: ## @@ -231,25 +230,23 @@ object ProjectionCodeGenerator { inputTerm: String, targetType: LogicalType, index: Int): GeneratedExpression = { -val fieldType = getFieldTypes(inputType).get(index) -val resultTypeTerm = primitiveTypeTermForType(fieldType) -val defaultValue = primitiveDefaultValue(fieldType) -val readCode = rowFieldReadAccess(index.toString, inputTerm, fieldType) -val Seq(fieldTerm, nullTerm) = - ctx.addReusableLocalVariables((resultTypeTerm, "field"), ("boolean", "isNull")) - -val inputCode = - s""" - |$nullTerm = $inputTerm.isNullAt($index); - |$fieldTerm = $defaultValue; - |if (!$nullTerm) { - | $fieldTerm = $readCode; - |} - """.stripMargin.trim - -val expression = GeneratedExpression(fieldTerm, nullTerm, inputCode, fieldType) +val fieldExpr = getReuseFieldExprForAggFunc(ctx, inputType, inputTerm, index) // Convert the projected value type to sum agg func target type. 
-ScalarOperatorGens.generateCast(ctx, expression, targetType, true) +ScalarOperatorGens.generateCast(ctx, fieldExpr, targetType, true) Review Comment: nit: Removing the IDEA warning: Changing to `ScalarOperatorGens.generateCast(ctx, fieldExpr, targetType, nullOnFailure = true)` ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala: ## @@ -231,25 +230,23 @@ object ProjectionCodeGenerator { inputTerm: String, targetType: LogicalType, index: Int): GeneratedExpression = { -val fieldType = getFieldTypes(inputType).get(index) -val resultTypeTerm = primitiveTypeTermForType(fieldType) -val defaultValue = primitiveDefaultValue(fieldType) -val readCode = rowFieldReadAccess(index.toString, inputTerm, fieldType) -val Seq(fieldTerm, nullTerm) = - ctx.addReusableLocalVariables((resultTypeTerm, "field"), ("boolean", "isNull")) - -val inputCode = - s""" - |$nullTerm = $inputTerm.isNullAt($index); - |$fieldTerm = $defaultValue; - |if (!$nullTerm) { - | $fieldTerm = $readCode; - |} - """.stripMargin.trim - -val expression = GeneratedExpression(fieldTerm, nullTerm, inputCode, fieldType) +val fieldExpr = getReuseFieldExprForAggFunc(ctx, inputType, inputTerm, index) // Convert the projected value type to sum agg func target type. -ScalarOperatorGens.generateCast(ctx, expression, targetType, true) +ScalarOperatorGens.generateCast(ctx, fieldExpr, targetType, true) + } + + /** Get reuse field expr if it has been evaluated before for adaptive local hash aggregation. */ + def getReuseFieldExprForAggFunc( + ctx: CodeGeneratorContext, + inputType: LogicalType, + inputTerm: String, + index: Int) = { +// reuse the field access code if it has been evaluated before Review Comment: -> // Reuse the field access code if it has been evaluated before. 
## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala: ## @@ -231,25 +230,23 @@ object ProjectionCodeGenerator { inputTerm: String, targetType: LogicalType, index: Int): GeneratedExpression = { -val fieldType = getFieldTypes(inputType).get(index) -val resultTypeTerm = primitiveTypeTermForType(fieldType) -val defaultValue = primitiveDefaultValue(fieldType) -val readCode = rowFieldReadAccess(index.toString, inputTerm, fieldType) -val Seq(fieldTerm, nullTerm) = - ctx.addReusableLocalVariables((resultTypeTerm, "field"), ("boolean", "isNull")) - -val inputCode = - s""" - |$nullTerm = $inputTerm.isNullAt($index); - |$fieldTerm = $defaultValue; - |if (!$nullTerm) { - | $fieldTerm = $readCode; - |} - """.stripMargin.trim - -val expression = GeneratedExpression(fieldTerm, nullTerm, inputCode, fieldType) +val fieldExpr = getReuseFieldExprForAggFunc(ctx, inputType, inputTerm, index) // Convert the projected value type to sum agg func target type. -ScalarOperatorGens.generateCast(ctx, expression, targetType, true) +ScalarOperatorGens.generateCast(ctx, fieldExpr, targetType, true) + } + + /** Get reuse field expr if it has been evaluated before for adaptive local hash aggregation. */ + def getReuseFieldExprForAggFunc( + ctx: CodeGeneratorContext, + inputType: LogicalType, + inputTerm: String, + index: Int) = { Review Comment: Add type annotation : ` index: Int): GeneratedExpression = {` -- This is an automated message from the Apache Git Service. To respond to
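The reuse pattern under review in `getReuseFieldExprForAggFunc` (generate the field-access code once, then hand the same `GeneratedExpression` to every aggregate function reading that field) amounts to memoizing on the (input term, field index) pair. A minimal stand-in, using plain `String`s in place of Flink's `GeneratedExpression`, with illustrative names:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Sketch of reuse-by-memoization; the class name and key format are illustrative.
public class FieldExprCache {
    private final Map<String, String> reusableExprs = new HashMap<>();

    /** Returns the expression cached for (inputTerm, index), generating it only once. */
    public String getOrGenerate(String inputTerm, int index, Supplier<String> generator) {
        return reusableExprs.computeIfAbsent(inputTerm + "#" + index, k -> generator.get());
    }
}
```

A second lookup for the same field returns the cached expression without invoking the generator again, which is what avoids reading the same `RowData` field repeatedly.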
[jira] [Resolved] (FLINK-31894) ExceptionHistory and REST API failure label integration
[ https://issues.apache.org/jira/browse/FLINK-31894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Morávek resolved FLINK-31894. --- Fix Version/s: 1.18.0 Resolution: Fixed > ExceptionHistory and REST API failure label integration > --- > > Key: FLINK-31894 > URL: https://issues.apache.org/jira/browse/FLINK-31894 > Project: Flink > Issue Type: Sub-task > Components: Runtime / REST >Reporter: Panagiotis Garefalakis >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-31894) ExceptionHistory and REST API failure label integration
[ https://issues.apache.org/jira/browse/FLINK-31894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Morávek reassigned FLINK-31894: - Assignee: Panagiotis Garefalakis > ExceptionHistory and REST API failure label integration > --- > > Key: FLINK-31894 > URL: https://issues.apache.org/jira/browse/FLINK-31894 > Project: Flink > Issue Type: Sub-task > Components: Runtime / REST >Reporter: Panagiotis Garefalakis >Assignee: Panagiotis Garefalakis >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31894) ExceptionHistory and REST API failure label integration
[ https://issues.apache.org/jira/browse/FLINK-31894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17728640#comment-17728640 ] David Morávek commented on FLINK-31894: --- master: 28c20ad70c7100ae2358fa3f8936663f30811f78 > ExceptionHistory and REST API failure label integration > --- > > Key: FLINK-31894 > URL: https://issues.apache.org/jira/browse/FLINK-31894 > Project: Flink > Issue Type: Sub-task > Components: Runtime / REST >Reporter: Panagiotis Garefalakis >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] dmvk merged pull request #22643: [FLINK-31894][runtime] ExceptionHistory and REST API failure label integration
dmvk merged PR #22643: URL: https://github.com/apache/flink/pull/22643 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
zoltar9264 commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1214124394 ## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java: ## @@ -395,18 +394,16 @@ public void release() { /** Previous snapshot with uploaded sst files. */ protected static class PreviousSnapshot { -@Nullable private final Map confirmedSstFiles; +@Nullable private final Map confirmedSstFiles; -protected PreviousSnapshot(@Nullable Map confirmedSstFiles) { +protected PreviousSnapshot( +@Nullable Map confirmedSstFiles) { this.confirmedSstFiles = confirmedSstFiles; } protected Optional getUploaded(StateHandleID stateHandleID) { if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) { -// we introduce a placeholder state handle, that is replaced with the -// original from the shared state registry (created from a previous checkpoint) -return Optional.of( -new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID))); Review Comment: > maybe using `Path` or `File` instead of `String` for `localPath`. I think localPath is just a file name. `Path` is an interface and does not implement `Serializable`; `File` does implement `Serializable`, but it will also become a path string when it is finally serialized into the _metadata file. So I think `String` is clearer and sufficient? About 'only use PlaceholderStreamStateHandle while the origin handle is ByteStreamStateHandle': in fact I want to suggest that developers do this in the docs of `PlaceholderStreamStateHandle` and `SharedStateRegistry`. Perhaps some bugs could have been avoided if developers had followed this advice and only performed replacements on `PlaceholderStreamStateHandle`. I'm not sure if this is over-designed; if so, please correct me. :heart: -- This is an automated message from the Apache Git Service. 
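The `Serializable` point in the comment above can be verified directly against the JDK: `java.io.File` implements `Serializable`, while `java.nio.file.Path` is an interface whose default-filesystem implementations do not. A quick check (the path is an arbitrary example):

```java
import java.io.File;
import java.io.Serializable;
import java.nio.file.Path;
import java.nio.file.Paths;

public class SerializableCheck {
    public static void main(String[] args) {
        File file = new File("/tmp/000001.sst");  // arbitrary example path
        Path path = Paths.get("/tmp/000001.sst");

        // File declares Serializable directly...
        System.out.println(file instanceof Serializable); // true
        // ...while the default provider's Path implementation does not,
        // which supports keeping a plain String for the _metadata file.
        System.out.println(path instanceof Serializable);
    }
}
```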
[GitHub] [flink] flinkbot commented on pull request #22701: [FLINK-32245][table-planner] Rename NonDeterministicTests to NonDeterministicTest and fix incorrect test initialization
flinkbot commented on PR #22701: URL: https://github.com/apache/flink/pull/22701#issuecomment-1573405644 ## CI report: * 016cbcb8431da5a91e0a690c395567e50e633e33 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-32245) NonDeterministicTests #testTemporalFunctionsInBatchMode failure masked due to incorrect test initialization
[ https://issues.apache.org/jira/browse/FLINK-32245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32245: --- Labels: pull-request-available (was: ) > NonDeterministicTests #testTemporalFunctionsInBatchMode failure masked due to > incorrect test initialization > --- > > Key: FLINK-32245 > URL: https://issues.apache.org/jira/browse/FLINK-32245 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.18.0, 1.17.1 >Reporter: Jane Chan >Assignee: Jane Chan >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > The test case NonDeterministicTests #testTemporalFunctionsInBatchMode has > been consistently failing due to incorrect test initialization. > > However, this failure has been masked because the test class name ends with > "Tests", causing the CI to skip the test case, which has been further > validated by searching through the historical logs of the CI. > This issue needs to be addressed, and the test case should be executed to > ensure proper testing. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] LadyForest opened a new pull request, #22701: [FLINK-32245][table-planner] Rename NonDeterministicTests to NonDeterministicTest and fix incorrect test initialization
LadyForest opened a new pull request, #22701: URL: https://github.com/apache/flink/pull/22701 ## What is the purpose of the change This PR fixes the incorrect test initialization for `NonDeterministicTests`, which uses `streamTableEnv` to test batch behavior. This of course fails, but the failure has gone unnoticed because the test runner does not pick up test classes whose names end with "Tests". ## Brief change log - Rename `NonDeterministicTests` to `NonDeterministicTest` so that the CI recognizes the test. - Fix how `ExpressionTestBase` initializes the table environment, which should depend on whether the test runs in streaming or batch mode. ## Verifying this change This change is already covered by existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no**/ don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
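The masking mechanism can be illustrated with a suffix check like the one test runners typically apply when selecting test classes. The `.*Test` pattern below is an assumption for illustration, not Flink's actual CI include list:

```java
import java.util.regex.Pattern;

public class TestNameFilterDemo {
    // Hypothetical include pattern: only classes whose names end in "Test" are selected.
    private static final Pattern INCLUDE = Pattern.compile(".*Test");

    public static boolean selected(String className) {
        // matches() requires a full match, so a trailing "s" disqualifies the name.
        return INCLUDE.matcher(className).matches();
    }

    public static void main(String[] args) {
        System.out.println(selected("NonDeterministicTest"));  // true: picked up
        System.out.println(selected("NonDeterministicTests")); // false: silently skipped
    }
}
```

Under such a filter, renaming the class is enough to surface the failure, which is exactly what the PR does.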
[GitHub] [flink] reswqa commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier
reswqa commented on code in PR #22652: URL: https://github.com/apache/flink/pull/22652#discussion_r1214057806 ## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTierProducerAgent.java: ## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent; + +import java.util.function.Supplier; + +/** Test implementation for {@link TierProducerAgent}. 
*/ +public class TestingTierProducerAgent implements TierProducerAgent { + +private Supplier tryStartNewSegmentReturnValueSupplier; Review Comment: ```suggestion private Supplier tryStartNewSegmentSupplier; ``` In general, for mock classes, we name their fields as follows: `MethodName+(Runnable/Supplier/Consumer/Function)` ## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTierProducerAgent.java: ## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent; + +import java.util.function.Supplier; + +/** Test implementation for {@link TierProducerAgent}. 
*/ +public class TestingTierProducerAgent implements TierProducerAgent { + +private Supplier tryStartNewSegmentReturnValueSupplier; + +private Supplier tryWriteReturnValueSupplier; Review Comment: ```suggestion private BiFunction tryWriterFunction; ``` ## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClientTest.java: ## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.TestingBufferAccumulator; +import
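The convention reswqa describes, one functional field per stubbed method, named `methodName` plus the functional-interface kind, looks like this in miniature. The class and method names are illustrative, not the real Flink agent:

```java
import java.util.function.Supplier;

// Minimal Testing* mock following the review's naming convention:
// the field name <methodName> + "Supplier" tells the reader which method it backs.
public class TestingSegmentAgent {
    private final Supplier<Boolean> tryStartNewSegmentSupplier;

    public TestingSegmentAgent(Supplier<Boolean> tryStartNewSegmentSupplier) {
        this.tryStartNewSegmentSupplier = tryStartNewSegmentSupplier;
    }

    // The mocked method simply delegates to the injected behavior.
    public boolean tryStartNewSegment() {
        return tryStartNewSegmentSupplier.get();
    }
}
```

For example, `new TestingSegmentAgent(() -> false)` yields a mock whose `tryStartNewSegment()` always reports failure, without any subclassing or mocking framework.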
[GitHub] [flink] TanYuxin-tyx commented on pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier
TanYuxin-tyx commented on PR #22652: URL: https://github.com/apache/flink/pull/22652#issuecomment-1573354370 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier
TanYuxin-tyx commented on code in PR #22652: URL: https://github.com/apache/flink/pull/22652#discussion_r1214057512 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java: ## @@ -80,13 +107,23 @@ public void write( if (isBroadcast && !isBroadcastOnly) { for (int i = 0; i < numSubpartitions; ++i) { -bufferAccumulator.receive(record.duplicate(), subpartitionId, dataType); +// As the tiered storage subpartition ID is created only for broadcast records, +// which are fewer than normal records, the performance impact of generating new +// TieredStorageSubpartitionId objects is expected to be manageable. If the +// performance is significantly affected, this logic will be optimized accordingly. +bufferAccumulator.receive( +record.duplicate(), new TieredStorageSubpartitionId(i), dataType); } } else { bufferAccumulator.receive(record, subpartitionId, dataType); } } +public void setMetricStatisticsUpdater( +Consumer metricStatisticsUpdater) { +this.metricStatisticsUpdater = metricStatisticsUpdater; Review Comment: Ah. I missed this. Fixed this now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
rkhachatryan commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1214051048 ## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java: ## @@ -395,18 +394,16 @@ public void release() { /** Previous snapshot with uploaded sst files. */ protected static class PreviousSnapshot { -@Nullable private final Map confirmedSstFiles; +@Nullable private final Map confirmedSstFiles; -protected PreviousSnapshot(@Nullable Map confirmedSstFiles) { +protected PreviousSnapshot( +@Nullable Map confirmedSstFiles) { this.confirmedSstFiles = confirmedSstFiles; } protected Optional getUploaded(StateHandleID stateHandleID) { if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) { -// we introduce a placeholder state handle, that is replaced with the -// original from the shared state registry (created from a previous checkpoint) -return Optional.of( -new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID))); Review Comment: Thanks @zoltar9264 , > Do you mean change IncrementalRemoteKeyedStateHandle like this ? Yes, I mean something like that, maybe using `Path` or `File` instead of `String` for `localPath`. > I suggest only use PlaceholderStreamStateHandle while the origin handle is ByteStreamStateHandle. This pr already implemented not use PlaceholderStreamStateHandle calculate checkpointed size, I want keep it. Can you share the motivation? I think that this will just add an additional `instanceof` and increase complexity. It would also be easier to break if we add a new type of handle that needs replacement. Or am I missing something? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink] reswqa commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier
reswqa commented on code in PR #22652: URL: https://github.com/apache/flink/pull/22652#discussion_r1214045783 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java: ## @@ -80,13 +107,23 @@ public void write( if (isBroadcast && !isBroadcastOnly) { for (int i = 0; i < numSubpartitions; ++i) { -bufferAccumulator.receive(record.duplicate(), subpartitionId, dataType); +// As the tiered storage subpartition ID is created only for broadcast records, +// which are fewer than normal records, the performance impact of generating new +// TieredStorageSubpartitionId objects is expected to be manageable. If the +// performance is significantly affected, this logic will be optimized accordingly. +bufferAccumulator.receive( +record.duplicate(), new TieredStorageSubpartitionId(i), dataType); } } else { bufferAccumulator.receive(record, subpartitionId, dataType); } } +public void setMetricStatisticsUpdater( +Consumer metricStatisticsUpdater) { +this.metricStatisticsUpdater = metricStatisticsUpdater; Review Comment: It seems that this line is not updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] TanYuxin-tyx commented on pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier
TanYuxin-tyx commented on PR #22652: URL: https://github.com/apache/flink/pull/22652#issuecomment-1573304176 @reswqa Thanks for helping review, I have addressed the comments. Could you help take a look again? Due to the significant code changes in the test code, I didn't use a fix-up mode to fix it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier
TanYuxin-tyx commented on code in PR #22652: URL: https://github.com/apache/flink/pull/22652#discussion_r1214037314 ## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClientTest.java: ## @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link TieredStorageProducerClient}. 
*/ +@ExtendWith(ParameterizedTestExtension.class) +public class TieredStorageProducerClientTest { + +private static final int NUM_TOTAL_BUFFERS = 1000; + +private static final int NETWORK_BUFFER_SIZE = 1024; + +private static final float NUM_BUFFERS_TRIGGER_FLUSH_RATIO = 0.6f; + +private static final int NUM_BUFFERS_IN_A_SEGMENT = 5; + +@Parameter public boolean isBroadcast; + +private NetworkBufferPool globalPool; + +@Parameters(name = "isBroadcast={0}") +public static Collection parameters() { +return Arrays.asList(false, true); +} + +@BeforeEach +void before() { +globalPool = new NetworkBufferPool(NUM_TOTAL_BUFFERS, NETWORK_BUFFER_SIZE); +} + +@AfterEach +void after() { +globalPool.destroy(); +} + +@TestTemplate +void testWriteRecordsToEmptyStorageTiers() throws IOException { +int numSubpartitions = 10; +int numBuffersInPool = 10; +int bufferSize = 1024; +Random random = new Random(); + +TieredStorageMemoryManagerImpl storageMemoryManager = +createStorageMemoryManager(numBuffersInPool); +BufferAccumulator bufferAccumulator = +new HashBufferAccumulator(numSubpartitions, bufferSize, storageMemoryManager); + +TieredStorageProducerClient tieredStorageProducerClient = +new TieredStorageProducerClient( +numSubpartitions, false, bufferAccumulator, null, Collections.emptyList()); + +assertThatThrownBy( +() -> +tieredStorageProducerClient.write( +generateRandomData(bufferSize, random), +new TieredStorageSubpartitionId(0), +Buffer.DataType.DATA_BUFFER, +isBroadcast)) +.isInstanceOf(RuntimeException.class) +.hasMessageContaining("Failed to choose a storage tier"); +} + +@TestTemplate +void testEmptyMetricUpdater() throws IOException { +int numSubpartitions = 10; +int numBuffersInPool = 10; +int bufferSize = 1024; +Random random = new Random(); + +TieredStorageMemoryManagerImpl storageMemoryManager = +createStorageMemoryManager(numBuffersInPool); +BufferAccumulator bufferAccumulator = +new
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier
TanYuxin-tyx commented on code in PR #22652: URL: https://github.com/apache/flink/pull/22652#discussion_r1214036888 ## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClientTest.java ##
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier
TanYuxin-tyx commented on code in PR #22652: URL: https://github.com/apache/flink/pull/22652#discussion_r1214036360 ## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClientTest.java ## +.isInstanceOf(RuntimeException.class) +.hasMessageContaining("Failed to choose a storage tier"); Review Comment: Fixed. Used the `TestingBufferAccumulator`.
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier
TanYuxin-tyx commented on code in PR #22652: URL: https://github.com/apache/flink/pull/22652#discussion_r1214035752 ## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClientTest.java ## +BufferAccumulator bufferAccumulator = +new HashBufferAccumulator(numSubpartitions, bufferSize, storageMemoryManager); Review Comment: Because we have mocked a `TestingBufferAccumulator`, we don't need this anymore. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier
TanYuxin-tyx commented on code in PR #22652: URL: https://github.com/apache/flink/pull/22652#discussion_r1214034785 ## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClientTest.java ## +private static final int NUM_BUFFERS_IN_A_SEGMENT = 5; Review Comment: Fixed.
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier
TanYuxin-tyx commented on code in PR #22652: URL: https://github.com/apache/flink/pull/22652#discussion_r1214031770 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierProducerAgent.java: ## @@ -35,9 +41,21 @@ public interface TierProducerAgent { */ boolean tryStartNewSegment(TieredStorageSubpartitionId subpartitionId, int segmentId); -/** Writes the finished {@link Buffer} to the consumer. */ -boolean write(TieredStorageSubpartitionId subpartitionId, Buffer finishedBuffer) -throws IOException; +/** + * Writes the finished {@link Buffer} to the consumer. + * + * Note that the tier must ensure that the buffer is written successfully without any + * exceptions, in order to guarantee that the buffer will be recycled. If this method throws an + * exception in the subsequent modifications, the caller should make sure that the buffer is + * recycled finally. + * + * @param subpartitionId the subpartition id that the buffer is writing to + * @param finishedBuffer the writing buffer + * @return return true if the buffer is written successfully, return false if the current + * segment can not store this buffer and the current segment is finished. When returning + * false, the agent should try start a new segment before writing the buffer. + */ +boolean write(TieredStorageSubpartitionId subpartitionId, Buffer finishedBuffer); Review Comment: Renamed it. 
## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java: ## @@ -100,26 +137,95 @@ public void close() { */ private void writeAccumulatedBuffers( TieredStorageSubpartitionId subpartitionId, List accumulatedBuffers) { -try { -for (Buffer finishedBuffer : accumulatedBuffers) { -writeAccumulatedBuffer(subpartitionId, finishedBuffer); +Iterator bufferIterator = accumulatedBuffers.iterator(); + +int numWriteBytes = 0; +int numWriteBuffers = 0; +while (bufferIterator.hasNext()) { +Buffer buffer = bufferIterator.next(); +try { +writeAccumulatedBuffer(subpartitionId, buffer); +} catch (IOException ioe) { +buffer.recycleBuffer(); +while (bufferIterator.hasNext()) { +bufferIterator.next().recycleBuffer(); +} +ExceptionUtils.rethrow(ioe); } -} catch (IOException e) { -ExceptionUtils.rethrow(e); +numWriteBuffers++; +numWriteBytes += buffer.readableBytes(); } +updateMetricStatistics(numWriteBuffers, numWriteBytes); } /** * Write the accumulated buffer of this subpartitionId to an appropriate tier. After the tier is * decided, the buffer will be written to the selected tier. * + * Note that the method only throws an exception when choosing a storage tier, so the caller + * should ensure that the buffer is recycled when throwing an exception. + * * @param subpartitionId the subpartition identifier * @param accumulatedBuffer one accumulated buffer of this subpartition */ private void writeAccumulatedBuffer( TieredStorageSubpartitionId subpartitionId, Buffer accumulatedBuffer) throws IOException { -// TODO, Try to write the accumulated buffer to the appropriate tier. After the tier is -// decided, then write the accumulated buffer to the tier. 
+Buffer compressedBuffer = compressBufferIfPossible(accumulatedBuffer); + +if (currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()] == null) { +chooseStorageTierToStartSegment(subpartitionId); +} + +boolean isSuccess = + currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()].write( +subpartitionId, compressedBuffer); +if (!isSuccess) { +chooseStorageTierToStartSegment(subpartitionId); +isSuccess = + currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()].write( +subpartitionId, compressedBuffer); +checkState(isSuccess, "Failed to write the first buffer to the new segment"); +} Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
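The write contract discussed in this review (write returns false once the current segment is full; the caller then calls tryStartNewSegment and retries, and the retried write must succeed) can be sketched as a minimal toy model. This is not Flink's actual `TierProducerAgent`: the class `SegmentedWriteSketch`, the inner `TierAgent`, `buffersPerSegment`, and `writeBuffer` are hypothetical names invented here for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

public class SegmentedWriteSketch {

    // Toy stand-in for a tier producer agent: each segment holds a fixed
    // number of buffers; write() returns false once the segment is full.
    static class TierAgent {
        private final int buffersPerSegment;
        private int remainingInSegment = 0;
        int segmentsStarted = 0;
        final List<String> written = new ArrayList<>();

        TierAgent(int buffersPerSegment) {
            this.buffersPerSegment = buffersPerSegment;
        }

        void tryStartNewSegment() {
            segmentsStarted++;
            remainingInSegment = buffersPerSegment;
        }

        // Mirrors the documented contract: a false return means "the current
        // segment is finished, start a new segment before retrying".
        boolean write(String buffer) {
            if (remainingInSegment == 0) {
                return false;
            }
            remainingInSegment--;
            written.add(buffer);
            return true;
        }
    }

    // Caller-side retry, as described for writeAccumulatedBuffer: on a false
    // return, open a new segment; the retried write is expected to succeed.
    static void writeBuffer(TierAgent agent, String buffer) {
        if (!agent.write(buffer)) {
            agent.tryStartNewSegment();
            if (!agent.write(buffer)) {
                throw new IllegalStateException(
                        "Failed to write the first buffer to the new segment");
            }
        }
    }

    public static void main(String[] args) {
        TierAgent agent = new TierAgent(2);
        agent.tryStartNewSegment();
        for (int i = 0; i < 5; i++) {
            writeBuffer(agent, "buffer-" + i);
        }
        // 5 buffers at 2 per segment means 3 segments were started in total.
        if (agent.segmentsStarted != 3 || agent.written.size() != 5) {
            throw new IllegalStateException("unexpected segment accounting");
        }
        System.out.println("segments started: " + agent.segmentsStarted);
    }
}
```

The toy agent only models the boolean return value; the real agent additionally carries segment IDs and recycles buffers on exception, as the review thread notes.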
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier
TanYuxin-tyx commented on code in PR #22652: URL: https://github.com/apache/flink/pull/22652#discussion_r1214031417 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierProducerAgent.java: ## @@ -35,9 +41,21 @@ public interface TierProducerAgent { */ boolean tryStartNewSegment(TieredStorageSubpartitionId subpartitionId, int segmentId); -/** Writes the finished {@link Buffer} to the consumer. */ -boolean write(TieredStorageSubpartitionId subpartitionId, Buffer finishedBuffer) -throws IOException; +/** + * Writes the finished {@link Buffer} to the consumer. + * + * Note that the tier must ensure that the buffer is written successfully without any + * exceptions, in order to guarantee that the buffer will be recycled. If this method throws an + * exception in the subsequent modifications, the caller should make sure that the buffer is + * recycled finally. Review Comment: Fixed. ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierProducerAgent.java: ## @@ -21,9 +21,15 @@ import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; -import java.io.IOException; - -/** The producer-side agent of a Tier. */ +/** + * The producer-side agent of a Tier. + * + * Note that when writing a buffer to a tier, the {@link TierProducerAgent} should first call + * {@code tryStartNewSegment} to start a new segment. The agent can then continue writing the buffer + * to the tier as long as the return value of {@code write} is true. If the return value of {@code + * write} is false, it indicates that the current segment can no longer store the buffer, and the + * agent should try to start a new segment before writing the buffer. + */ public interface TierProducerAgent { Review Comment: Fixed. -- This is an automated message from the Apache Git Service. 
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier
TanYuxin-tyx commented on code in PR #22652: URL: https://github.com/apache/flink/pull/22652#discussion_r1214031240 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java: ## @@ -80,13 +107,23 @@ public void write( if (isBroadcast && !isBroadcastOnly) { for (int i = 0; i < numSubpartitions; ++i) { -bufferAccumulator.receive(record.duplicate(), subpartitionId, dataType); +// As the tiered storage subpartition ID is created only for broadcast records, +// which are fewer than normal records, the performance impact of generating new +// TieredStorageSubpartitionId objects is expected to be manageable. If the +// performance is significantly affected, this logic will be optimized accordingly. +bufferAccumulator.receive( +record.duplicate(), new TieredStorageSubpartitionId(i), dataType); } } else { bufferAccumulator.receive(record, subpartitionId, dataType); } } +public void setMetricStatisticsUpdater( +Consumer metricStatisticsUpdater) { +this.metricStatisticsUpdater = metricStatisticsUpdater; Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier
TanYuxin-tyx commented on code in PR #22652: URL: https://github.com/apache/flink/pull/22652#discussion_r1214030893 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java: ## @@ -100,26 +137,95 @@ public void close() { */ private void writeAccumulatedBuffers( TieredStorageSubpartitionId subpartitionId, List accumulatedBuffers) { -try { -for (Buffer finishedBuffer : accumulatedBuffers) { -writeAccumulatedBuffer(subpartitionId, finishedBuffer); +Iterator bufferIterator = accumulatedBuffers.iterator(); + +int numWriteBytes = 0; +int numWriteBuffers = 0; +while (bufferIterator.hasNext()) { +Buffer buffer = bufferIterator.next(); Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] wanglijie95 commented on pull request #22681: [FLINK-32199][runtime] Remove redundant metrics in TaskMetricStore after rescale down
wanglijie95 commented on PR #22681: URL: https://github.com/apache/flink/pull/22681#issuecomment-1573280214 @JunRuiLee This PR was successfully merged into master, but it had code conflicts when I cherry-picked it to release-1.17 and release-1.16. Please resolve the conflicts and create PRs for these two branches, thanks.
[GitHub] [flink] wanglijie95 closed pull request #22681: [FLINK-32199][runtime] Remove redundant metrics in TaskMetricStore after rescale down
wanglijie95 closed pull request #22681: [FLINK-32199][runtime] Remove redundant metrics in TaskMetricStore after rescale down URL: https://github.com/apache/flink/pull/22681 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] snuyanzin commented on pull request #22672: [BP-1.17][FLINK-32023][API / DataStream] Add config execution.buffer-…
snuyanzin commented on PR #22672: URL: https://github.com/apache/flink/pull/22672#issuecomment-1573240299 Rebased to the latest commit in the branch. The reason for the failure is https://issues.apache.org/jira/browse/FLINK-32231, which was merged recently to this branch.
[jira] [Created] (FLINK-32246) javax.management.InstanceAlreadyExistsException
jeff-zou created FLINK-32246: Summary: javax.management.InstanceAlreadyExistsException Key: FLINK-32246 URL: https://issues.apache.org/jira/browse/FLINK-32246 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.15.2 Reporter: jeff-zou Flink SQL throws an exception (javax.management.InstanceAlreadyExistsException) when trying to perform multiple sink operations on the same Kafka source. SQL example: {code:java} create table kafka_source() with ('connector'='kafka'); insert into sink_table1 select * from kafka_source; insert into sink_table2 select * from kafka_source; {code} The exception is as below: {code:java} javax.management.InstanceAlreadyExistsException: kafka.admin.client:type=app-info,id=* java.management/com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436) java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855) java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955) java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890) java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320) java.management/com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64) org.apache.kafka.clients.admin.KafkaAdminClient.&lt;init&gt;(KafkaAdminClient.java:500) org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:444) org.apache.kafka.clients.admin.Admin.create(Admin.java:59) org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:39) org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getKafkaAdminClient(KafkaSourceEnumerator.java:410)
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.start(KafkaSourceEnumerator.java:151) org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$start$1(SourceCoordinator.java:209) org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$9(SourceCoordinator.java:406) org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) java.base/java.lang.Thread.run(Thread.java:829) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
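Each standalone INSERT in the example above is submitted as its own job, so each job starts its own KafkaSourceEnumerator and hence its own admin client with the same JMX app-info id. As an unverified mitigation sketch (it is an assumption, not confirmed in this issue, that a single job reusing the source avoids the duplicate registration), the two inserts could be submitted together as one statement set, reusing the table names from the example:

```sql
-- Hedged sketch: submit both inserts as a single job via a statement set
-- (EXECUTE STATEMENT SET is available in Flink SQL 1.15+).
EXECUTE STATEMENT SET
BEGIN
  INSERT INTO sink_table1 SELECT * FROM kafka_source;
  INSERT INTO sink_table2 SELECT * FROM kafka_source;
END;
```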
[GitHub] [flink] Myracle commented on pull request #22672: [BP-1.17][FLINK-32023][API / DataStream] Add config execution.buffer-…
Myracle commented on PR #22672: URL: https://github.com/apache/flink/pull/22672#issuecomment-1573230300 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #604: [FLINK-32041] - Allow operator to manage leases when using watchNamespaces
gyfora commented on PR #604: URL: https://github.com/apache/flink-kubernetes-operator/pull/604#issuecomment-1573210364 @ottomata we generally try to follow a 2 month release cycle. Details can be found here: https://cwiki.apache.org/confluence/display/FLINK/Release+Schedule+and+Planning In your case I see no harm in using the helm chart from the `main` branch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org