[GitHub] [flink] xintongsong commented on a diff in pull request #22352: [FLINK-31639][network] Introduce tiered store memory manager
xintongsong commented on code in PR #22352:
URL: https://github.com/apache/flink/pull/22352#discussion_r1199686953

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java: ##
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.LocalBufferPool;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FatalExitExceptionHandler;
+
+import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The implementation for {@link TieredStorageMemoryManager}. This is to request or recycle buffers
+ * from {@link LocalBufferPool} for different memory owners, for example, the tiers, the buffer
+ * accumulator, etc.
+ *
+ * Note that the memory owner should register its {@link TieredStorageMemorySpec} first before
+ * requesting buffers.
+ */
+public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManager {
+
+    /** Initial delay before checking buffer reclaim. */
+    private static final int DEFAULT_CHECK_BUFFER_RECLAIM_INITIAL_DELAY_MS = 10;
+
+    /** The period of checking buffer reclaim. */
+    private static final int DEFAULT_CHECK_BUFFER_RECLAIM_PERIOD_DURATION_MS = 50;
+
+    /** The tiered storage memory specs of each memory owner. */
+    private final Map tieredMemorySpecs;
+
+    /** Listeners used to listen for requests to reclaim buffers in different tiered storages. */
+    private final List bufferReclaimRequestListeners;
+
+    /** The buffer pool usage ratio that triggers the registered storages to reclaim buffers. */
+    private final float numTriggerReclaimBuffersRatio;
+
+    /**
+     * Indicates whether it is necessary to start a thread that periodically checks for buffer
+     * reclamation. If the memory manager is used downstream, this field will be false because
+     * the periodic buffer reclaim checker is not needed.
+     */
+    private final boolean needPeriodicalCheckReclaimBuffer;
+
+    /**
+     * The number of requested buffers from {@link BufferPool}. Only this field can be touched
+     * by both the task thread and the netty thread, so it is an atomic type.
+     */
+    private final AtomicInteger numRequestedBuffers;
+
+    /**
+     * A thread to check whether to reclaim buffers from each tiered storage.
+     *
+     * Note that it is not possible to remove this thread, as doing so could leave the task stuck
+     * in the buffer request. Because the number of buffers in the buffer pool can change at any
+     * time, the task may get stuck if the thread is removed.
+     *
+     * For instance, suppose the memory usage of the {@link BufferPool} has been checked and
+     * {@link TieredStorageMemoryManagerImpl} determined that buffer reclamation is unnecessary,
+     * but then the buffer pool size is suddenly reduced to a very small size. Without this thread,
+     * the buffer request would become stuck and the task would never be able to trigger buffer
+     * reclamation.
+     */
+    private ScheduledExecutorService executor;
+
+    /** The total number of guaranteed buffers for all tiered storages. */
+    private int numTotalGuaranteedBuffers;
+
+    /** The buffer
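The fields above describe a reclaim trigger: once the number of requested buffers reaches `numTriggerReclaimBuffersRatio` of the pool, the registered listeners are asked to reclaim buffers, and a scheduled checker (10 ms initial delay, 50 ms period) re-evaluates this in case the pool shrinks between requests. The following is a minimal, hypothetical sketch of that logic, not the Flink implementation: the class and method names (`ReclaimTriggerSketch`, `setPoolSize`, `checkAndTriggerReclaim`, `startPeriodicCheck`) are assumptions, and the pool is modeled as a plain integer rather than a `BufferPool`.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of a ratio-based buffer reclaim trigger (not Flink's implementation). */
final class ReclaimTriggerSketch {
    // Touched by several threads, hence atomic (mirrors numRequestedBuffers in the diff).
    private final AtomicInteger numRequestedBuffers = new AtomicInteger();
    private final List<Runnable> reclaimListeners = new CopyOnWriteArrayList<>();
    private final float triggerRatio;
    // Assumption: the pool size is a plain number that may shrink at any time.
    private volatile int poolSize;

    ReclaimTriggerSketch(int poolSize, float triggerRatio) {
        this.poolSize = poolSize;
        this.triggerRatio = triggerRatio;
    }

    void listenBufferReclaimRequest(Runnable listener) {
        reclaimListeners.add(listener);
    }

    void setPoolSize(int newPoolSize) {
        poolSize = newPoolSize;
    }

    /** Records one requested buffer, then checks whether reclaiming is needed. */
    void requestBuffer() {
        numRequestedBuffers.incrementAndGet();
        checkAndTriggerReclaim();
    }

    void recycleBuffer() {
        numRequestedBuffers.decrementAndGet();
    }

    /** Notifies all listeners if usage has reached the trigger ratio; returns whether it did. */
    boolean checkAndTriggerReclaim() {
        if (numRequestedBuffers.get() >= triggerRatio * poolSize) {
            reclaimListeners.forEach(Runnable::run);
            return true;
        }
        return false;
    }

    /** Periodic re-check, so a pool that shrinks between requests still triggers reclaiming. */
    ScheduledExecutorService startPeriodicCheck(long initialDelayMs, long periodMs) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(
                this::checkAndTriggerReclaim, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
        return executor;
    }
}
```

In this sketch the periodic path would be started with `startPeriodicCheck(10, 50)`, matching the two constants in the diff, and the caller is responsible for calling `shutdownNow()` on the returned executor. The point of the periodic check is the scenario described in the Javadoc: with 4 of 10 buffers in use and a 0.5 ratio nothing happens, but if the pool shrinks to 6 the next check fires even though no new buffer was requested.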
[GitHub] [flink] pgaref commented on a diff in pull request #22608: [FLINK-31893][runtime] Introduce AdaptiveBatchScheduler failure enrichment/labeling
pgaref commented on code in PR #22608:
URL: https://github.com/apache/flink/pull/22608#discussion_r1199660170

## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java: ##
@@ -91,6 +95,31 @@ void setUp() {
         taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
     }
 
+    @Test
+    void testExceptionHistoryWithGlobalFailureLabels() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+        final TestingFailureEnricher failureEnricher = new TestingFailureEnricher();
+        final SchedulerBase scheduler =
+                createFailingExecutionGraphScheduler(
+                        jobGraph,
+                        Collections.singleton(failureEnricher),
+                        createCustomParallelismDecider(10),
+                        BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM
+                                .defaultValue());
+        final DefaultExecutionGraph graph = (DefaultExecutionGraph) scheduler.getExecutionGraph();
+        // Triggered failure on initializeJobVertex that should be labeled
+        try {
+            scheduler.startScheduling();
+        } catch (IllegalStateException e) {
+            // expected due to failure
+        }

Review Comment:
   Thanks for the suggestion! Added :)

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [flink] reswqa commented on a diff in pull request #22608: [FLINK-31893][runtime] Introduce AdaptiveBatchScheduler failure enrichment/labeling
reswqa commented on code in PR #22608:
URL: https://github.com/apache/flink/pull/22608#discussion_r1199639985

## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java: ##
@@ -91,6 +95,31 @@ void setUp() {
         taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
     }
 
+    @Test
+    void testExceptionHistoryWithGlobalFailureLabels() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+        final TestingFailureEnricher failureEnricher = new TestingFailureEnricher();
+        final SchedulerBase scheduler =
+                createFailingExecutionGraphScheduler(
+                        jobGraph,
+                        Collections.singleton(failureEnricher),
+                        createCustomParallelismDecider(10),
+                        BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM
+                                .defaultValue());
+        final DefaultExecutionGraph graph = (DefaultExecutionGraph) scheduler.getExecutionGraph();
+        // Triggered failure on initializeJobVertex that should be labeled
+        try {
+            scheduler.startScheduling();
+        } catch (IllegalStateException e) {
+            // expected due to failure
+        }

Review Comment:
   How about replacing it with `Assertions.assertThatThrownBy`?
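The suggestion matters because an empty `catch` lets the test pass even when `startScheduling()` throws nothing at all. With AssertJ the block would become roughly `assertThatThrownBy(scheduler::startScheduling).isInstanceOf(IllegalStateException.class);`. The dependency-free sketch below only illustrates the same semantics; `ExpectThrows` and `ThrowingRunnable` are hypothetical names, not AssertJ or Flink classes.

```java
/** Hypothetical, dependency-free stand-in for AssertJ's assertThatThrownBy(...).isInstanceOf(...). */
final class ExpectThrows {

    /** Like Runnable, but the action may throw any Throwable. */
    @FunctionalInterface
    interface ThrowingRunnable {
        void run() throws Throwable;
    }

    private ExpectThrows() {}

    /**
     * Runs the action and returns what it threw, provided it is of the expected type.
     * Throws AssertionError if the action completes normally or throws something else.
     */
    static <T extends Throwable> T expectThrows(Class<T> expected, ThrowingRunnable action) {
        try {
            action.run();
        } catch (Throwable thrown) {
            if (expected.isInstance(thrown)) {
                return expected.cast(thrown);
            }
            throw new AssertionError(
                    "Expected " + expected.getName() + " but got " + thrown.getClass().getName(),
                    thrown);
        }
        // An empty try/catch would silently pass at this point; this sketch fails instead.
        throw new AssertionError("Expected " + expected.getName() + " but nothing was thrown");
    }
}
```

Returning the caught exception also leaves room for further checks on it, similar to how AssertJ's fluent chain allows `.hasMessageContaining(...)` after `isInstanceOf(...)`.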
[jira] [Updated] (FLINK-31893) Introduce AdaptiveBatchScheduler failure enrichment/labeling
[ https://issues.apache.org/jira/browse/FLINK-31893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-31893: --- Labels: pull-request-available (was: ) > Introduce AdaptiveBatchScheduler failure enrichment/labeling > > > Key: FLINK-31893 > URL: https://issues.apache.org/jira/browse/FLINK-31893 > Project: Flink > Issue Type: Sub-task >Reporter: Panagiotis Garefalakis >Priority: Major > Labels: pull-request-available > > See discussion: > [https://github.com/apache/flink/pull/22506#discussion_r1195412026] > This can be labeled as a global failure -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-32123) Avro Confluent Schema Registry nightly end-to-end test failed due to timeout
[ https://issues.apache.org/jira/browse/FLINK-32123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17724561#comment-17724561 ]

Weijie Guo edited comment on FLINK-32123 at 5/20/23 5:38 PM:
-------------------------------------------------------------

Reverted in master(1.18) via f32052a12309cfe38f66344cf6d4ab39717e44c8 as INFRA-24607 was addressed.

was (Author: weijie guo):
Reverted in master(1.18) via f32052a12309cfe38f66344cf6d4ab39717e44c8 ad INFRA-24607 is addressed.

> Avro Confluent Schema Registry nightly end-to-end test failed due to timeout
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-32123
>                 URL: https://issues.apache.org/jira/browse/FLINK-32123
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.18.0
>            Reporter: Panagiotis Garefalakis
>            Assignee: Panagiotis Garefalakis
>            Priority: Blocker
>              Labels: pull-request-available, test-stability
>             Fix For: 1.18.0
>
>
> For the past few hours, E2E tests have been failing with: 'Avro Confluent Schema Registry
> nightly end-to-end test' failed after 9 minutes and 53 seconds! Test exited
> with exit code 1
> It looks like the [https://archive.apache.org/dist/kafka/] mirror is overloaded: a local
> download took more than 30 min.
> Let's switch to the [https://downloads.apache.org|https://downloads.apache.org/]
> mirror.
[jira] [Commented] (FLINK-32123) Avro Confluent Schema Registry nightly end-to-end test failed due to timeout
[ https://issues.apache.org/jira/browse/FLINK-32123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17724561#comment-17724561 ]

Weijie Guo commented on FLINK-32123:
------------------------------------

Reverted in master(1.18) via f32052a12309cfe38f66344cf6d4ab39717e44c8.
[jira] [Closed] (FLINK-32123) Avro Confluent Schema Registry nightly end-to-end test failed due to timeout
[ https://issues.apache.org/jira/browse/FLINK-32123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Weijie Guo closed FLINK-32123.
------------------------------
    Fix Version/s: 1.18.0
       Resolution: Fixed
[jira] [Updated] (FLINK-32123) Avro Confluent Schema Registry nightly end-to-end test failed due to timeout
[ https://issues.apache.org/jira/browse/FLINK-32123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-32123:
-----------------------------------
    Labels: pull-request-available test-stability  (was: test-stability)
[jira] [Updated] (FLINK-32108) KubernetesExtension calls assumeThat in @BeforeAll callback which doesn't print the actual failure message
[ https://issues.apache.org/jira/browse/FLINK-32108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-32108:
-----------------------------------
    Labels: pull-request-available starter  (was: starter)

> KubernetesExtension calls assumeThat in @BeforeAll callback which doesn't
> print the actual failure message
> --------------------------------------------------------------------------
>
>                 Key: FLINK-32108
>                 URL: https://issues.apache.org/jira/browse/FLINK-32108
>             Project: Flink
>          Issue Type: Technical Debt
>          Components: Test Infrastructure
>    Affects Versions: 1.18.0
>            Reporter: Matthias Pohl
>            Assignee: Weijie Guo
>            Priority: Minor
>              Labels: pull-request-available, starter
>
> {{KubernetesExtension}} implements {{BeforeAllCallback}}, which calls
> {{assumeThat}} in the {{@BeforeAll}} context. {{assumeThat}} doesn't work
> properly in the {{@BeforeAll}} context, though: the error message is not
> printed and the test fails with exit code -1.
[GitHub] [flink] flinkbot commented on pull request #22613: [FLINK-32141][checkpointing] turn down a log level in the SharedState…
flinkbot commented on PR #22613:
URL: https://github.com/apache/flink/pull/22613#issuecomment-1555953469

   ## CI report:

   * 97ae387dfe466d2d1e32f10bfd4f62038e0b2ee4 UNKNOWN

   Bot commands
   The @flinkbot bot supports the following commands:

    - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-32141) SharedStateRegistry print too much info log
[ https://issues.apache.org/jira/browse/FLINK-32141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-32141:
-----------------------------------
    Labels: pull-request-available  (was: )

> SharedStateRegistry print too much info log
> -------------------------------------------
>
>                 Key: FLINK-32141
>                 URL: https://issues.apache.org/jira/browse/FLINK-32141
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.17.0
>            Reporter: Feifan Wang
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.17.1
>
>         Attachments: image-2023-05-21-00-26-20-026.png
>
>
> FLINK-29095 added some logging to SharedStateRegistry for troubleshooting. Among
> these, an INFO log is emitted when newHandle is equal to the registered one:
> [https://github.com/apache/flink/blob/release-1.17.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java#L117]
> !image-2023-05-21-00-26-20-026.png|width=775,height=126!
> But this case should not be considered a potential bug, because
> FsStateChangelogStorage directly reuses the FileStateHandle of the previous
> checkpoint instead of a PlaceholderStreamStateHandle.
> In our tests, the JobManager printed this log so often that useful information
> was drowned out.
> So I suggest changing this log level to TRACE. WDYT [~Yanfei Lei], [~klion26]?
[jira] [Updated] (FLINK-32141) SharedStateRegistry print too much info log
[ https://issues.apache.org/jira/browse/FLINK-32141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Feifan Wang updated FLINK-32141:
--------------------------------
    Description:
FLINK-29095 added some logging to SharedStateRegistry for troubleshooting. Among these, an INFO log is emitted when newHandle is equal to the registered one:
[https://github.com/apache/flink/blob/release-1.17.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java#L117]
!image-2023-05-21-00-26-20-026.png|width=775,height=126!
But this case should not be considered a potential bug, because FsStateChangelogStorage directly reuses the FileStateHandle of the previous checkpoint instead of a PlaceholderStreamStateHandle.
In our tests, the JobManager printed this log so often that useful information was drowned out.
So I suggest changing this log level to TRACE. WDYT [~Yanfei Lei], [~klion26]?

  was:
FLINK-29095 added some logging to SharedStateRegistry for troubleshooting. Among these, an INFO log is emitted when newHandle is equal to the registered one:
[https://github.com/apache/flink/blob/release-1.17.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java#L117]
!image-2023-05-21-00-26-20-026.png|width=775,height=126!
But this case should not be considered a possible bug, because FsStateChangelogStorage directly reuses the FileStateHandle of the previous checkpoint instead of a PlaceholderStreamStateHandle.
In our tests, the JobManager printed this log so often that useful information was drowned out.
So I suggest changing this log level to TRACE. WDYT [~Yanfei Lei], [~klion26]?
[jira] [Created] (FLINK-32141) SharedStateRegistry print too much info log
Feifan Wang created FLINK-32141:
-------------------------------

             Summary: SharedStateRegistry print too much info log
                 Key: FLINK-32141
                 URL: https://issues.apache.org/jira/browse/FLINK-32141
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Checkpointing
    Affects Versions: 1.17.0
            Reporter: Feifan Wang
             Fix For: 1.17.1
         Attachments: image-2023-05-21-00-26-20-026.png

FLINK-29095 added some logging to SharedStateRegistry for troubleshooting. Among these, an INFO log is emitted when newHandle is equal to the registered one:
[https://github.com/apache/flink/blob/release-1.17.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java#L117]
!image-2023-05-21-00-26-20-026.png|width=775,height=126!
But this case should not be considered a possible bug, because FsStateChangelogStorage directly reuses the FileStateHandle of the previous checkpoint instead of a PlaceholderStreamStateHandle.
In our tests, the JobManager printed this log so often that useful information was drowned out.
So I suggest changing this log level to TRACE. WDYT [~Yanfei Lei], [~klion26]?
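The proposal amounts to moving a benign, high-frequency message below the default logging threshold. Flink logs through SLF4J; the dependency-free sketch below uses `java.util.logging` instead, where `FINEST` plays roughly the role of TRACE, and `DemoSharedRegistry` is a hypothetical stand-in, not `SharedStateRegistryImpl`. It shows that re-registering an equal handle produces no output when the logger is at INFO and exactly one record when it is lowered to FINEST.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

/** Hypothetical registry sketch; not Flink's SharedStateRegistryImpl. */
final class DemoSharedRegistry {
    private static final Logger LOG = Logger.getLogger(DemoSharedRegistry.class.getName());

    private final Map<String, Object> entries = new HashMap<>();

    /** Registers a handle; re-registering an equal handle is expected and only traced. */
    void register(String key, Object newHandle) {
        Object existing = entries.get(key);
        if (newHandle.equals(existing)) {
            // Benign case (e.g. a reused handle): FINEST is JUL's closest level to TRACE.
            LOG.finest(() -> "Duplicated registration under key " + key);
            return;
        }
        entries.put(key, newHandle);
    }

    /** Registers the same handle twice and returns how many log records were published. */
    static int duplicateRecordsAt(Level loggerLevel) {
        AtomicInteger published = new AtomicInteger();
        Handler counter =
                new Handler() {
                    @Override
                    public void publish(LogRecord record) {
                        if (isLoggable(record)) {
                            published.incrementAndGet();
                        }
                    }

                    @Override
                    public void flush() {}

                    @Override
                    public void close() {}
                };
        counter.setLevel(Level.ALL);
        LOG.setUseParentHandlers(false);
        LOG.setLevel(loggerLevel);
        LOG.addHandler(counter);
        try {
            DemoSharedRegistry registry = new DemoSharedRegistry();
            Object handle = new Object();
            registry.register("shared-file", handle);
            registry.register("shared-file", handle); // equal handle, logged at FINEST
            return published.get();
        } finally {
            LOG.removeHandler(counter);
        }
    }
}
```

Because the message is built lazily through a `Supplier`, the suppressed case also avoids the string concatenation, which is a reasonable habit for any log line that can fire on every checkpoint.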
[jira] [Commented] (FLINK-32139) Data accidentally deleted and not deleted when upsert sink to hbase
[ https://issues.apache.org/jira/browse/FLINK-32139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17724543#comment-17724543 ]

LiuZeshan commented on FLINK-32139:
-----------------------------------

[~ferenc-csaky] [~martijnvisser] Could you help take a look at this issue? cc [~mgergely] [~Leonard] [~jark]

> Data accidentally deleted and not deleted when upsert sink to hbase
> -------------------------------------------------------------------
>
>                 Key: FLINK-32139
>                 URL: https://issues.apache.org/jira/browse/FLINK-32139
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / HBase
>            Reporter: LiuZeshan
>            Priority: Major
>              Labels: pull-request-available
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> h4. *Problem background*
> We encountered data being accidentally deleted, and data failing to be deleted, when
> synchronizing MySQL to HBase using the MySQL CDC and HBase connectors.
> h3. Reproduction steps
> 1. A Flink job with parallelism 1 synchronizes a MySQL table into HBase.
> SinkMaterializer is turned off by setting
> {{table.exec.sink.upsert-materialize = 'NONE'}}.
> The MySQL table schema is as follows:
> {code:java}
> CREATE TABLE `source_sample_1001` (
>   `id` int(11) NOT NULL AUTO_INCREMENT,
>   `name` varchar(200) DEFAULT NULL,
>   `age` int(11) DEFAULT NULL,
>   `weight` float DEFAULT NULL,
>   PRIMARY KEY (`id`)
> );{code}
> The source table definition in Flink is as follows:
> {code:java}
> CREATE TABLE `source_sample_1001` (
>   `id` bigint,
>   `name` String,
>   `age` bigint,
>   `weight` float,
>   PRIMARY KEY (`id`) NOT ENFORCED
> ) WITH (
>   'connector' = 'mysql-cdc',
>   'hostname' = '${ip}',
>   'port' = '3306',
>   'username' = '${user}',
>   'password' = '${password}',
>   'database-name' = 'testdb_0010',
>   'table-name' = 'source_sample_1001'
> );{code}
> The HBase sink table is created in the {{testdb_0011}} namespace.
> {code:java}
> create 'testdb_0011:source_sample_1001', 'data'
>
> describe 'testdb_0011:source_sample_1001'
>
> # describe output
> Table testdb_0011:source_sample_1001 is ENABLED
> testdb_0011:source_sample_1001
> COLUMN FAMILIES DESCRIPTION
> {NAME => 'data', BLOOMFILTER => 'ROW', IN_MEMORY => 'false', VERSIONS => '1', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', COMPRESSION => 'NONE', TTL => 'FOREVER', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
> {code}
> The sink table definition in Flink:
> {code:java}
> CREATE TABLE `hbase_sink1` (
>   `id` STRING COMMENT 'unique id',
>   `data` ROW<
>     `name` string,
>     `age` bigint,
>     `weight` float
>   >,
>   primary key(`id`) not enforced
> ) WITH (
>   'connector' = 'hbase-2.2',
>   'table-name' = 'testdb_0011:source_sample_1001',
>   'zookeeper.quorum' = '${hbase.zookeeper.quorum}'
> );{code}
> DML in Flink to synchronize the data:
> {code:java}
> INSERT INTO `hbase_sink1` SELECT
>   REVERSE(CONCAT_WS('', CAST(`id` AS VARCHAR))) as `id`,
>   ROW(`name`, `age`, `weight`)
> FROM `source_sample_1001`;{code}
> 2. Another Flink job sinks datagen data into the MySQL table
> {{source_sample_1001}}. The id ranges from 1 to 10_000, which means
> source_sample_1001 will have at most 10_000 records.
> {code:java}
> CREATE TABLE datagen_source (
>   `id` int,
>   `name` String,
>   `age` int,
>   `weight` float
> ) WITH (
>   'connector' = 'datagen',
>   'fields.id.kind' = 'random',
>   'fields.id.min' = '1',
>   'fields.id.max' = '1',
>   'fields.name.length' = '20',
>   'rows-per-second' = '5000'
> );
>
> CREATE TABLE `source_sample_1001` (
>   `id` bigint,
>   `name` String,
>   `age` bigint,
>   `weight` float,
>   PRIMARY KEY (`id`) NOT ENFORCED
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://${ip}:3306/testdb_0010?rewriteBatchedStatements=true=Asia/Shanghai',
>   'table-name' = 'source_sample_1001',
>   'username' = '${user}',
>   'password' = '${password}',
>   'sink.buffer-flush.max-rows' = '500',
>   'sink.buffer-flush.interval' = '1s'
> );
>
> -- dml
> INSERT INTO `source_sample_1001` SELECT * FROM `datagen_source`;
> {code}
> 3. A bash script deletes rows from the MySQL table {{source_sample_1001}} in batches of 10.
> {code:java}
> #!/bin/bash
>
> mysql1="mysql -h${ip}
[GitHub] [flink] flinkbot commented on pull request #22612: [FLINK-32139][connector/hbase] Using strongly increasing nanosecond timestamp and DeleteColumn type to fix data accidental deletion and non
flinkbot commented on PR #22612:
URL: https://github.com/apache/flink/pull/22612#issuecomment-1555933957

   ## CI report:

   * a2341810a244b97a3af32951e17efbc49f570cdd UNKNOWN

   Bot commands
   The @flinkbot bot supports the following commands:

    - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-32139) Data accidentally deleted and not deleted when upsert sink to hbase
[ https://issues.apache.org/jira/browse/FLINK-32139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-32139:
-----------------------------------
    Labels: pull-request-available  (was: )

> Data accidentally deleted and not deleted when upsert sink to hbase
> -------------------------------------------------------------------
>
>                 Key: FLINK-32139
>                 URL: https://issues.apache.org/jira/browse/FLINK-32139
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / HBase
>            Reporter: LiuZeshan
>            Priority: Major
>              Labels: pull-request-available
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> h4. *Problem background*
> We encountered data being accidentally deleted, and data failing to be deleted, when
> synchronizing MySQL to HBase using the MySQL CDC and HBase connectors.
> h3. Reproduction steps
> 1. A Flink job with parallelism 1 synchronizes a MySQL table into HBase.
> SinkMaterializer is turned off by setting
> {{table.exec.sink.upsert-materialize = 'NONE'}}.
> The MySQL table schema is as follows:
> {code:java}
> CREATE TABLE `source_sample_1001` (
>   `id` int(11) NOT NULL AUTO_INCREMENT,
>   `name` varchar(200) DEFAULT NULL,
>   `age` int(11) DEFAULT NULL,
>   `weight` float DEFAULT NULL,
>   PRIMARY KEY (`id`)
> );{code}
> The source table definition in Flink is as follows:
> {code:java}
> CREATE TABLE `source_sample_1001` (
>   `id` bigint,
>   `name` String,
>   `age` bigint,
>   `weight` float,
>   PRIMARY KEY (`id`) NOT ENFORCED
> ) WITH (
>   'connector' = 'mysql-cdc',
>   'hostname' = '${ip}',
>   'port' = '3306',
>   'username' = '${user}',
>   'password' = '${password}',
>   'database-name' = 'testdb_0010',
>   'table-name' = 'source_sample_1001'
> );{code}
> The HBase sink table is created in the {{testdb_0011}} namespace.
> {code:java}
> create 'testdb_0011:source_sample_1001', 'data'
>
> describe 'testdb_0011:source_sample_1001'
>
> # describe output
> Table testdb_0011:source_sample_1001 is ENABLED
> testdb_0011:source_sample_1001
> COLUMN FAMILIES DESCRIPTION
> {NAME => 'data', BLOOMFILTER => 'ROW', IN_MEMORY => 'false', VERSIONS => '1', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', COMPRESSION => 'NONE', TTL => 'FOREVER', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
> {code}
> The sink table definition in Flink:
> {code:java}
> CREATE TABLE `hbase_sink1` (
>   `id` STRING COMMENT 'unique id',
>   `data` ROW<
>     `name` string,
>     `age` bigint,
>     `weight` float
>   >,
>   primary key(`id`) not enforced
> ) WITH (
>   'connector' = 'hbase-2.2',
>   'table-name' = 'testdb_0011:source_sample_1001',
>   'zookeeper.quorum' = '${hbase.zookeeper.quorum}'
> );{code}
> DML in Flink to synchronize the data:
> {code:java}
> INSERT INTO `hbase_sink1` SELECT
>   REVERSE(CONCAT_WS('', CAST(`id` AS VARCHAR))) as `id`,
>   ROW(`name`, `age`, `weight`)
> FROM `source_sample_1001`;{code}
> 2. Another Flink job sinks datagen data into the MySQL table
> {{source_sample_1001}}. The id ranges from 1 to 10_000, which means
> source_sample_1001 will have at most 10_000 records.
> {code:java}
> CREATE TABLE datagen_source (
>   `id` int,
>   `name` String,
>   `age` int,
>   `weight` float
> ) WITH (
>   'connector' = 'datagen',
>   'fields.id.kind' = 'random',
>   'fields.id.min' = '1',
>   'fields.id.max' = '1',
>   'fields.name.length' = '20',
>   'rows-per-second' = '5000'
> );
>
> CREATE TABLE `source_sample_1001` (
>   `id` bigint,
>   `name` String,
>   `age` bigint,
>   `weight` float,
>   PRIMARY KEY (`id`) NOT ENFORCED
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://${ip}:3306/testdb_0010?rewriteBatchedStatements=true=Asia/Shanghai',
>   'table-name' = 'source_sample_1001',
>   'username' = '${user}',
>   'password' = '${password}',
>   'sink.buffer-flush.max-rows' = '500',
>   'sink.buffer-flush.interval' = '1s'
> );
>
> -- dml
> INSERT INTO `source_sample_1001` SELECT * FROM `datagen_source`;
> {code}
> 3. A bash script deletes rows from the MySQL table {{source_sample_1001}} in batches of 10.
> {code:java}
> #!/bin/bash
>
> mysql1="mysql -h${ip} -u${user} -p${password}"
> batch=10
>
> for ((i=1; ;i++)); do
>   echo "iteration $i start"
>   for
[GitHub] [flink] lzshlzsh opened a new pull request, #22612: [FLINK-32139][connector/hbase] Using strongly increasing nanosecond timestamp and DeleteColumn type to fix data accidental deletion and non
lzshlzsh opened a new pull request, #22612: URL: https://github.com/apache/flink/pull/22612

## What is the purpose of the change

Use a strictly increasing nanosecond timestamp and the DeleteColumn key type to fix data being accidentally deleted, or not deleted, when upserting data into HBase (FLINK-32139).

## Brief change log

- Add a class HBaseTimestampGenerator, which generates strictly increasing timestamps in nanoseconds to set on HBase mutations.
- HBaseSerde createPutMutation and createDeleteMutation now create mutations with an explicit timestamp.
- Use delete.addColumns instead of delete.addColumn: the former HBase client API uses the [DeleteColumn key type](https://github.com/apache/hbase/blob/c05ee564d3026688bcfdc456071059c7c8409694/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java#L322) and the latter uses the [Delete key type](https://github.com/apache/hbase/blob/c05ee564d3026688bcfdc456071059c7c8409694/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java#L380).

## Verifying this change

- Added a test that validates that HBaseTimestampGenerator generates strictly increasing timestamps.
- Online test described in FLINK-32139.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no

-- 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
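The pull request description does not include the code of HBaseTimestampGenerator. Below is a minimal sketch of the idea behind the first change-log item. It assumes a design that combines `System.nanoTime()` with a one-time wall-clock offset and an `AtomicLong`, so that no call ever returns a value equal to or smaller than an earlier one. The class `IncreasingNanoTimestamps` and its `next()` method are hypothetical names. They are not Flink or HBase API and are not the PR's implementation.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical sketch, NOT the HBaseTimestampGenerator from the pull request:
 * hands out nanosecond timestamps that are strictly increasing, even when
 * next() is called several times within the same clock tick.
 */
public class IncreasingNanoTimestamps {

    // Last value handed out; updated atomically so concurrent callers never share a value.
    private final AtomicLong last = new AtomicLong(Long.MIN_VALUE);

    // Anchors the monotonic System.nanoTime() to wall-clock time once, so the
    // returned values are roughly nanoseconds since the Unix epoch.
    private final long offsetNanos =
            System.currentTimeMillis() * 1_000_000L - System.nanoTime();

    /** Returns a timestamp strictly greater than every value previously returned. */
    public long next() {
        long candidate = System.nanoTime() + offsetNanos;
        // If the clock has not moved past the last value, bump it by one nanosecond instead.
        return last.updateAndGet(prev -> Math.max(prev + 1, candidate));
    }

    public static void main(String[] args) {
        IncreasingNanoTimestamps generator = new IncreasingNanoTimestamps();
        long previous = generator.next();
        for (int i = 0; i < 100_000; i++) {
            long current = generator.next();
            if (current <= previous) {
                throw new IllegalStateException(previous + " -> " + current);
            }
            previous = current;
        }
        System.out.println("100000 strictly increasing timestamps, last = " + previous);
    }
}
```

HBase lets a Delete marker mask a Put that carries the same timestamp. Giving every mutation from the sink a distinct, increasing timestamp therefore presumably keeps a re-insert from being hidden by a delete issued in the same millisecond. This sketch does not address two open questions: whether nanosecond values coexist well with millisecond timestamps written by other clients, and how they interact with a column family TTL.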
[jira] [Closed] (FLINK-32140) Data accidentally deleted and not deleted when upsert sink to hbase
[ https://issues.apache.org/jira/browse/FLINK-32140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] LiuZeshan closed FLINK-32140. - Resolution: Duplicate > Data accidentally deleted and not deleted when upsert sink to hbase > --- > > Key: FLINK-32140 > URL: https://issues.apache.org/jira/browse/FLINK-32140 > Project: Flink > Issue Type: Bug > Components: Connectors / HBase >Reporter: LiuZeshan >Priority: Major
[jira] [Created] (FLINK-32140) Data accidentally deleted and not deleted when upsert sink to hbase
LiuZeshan created FLINK-32140: - Summary: Data accidentally deleted and not deleted when upsert sink to hbase Key: FLINK-32140 URL: https://issues.apache.org/jira/browse/FLINK-32140 Project: Flink Issue Type: Bug Components: Connectors / HBase Reporter: LiuZeshan
[jira] [Created] (FLINK-32139) Data accidentally deleted and not deleted when upsert sink to hbase
LiuZeshan created FLINK-32139:
-
Summary: Data accidentally deleted and not deleted when upsert sink to hbase
Key: FLINK-32139
URL: https://issues.apache.org/jira/browse/FLINK-32139
Project: Flink
Issue Type: Bug
Components: Connectors / HBase
Reporter: LiuZeshan

h4. *Problem background*
We ran into data being accidentally deleted, and data failing to be deleted, when synchronizing MySQL to HBase with the MySQL-CDC and HBase connectors.

h3. Reproduction steps
1. A Flink job with parallelism 1 synchronizes a MySQL table into HBase. SinkMaterializer is turned off by setting {{table.exec.sink.upsert-materialize = 'NONE'}}.

The MySQL table schema is as follows.

CREATE TABLE `source_sample_1001` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(200) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  `weight` float DEFAULT NULL,
  PRIMARY KEY (`id`)
);

The source table definition in Flink is as follows.

CREATE TABLE `source_sample_1001` (
  `id` bigint,
  `name` String,
  `age` bigint,
  `weight` float,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '${ip}',
  'port' = '3306',
  'username' = '${user}',
  'password' = '${password}',
  'database-name' = 'testdb_0010',
  'table-name' = 'source_sample_1001'
);

The HBase sink table is created in the {{testdb_0011}} namespace.

create 'testdb_0011:source_sample_1001', 'data'
describe 'testdb_0011:source_sample_1001'

# describe output
Table testdb_0011:source_sample_1001 is ENABLED
testdb_0011:source_sample_1001
COLUMN FAMILIES DESCRIPTION
{NAME => 'data', BLOOMFILTER => 'ROW', IN_MEMORY => 'false', VERSIONS => '1', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', COMPRESSION => 'NONE', TTL => 'FOREVER', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}

The sink table definition in Flink.

CREATE TABLE `hbase_sink1` (
  `id` STRING COMMENT 'unique id',
  `data` ROW<
    `name` string,
    `age` bigint,
    `weight` float
  >,
  primary key(`id`) not enforced
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'testdb_0011:source_sample_1001',
  'zookeeper.quorum' = '${hbase.zookeeper.quorum}'
);

DML in Flink to synchronize the data.

INSERT INTO `hbase_sink1` SELECT
  REVERSE(CONCAT_WS('', CAST(`id` AS VARCHAR))) as `id`,
  ROW(`name`, `age`, `weight`)
FROM `source_sample_1001`;

2. Another Flink job writes datagen data into the MySQL table {{source_sample_1001}}. The id ranges from 1 to 10_000, so source_sample_1001 will hold at most 10_000 records.

CREATE TABLE datagen_source (
  `id` int,
  `name` String,
  `age` int,
  `weight` float
) WITH (
  'connector' = 'datagen',
  'fields.id.kind' = 'random',
  'fields.id.min' = '1',
  'fields.id.max' = '10000',
  'fields.name.length' = '20',
  'rows-per-second' = '5000'
);

CREATE TABLE `source_sample_1001` (
  `id` bigint,
  `name` String,
  `age` bigint,
  `weight` float,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://${ip}:3306/testdb_0010?rewriteBatchedStatements=true=Asia/Shanghai',
  'table-name' = 'source_sample_1001',
  'username' = '${user}',
  'password' = '${password}',
  'sink.buffer-flush.max-rows' = '500',
  'sink.buffer-flush.interval' = '1s'
);

-- dml
INSERT INTO `source_sample_1001` SELECT * FROM `datagen_source`;

3. A bash script deletes rows from the MySQL table {{source_sample_1001}} in batches of 10.

#!/bin/bash

mysql1="mysql -h${ip} -u${user} -p${password}"
batch=10

for ((i=1; ;i++)); do
  echo "iteration $i start"
  for ((j=1; j<=10000; j+=10)); do
    $mysql1 -e "delete from testdb_0010.source_sample_1001 where id >= $j and id < $((j+10))"
  done
  echo "iteration $i end"
  sleep 10
done

4. Start the two Flink jobs above and the bash script, then wait several minutes; usually 5 minutes is enough. Note that the bash script that deletes data is necessary to reproduce the problem.

5. Stop the bash script and wait until the datagen Flink job has filled the MySQL table with 10_000 records, then stop the datagen Flink job. Wait for the HBase sink job to read all of the binlog of the MySQL table {{source_sample_1001}}.

6. Check the HBase table and reproduce the issue of
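The FLINK-32139 report and the change log of pull request #22612 both depend on how HBase delete markers mask puts. The following toy model is a simplified illustration under stated assumptions, not HBase code. It keeps every mutation of a single cell and applies only two masking rules. A Delete (version) marker hides the put with exactly its timestamp. A DeleteColumn marker hides every put whose timestamp is less than or equal to its own. The class `ToyHBaseCell` and its methods are hypothetical. The model shows one plausible way both reported symptoms can arise:

- A re-insert that shares a timestamp with an earlier delete stays hidden ("accidentally deleted").
- Deleting only the newest version lets an older one show through ("not deleted").

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/**
 * Toy model of one HBase cell (a single row and column), written for illustration only.
 * It is NOT HBase code and ignores VERSIONS limits, flushes, compactions and
 * KEEP_DELETED_CELLS; it only encodes the two masking rules of the delete markers.
 */
public class ToyHBaseCell {

    enum Type { PUT, DELETE, DELETE_COLUMN }

    static final class Mutation {
        final Type type;
        final long ts;
        final String value;

        Mutation(Type type, long ts, String value) {
            this.type = type;
            this.ts = ts;
            this.value = value;
        }
    }

    private final List<Mutation> mutations = new ArrayList<>();

    void put(long ts, String value) {
        mutations.add(new Mutation(Type.PUT, ts, value));
    }

    /** Roughly what Delete#addColumn(family, qualifier, ts) writes: masks only the version at ts. */
    void deleteVersion(long ts) {
        mutations.add(new Mutation(Type.DELETE, ts, null));
    }

    /** Roughly what Delete#addColumns(family, qualifier, ts) writes: masks every version <= ts. */
    void deleteColumn(long ts) {
        mutations.add(new Mutation(Type.DELETE_COLUMN, ts, null));
    }

    /** Newest unmasked value. Arrival order does not matter, only timestamps do. */
    Optional<String> get() {
        Mutation newest = null;
        for (Mutation m : mutations) {
            if (m.type != Type.PUT || isMasked(m)) {
                continue;
            }
            // ">=" lets a later put win over an earlier put with the same timestamp.
            if (newest == null || m.ts >= newest.ts) {
                newest = m;
            }
        }
        return newest == null ? Optional.empty() : Optional.of(newest.value);
    }

    private boolean isMasked(Mutation put) {
        for (Mutation m : mutations) {
            if (m.type == Type.DELETE && m.ts == put.ts) {
                return true;
            }
            if (m.type == Type.DELETE_COLUMN && m.ts >= put.ts) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // 1) Delete and re-insert land on the same millisecond: the re-insert stays hidden.
        ToyHBaseCell sameTs = new ToyHBaseCell();
        sameTs.deleteColumn(100);
        sameTs.put(100, "re-inserted");
        System.out.println("same timestamp: " + sameTs.get());

        // 2) A version delete on the newest value lets an older value show through again.
        ToyHBaseCell versionDelete = new ToyHBaseCell();
        versionDelete.put(100, "old");
        versionDelete.put(101, "new");
        versionDelete.deleteVersion(101);
        System.out.println("version delete: " + versionDelete.get());

        // 3) Distinct increasing timestamps plus a column delete behave as an upsert sink expects.
        ToyHBaseCell increasing = new ToyHBaseCell();
        increasing.put(100, "old");
        increasing.put(101, "new");
        increasing.deleteColumn(102);
        increasing.put(103, "re-inserted");
        System.out.println("increasing + column delete: " + increasing.get());
    }
}
```

Scenario 3 mirrors the direction of the fix described in the pull request, which combines distinct increasing timestamps with `addColumns`. Real HBase behaviour also depends on the column family's VERSIONS setting and on compactions, which this toy deliberately leaves out.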
[GitHub] [flink] flinkbot commented on pull request #22611: [hotfix][doc] mvn 3.8.6 is recommended or higher version could be used
flinkbot commented on PR #22611: URL: https://github.com/apache/flink/pull/22611#issuecomment-1555903469 ## CI report: * 286348435be40dec54af80f156cb989833a375f0 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] JingGe opened a new pull request, #22611: [hotfix][doc] mvn 3.8.6 is recommended or higher version could be used
JingGe opened a new pull request, #22611: URL: https://github.com/apache/flink/pull/22611

## What is the purpose of the change

Update the docs to help developers use a suitable Maven version.

## Brief change log

- Point out that Maven 3.8.6 is recommended, according to https://lists.apache.org/thread/jbw3lzzoq5w16ckco3fc9xokycs3f22x.
- Higher versions can also be used; I personally tested with 3.9.1.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
[jira] [Reopened] (FLINK-25989) EventTimeWindowCheckpointingITCase failed with exit code 137
[ https://issues.apache.org/jira/browse/FLINK-25989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin reopened FLINK-25989: - reopen since it seems it happened again https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49085=logs=b0a398c0-685b-599c-eb57-c8c2a771138e=747432ad-a576-5911-1e2a-68c6bedc248a=11252 > EventTimeWindowCheckpointingITCase failed with exit code 137 > > > Key: FLINK-25989 > URL: https://issues.apache.org/jira/browse/FLINK-25989 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.13.5 >Reporter: Yun Gao >Priority: Minor > Labels: auto-deprioritized-major, test-stability > > {code:java} > Feb 07 10:35:00 Starting > org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase#testSlidingTimeWindow[statebackend > type =ROCKSDB_FULLY_ASYNC]. > ##[error]Exit code 137 returned from process: file name '/bin/docker', > arguments 'exec -i -u 1006 -w /home/agent07_azpcontainer > cc88843b38dcb3e53e10065d2a740015cf3fd387e5ef47621c224a9b348fbb37 > /__a/externals/node/bin/node /__w/_temp/containerHandlerInvoker.js'. > Finishing: Test - tests > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30833=logs=5c8e7682-d68f-54d1-16a2-a09310218a49=f508e270-48d6-5f1e-3138-42a17e0714f0=4495 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-18356) flink-table-planner Exit code 137 returned from process
[ https://issues.apache.org/jira/browse/FLINK-18356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin updated FLINK-18356: Affects Version/s: 1.18.0 > flink-table-planner Exit code 137 returned from process > --- > > Key: FLINK-18356 > URL: https://issues.apache.org/jira/browse/FLINK-18356 > Project: Flink > Issue Type: Bug > Components: Build System / Azure Pipelines, Tests >Affects Versions: 1.12.0, 1.13.0, 1.14.0, 1.15.0, 1.16.0, 1.17.0, 1.18.0 >Reporter: Piotr Nowojski >Priority: Critical > Labels: pull-request-available, test-stability > Attachments: 1234.jpg, app-profiling_4.gif, > image-2023-01-11-22-21-57-784.png, image-2023-01-11-22-22-32-124.png, > image-2023-02-16-20-18-09-431.png > > > {noformat} > = test session starts > == > platform linux -- Python 3.7.3, pytest-5.4.3, py-1.8.2, pluggy-0.13.1 > cachedir: .tox/py37-cython/.pytest_cache > rootdir: /__w/3/s/flink-python > collected 568 items > pyflink/common/tests/test_configuration.py ..[ > 1%] > pyflink/common/tests/test_execution_config.py ...[ > 5%] > pyflink/dataset/tests/test_execution_environment.py . > ##[error]Exit code 137 returned from process: file name '/bin/docker', > arguments 'exec -i -u 1002 > 97fc4e22522d2ced1f4d23096b8929045d083dd0a99a4233a8b20d0489e9bddb > /__a/externals/node/bin/node /__w/_temp/containerHandlerInvoker.js'. > Finishing: Test - python > {noformat} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=3729=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=8d78fe4f-d658-5c70-12f8-4921589024c3
[jira] [Commented] (FLINK-18356) flink-table-planner Exit code 137 returned from process
[ https://issues.apache.org/jira/browse/FLINK-18356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17724502#comment-17724502 ] Sergey Nuyanzin commented on FLINK-18356: - https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49084=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=12462 > flink-table-planner Exit code 137 returned from process > --- > > Key: FLINK-18356 > URL: https://issues.apache.org/jira/browse/FLINK-18356 > Project: Flink > Issue Type: Bug > Components: Build System / Azure Pipelines, Tests >Affects Versions: 1.12.0, 1.13.0, 1.14.0, 1.15.0, 1.16.0, 1.17.0 >Reporter: Piotr Nowojski >Priority: Critical > Labels: pull-request-available, test-stability > Attachments: 1234.jpg, app-profiling_4.gif, > image-2023-01-11-22-21-57-784.png, image-2023-01-11-22-22-32-124.png, > image-2023-02-16-20-18-09-431.png
[jira] [Commented] (FLINK-26974) Python EmbeddedThreadDependencyTests.test_add_python_file failed on azure
[ https://issues.apache.org/jira/browse/FLINK-26974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17724501#comment-17724501 ] Sergey Nuyanzin commented on FLINK-26974: - https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49123=logs=3e4dd1a2-fe2f-5e5d-a581-48087e718d53=b4612f28-e3b5-5853-8a8b-610ae894217a=28496 > Python EmbeddedThreadDependencyTests.test_add_python_file failed on azure > - > > Key: FLINK-26974 > URL: https://issues.apache.org/jira/browse/FLINK-26974 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.15.0, 1.16.0, 1.17.0 >Reporter: Yun Gao >Assignee: Huang Xingbo >Priority: Critical > Labels: auto-deprioritized-major, test-stability > > {code:java} > Mar 31 10:49:17 === FAILURES > === > Mar 31 10:49:17 __ > EmbeddedThreadDependencyTests.test_add_python_file __ > Mar 31 10:49:17 > Mar 31 10:49:17 self = > testMethod=test_add_python_file> > Mar 31 10:49:17 > Mar 31 10:49:17 def test_add_python_file(self): > Mar 31 10:49:17 python_file_dir = os.path.join(self.tempdir, > "python_file_dir_" + str(uuid.uuid4())) > Mar 31 10:49:17 os.mkdir(python_file_dir) > Mar 31 10:49:17 python_file_path = os.path.join(python_file_dir, > "test_dependency_manage_lib.py") > Mar 31 10:49:17 with open(python_file_path, 'w') as f: > Mar 31 10:49:17 f.write("def add_two(a):\nraise > Exception('This function should not be called!')") > Mar 31 10:49:17 self.t_env.add_python_file(python_file_path) > Mar 31 10:49:17 > Mar 31 10:49:17 python_file_dir_with_higher_priority = os.path.join( > Mar 31 10:49:17 self.tempdir, "python_file_dir_" + > str(uuid.uuid4())) > Mar 31 10:49:17 os.mkdir(python_file_dir_with_higher_priority) > Mar 31 10:49:17 python_file_path_higher_priority = > os.path.join(python_file_dir_with_higher_priority, > Mar 31 10:49:17 > "test_dependency_manage_lib.py") > Mar 31 10:49:17 with open(python_file_path_higher_priority, 'w') as f: > Mar 31 10:49:17 f.write("def add_two(a):\nreturn a + 
2") > Mar 31 10:49:17 > self.t_env.add_python_file(python_file_path_higher_priority) > Mar 31 10:49:17 > Mar 31 10:49:17 def plus_two(i): > Mar 31 10:49:17 from test_dependency_manage_lib import add_two > Mar 31 10:49:17 return add_two(i) > Mar 31 10:49:17 > Mar 31 10:49:17 self.t_env.create_temporary_system_function( > Mar 31 10:49:17 "add_two", udf(plus_two, DataTypes.BIGINT(), > DataTypes.BIGINT())) > Mar 31 10:49:17 table_sink = source_sink_utils.TestAppendSink( > Mar 31 10:49:17 ['a', 'b'], [DataTypes.BIGINT(), > DataTypes.BIGINT()]) > Mar 31 10:49:17 self.t_env.register_table_sink("Results", table_sink) > Mar 31 10:49:17 t = self.t_env.from_elements([(1, 2), (2, 5), (3, > 1)], ['a', 'b']) > Mar 31 10:49:17 > t.select(expr.call("add_two", t.a), > t.a).execute_insert("Results").wait() > Mar 31 10:49:17 > Mar 31 10:49:17 pyflink/table/tests/test_dependency.py:63: > Mar 31 10:49:17 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ _ _ _ _ _ _ _ _ > Mar 31 10:49:17 pyflink/table/table_result.py:76: in wait > Mar 31 10:49:17 get_method(self._j_table_result, "await")() > Mar 31 10:49:17 > .tox/py38-cython/lib/python3.8/site-packages/py4j/java_gateway.py:1321: in > __call__ > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=34001=logs=821b528f-1eed-5598-a3b4-7f748b13f261=6bb545dd-772d-5d8c-f258-f5085fba3295=27239
[jira] [Commented] (FLINK-30433) HiveDialectITCase#testCreateDatabase fails with "Could not execute CREATE DATABASE"
[ https://issues.apache.org/jira/browse/FLINK-30433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17724500#comment-17724500 ] Sergey Nuyanzin commented on FLINK-30433: - https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49124=logs=245e1f2e-ba5b-5570-d689-25ae21e5302f=d04c9862-880c-52f5-574b-a7a79fef8e0f=22404 > HiveDialectITCase#testCreateDatabase fails with "Could not execute CREATE > DATABASE" > --- > > Key: FLINK-30433 > URL: https://issues.apache.org/jira/browse/FLINK-30433 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.17.0, 1.16.1 >Reporter: Martijn Visser >Priority: Critical > Labels: test-stability > > {code:java} > Feb 14 02:38:16 [ERROR] Tests run: 27, Failures: 0, Errors: 1, Skipped: 0, > Time elapsed: 341.271 s <<< FAILURE! - in > org.apache.flink.connectors.hive.HiveDialectITCase > Feb 14 02:38:16 [ERROR] > org.apache.flink.connectors.hive.HiveDialectITCase.testTemporaryFunctionUDTF > Time elapsed: 49.997 s <<< ERROR! > Feb 14 02:38:16 org.apache.flink.table.api.TableException: Could not execute > CREATE DATABASE: (catalogDatabase: [{}], catalogName: [test-catalog], > databaseName: [db1], ignoreIfExists: [false]) > Feb 14 02:38:16 at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1125) > Feb 14 02:38:16 at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730) > Feb 14 02:38:16 at > org.apache.flink.connectors.hive.HiveDialectITCase.testTemporaryFunctionUDTF(HiveDialectITCase.java:838) > Feb 14 02:38:16 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > Feb 14 02:38:16 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > Feb 14 02:38:16 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > Feb 14 02:38:16 at java.lang.reflect.Method.invoke(Method.java:498) > [...] 
> Feb 14 02:38:16 at > org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:548) > Feb 14 02:38:16 Caused by: > org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create > database db1 > Feb 14 02:38:16 at > org.apache.flink.table.catalog.hive.HiveCatalog.createDatabase(HiveCatalog.java:376) > Feb 14 02:38:16 at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1117) > Feb 14 02:38:16 ... 47 more > Feb 14 02:38:16 Caused by: InvalidObjectException(message:No such catalog > hive) > Feb 14 02:38:16 at > org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_database_core(HiveMetaStore.java:1251) > Feb 14 02:38:16 at > org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_database(HiveMetaStore.java:1325) > Feb 14 02:38:16 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > Feb 14 02:38:16 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > Feb 14 02:38:16 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > Feb 14 02:38:16 at java.lang.reflect.Method.invoke(Method.java:498) > Feb 14 02:38:16 at > org.apache.hadoop.hive.metastore.RetryingHMSHandler.invokeInternal(RetryingHMSHandler.java:147) > Feb 14 02:38:16 at > org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:108) > Feb 14 02:38:16 at com.sun.proxy.$Proxy35.create_database(Unknown > Source) > Feb 14 02:38:16 at > org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createDatabase(HiveMetaStoreClient.java:809) > Feb 14 02:38:16 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > Feb 14 02:38:16 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > Feb 14 02:38:16 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > Feb 14 02:38:16 at java.lang.reflect.Method.invoke(Method.java:498) > Feb 14 02:38:16 at > 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:208) > Feb 14 02:38:16 at com.sun.proxy.$Proxy36.createDatabase(Unknown Source) > Feb 14 02:38:16 at > org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.createDatabase(HiveMetastoreClientWrapper.java:179) > Feb 14 02:38:16 at > org.apache.flink.table.catalog.hive.HiveCatalog.createDatabase(HiveCatalog.java:369) > Feb 14 02:38:16 ... 48 more{code} >
[jira] [Commented] (FLINK-26990) BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime failed
[ https://issues.apache.org/jira/browse/FLINK-26990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17724498#comment-17724498 ] Sergey Nuyanzin commented on FLINK-26990: - https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49143=logs=5cae8624-c7eb-5c51-92d3-4d2dacedd221=5acec1b4-945b-59ca-34f8-168928ce5199=26029 > BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime > failed > --- > > Key: FLINK-26990 > URL: https://issues.apache.org/jira/browse/FLINK-26990 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.16.0 >Reporter: Matthias Pohl >Priority: Minor > Labels: auto-deprioritized-major, test-stability > > [This > build|https://dev.azure.com/mapohl/flink/_build/results?buildId=914=logs=f3dc9b18-b77a-55c1-591e-264c46fe44d1=2d3cd81e-1c37-5c31-0ee4-f5d5cdb9324d=25899] > failed due to unexpected behavior in > {{BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime}}: > {code} > Apr 01 11:42:06 [ERROR] > org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime > Time elapsed: 0.11 s <<< FAILURE! > Apr 01 11:42:06 java.lang.AssertionError: > Apr 01 11:42:06 > Apr 01 11:42:06 Expected size: 6 but was: 4 in: > Apr 01 11:42:06 [Record @ (undef) : > +I(c1,0,1969-12-31T23:59:55,1970-01-01T00:00:05), > Apr 01 11:42:06 Record @ (undef) : > +I(c1,0,1970-01-01T00:00,1970-01-01T00:00:10), > Apr 01 11:42:06 Record @ (undef) : > +I(c1,1,1970-01-01T00:00:05,1970-01-01T00:00:15), > Apr 01 11:42:06 Record @ (undef) : > +I(c1,2,1970-01-01T00:00:10,1970-01-01T00:00:20)] > {code}
[jira] [Reopened] (FLINK-26990) BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime failed
[ https://issues.apache.org/jira/browse/FLINK-26990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin reopened FLINK-26990: - Reopening since it has been reproduced again. > BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime > failed > --- > > Key: FLINK-26990 > URL: https://issues.apache.org/jira/browse/FLINK-26990 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.16.0 >Reporter: Matthias Pohl >Priority: Minor > Labels: auto-deprioritized-major, test-stability > > [This > build|https://dev.azure.com/mapohl/flink/_build/results?buildId=914=logs=f3dc9b18-b77a-55c1-591e-264c46fe44d1=2d3cd81e-1c37-5c31-0ee4-f5d5cdb9324d=25899] > failed due to unexpected behavior in > {{BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime}}: > {code} > Apr 01 11:42:06 [ERROR] > org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime > Time elapsed: 0.11 s <<< FAILURE! > Apr 01 11:42:06 java.lang.AssertionError: > Apr 01 11:42:06 > Apr 01 11:42:06 Expected size: 6 but was: 4 in: > Apr 01 11:42:06 [Record @ (undef) : > +I(c1,0,1969-12-31T23:59:55,1970-01-01T00:00:05), > Apr 01 11:42:06 Record @ (undef) : > +I(c1,0,1970-01-01T00:00,1970-01-01T00:00:10), > Apr 01 11:42:06 Record @ (undef) : > +I(c1,1,1970-01-01T00:00:05,1970-01-01T00:00:15), > Apr 01 11:42:06 Record @ (undef) : > +I(c1,2,1970-01-01T00:00:10,1970-01-01T00:00:20)] > {code}
[jira] [Commented] (FLINK-30629) ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat is unstable
[ https://issues.apache.org/jira/browse/FLINK-30629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17724497#comment-17724497 ] Sergey Nuyanzin commented on FLINK-30629: - https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49175=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=9704 > ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat is unstable > - > > Key: FLINK-30629 > URL: https://issues.apache.org/jira/browse/FLINK-30629 > Project: Flink > Issue Type: Bug > Components: Client / Job Submission >Affects Versions: 1.17.0, 1.18.0 >Reporter: Xintong Song >Assignee: Weijie Guo >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.17.0 > > Attachments: ClientHeartbeatTestLog.txt > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44690=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=10819 > {code:java} > Jan 11 04:32:39 [ERROR] Tests run: 3, Failures: 0, Errors: 1, Skipped: 0, > Time elapsed: 21.02 s <<< FAILURE! - in > org.apache.flink.client.ClientHeartbeatTest > Jan 11 04:32:39 [ERROR] > org.apache.flink.client.ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat > Time elapsed: 9.157 s <<< ERROR! > Jan 11 04:32:39 java.lang.IllegalStateException: MiniCluster is not yet > running or has already been shut down. 
> Jan 11 04:32:39 at > org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) > Jan 11 04:32:39 at > org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:1044) > Jan 11 04:32:39 at > org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:917) > Jan 11 04:32:39 at > org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:841) > Jan 11 04:32:39 at > org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobStatus(MiniClusterJobClient.java:91) > Jan 11 04:32:39 at > org.apache.flink.client.ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat(ClientHeartbeatTest.java:79) > {code}
[jira] [Created] (FLINK-32138) SQLClientSchemaRegistryITCase fails with timeout on AZP
Sergey Nuyanzin created FLINK-32138: --- Summary: SQLClientSchemaRegistryITCase fails with timeout on AZP Key: FLINK-32138 URL: https://issues.apache.org/jira/browse/FLINK-32138 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.16.2 Reporter: Sergey Nuyanzin https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49174=logs=6e8542d7-de38-5a33-4aca-458d6c87066d=10d6732b-d79a-5c68-62a5-668516de5313=15753 {{SQLClientSchemaRegistryITCase}} fails on AZP as {noformat} May 20 03:41:34 [ERROR] org.apache.flink.tests.util.kafka.SQLClientSchemaRegistryITCase Time elapsed: 600.05 s <<< ERROR! May 20 03:41:34 org.junit.runners.model.TestTimedOutException: test timed out after 10 minutes May 20 03:41:34 at java.base@11.0.19/jdk.internal.misc.Unsafe.park(Native Method) May 20 03:41:34 at java.base@11.0.19/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194) May 20 03:41:34 at java.base@11.0.19/java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:885) May 20 03:41:34 at java.base@11.0.19/java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1039) May 20 03:41:34 at java.base@11.0.19/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1345) May 20 03:41:34 at java.base@11.0.19/java.util.concurrent.CountDownLatch.await(CountDownLatch.java:232) May 20 03:41:34 at app//com.github.dockerjava.api.async.ResultCallbackTemplate.awaitCompletion(ResultCallbackTemplate.java:91) May 20 03:41:34 at app//org.testcontainers.images.TimeLimitedLoggedPullImageResultCallback.awaitCompletion(TimeLimitedLoggedPullImageResultCallback.java:52) May 20 03:41:34 at app//org.testcontainers.images.RemoteDockerImage.resolve(RemoteDockerImage.java:89) May 20 03:41:34 at app//org.testcontainers.images.RemoteDockerImage.resolve(RemoteDockerImage.java:28) May 20 03:41:34 at 
app//org.testcontainers.utility.LazyFuture.getResolvedValue(LazyFuture.java:17) May 20 03:41:34 at app//org.testcontainers.utility.LazyFuture.get(LazyFuture.java:39) May 20 03:41:34 at app//org.testcontainers.containers.GenericContainer.getDockerImageName(GenericContainer.java:1330) May 20 03:41:34 at app//org.testcontainers.containers.GenericContainer.logger(GenericContainer.java:640) May 20 03:41:34 at app//org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:335) May 20 03:41:34 at app//org.testcontainers.containers.GenericContainer.start(GenericContainer.java:326) May 20 03:41:34 at app//org.testcontainers.containers.GenericContainer.starting(GenericContainer.java:1063) May 20 03:41:34 at app//org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:29) May 20 03:41:34 at app//org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299) May 20 03:41:34 at app//org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293) May 20 03:41:34 at java.base@11.0.19/java.util.concurrent.FutureTask.run(FutureTask.java:264) May 20 03:41:34 at java.base@11.0.19/java.lang.Thread.run(Thread.java:829) {noformat}
[jira] [Commented] (FLINK-25343) HBaseConnectorITCase.testTableSourceSinkWithDDL fail on azure
[ https://issues.apache.org/jira/browse/FLINK-25343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17724494#comment-17724494 ] Sergey Nuyanzin commented on FLINK-25343: - https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49174=logs=a1ac4ce4-9a4f-5fdb-3290-7e163fba19dc=3a8f44aa-4415-5b14-37d5-5fecc568b139=16313 > HBaseConnectorITCase.testTableSourceSinkWithDDL fail on azure > - > > Key: FLINK-25343 > URL: https://issues.apache.org/jira/browse/FLINK-25343 > Project: Flink > Issue Type: Bug > Components: Connectors / HBase >Affects Versions: 1.14.3, 1.15.0, 1.16.0, 1.17.1 >Reporter: Yun Gao >Assignee: Ferenc Csaky >Priority: Critical > Labels: pull-request-available, stale-assigned, test-stability > > {code:java} > Dec 15 16:53:00 Picked up JAVA_TOOL_OPTIONS: -XX:+HeapDumpOnOutOfMemoryError > Dec 15 16:53:01 Running org.apache.flink.connector.hbase2.HBaseConnectorITCase > Dec 15 16:53:05 Formatting using clusterid: testClusterID > Dec 15 16:54:20 java.lang.ThreadGroup[name=PEWorkerGroup,maxpri=10] > Dec 15 16:54:20 Thread[HFileArchiver-8,5,PEWorkerGroup] > Dec 15 16:54:20 Thread[HFileArchiver-9,5,PEWorkerGroup] > Dec 15 16:54:20 Thread[HFileArchiver-10,5,PEWorkerGroup] > Dec 15 16:54:20 Thread[HFileArchiver-11,5,PEWorkerGroup] > Dec 15 16:54:20 Thread[HFileArchiver-12,5,PEWorkerGroup] > Dec 15 16:54:20 Thread[HFileArchiver-13,5,PEWorkerGroup] > Dec 15 16:54:20 Tests run: 9, Failures: 1, Errors: 0, Skipped: 0, Time > elapsed: 79.639 sec <<< FAILURE! - in > org.apache.flink.connector.hbase2.HBaseConnectorITCase > Dec 15 16:54:20 > testTableSourceSinkWithDDL(org.apache.flink.connector.hbase2.HBaseConnectorITCase) > Time elapsed: 6.843 sec <<< FAILURE! 
> Dec 15 16:54:20 java.lang.AssertionError: expected:<8> but was:<3> > Dec 15 16:54:20 at org.junit.Assert.fail(Assert.java:89) > Dec 15 16:54:20 at org.junit.Assert.failNotEquals(Assert.java:835) > Dec 15 16:54:20 at org.junit.Assert.assertEquals(Assert.java:120) > Dec 15 16:54:20 at org.junit.Assert.assertEquals(Assert.java:146) > Dec 15 16:54:20 at > org.apache.flink.connector.hbase2.HBaseConnectorITCase.testTableSourceSinkWithDDL(HBaseConnectorITCase.java:371) > Dec 15 16:54:20 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > Dec 15 16:54:20 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > Dec 15 16:54:20 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > Dec 15 16:54:20 at java.lang.reflect.Method.invoke(Method.java:498) > Dec 15 16:54:20 at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > Dec 15 16:54:20 at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > Dec 15 16:54:20 at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > Dec 15 16:54:20 at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > Dec 15 16:54:20 at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > Dec 15 16:54:20 at > org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > Dec 15 16:54:20 at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > Dec 15 16:54:20 at > org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > Dec 15 16:54:20 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > Dec 15 16:54:20 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > Dec 15 16:54:20 at > org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > Dec 15 16:54:20 at > 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=28204=logs=d44f43ce-542c-597d-bf94-b0718c71e5e8=ed165f3f-d0f6-524b-5279-86f8ee7d0e2d=12375
[jira] [Commented] (FLINK-30629) ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat is unstable
[ https://issues.apache.org/jira/browse/FLINK-30629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17724493#comment-17724493 ] Sergey Nuyanzin commented on FLINK-30629: - https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49173=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=9993 > ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat is unstable > - > > Key: FLINK-30629 > URL: https://issues.apache.org/jira/browse/FLINK-30629 > Project: Flink > Issue Type: Bug > Components: Client / Job Submission >Affects Versions: 1.17.0, 1.18.0 >Reporter: Xintong Song >Assignee: Weijie Guo >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.17.0 > > Attachments: ClientHeartbeatTestLog.txt > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44690=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=10819 > {code:java} > Jan 11 04:32:39 [ERROR] Tests run: 3, Failures: 0, Errors: 1, Skipped: 0, > Time elapsed: 21.02 s <<< FAILURE! - in > org.apache.flink.client.ClientHeartbeatTest > Jan 11 04:32:39 [ERROR] > org.apache.flink.client.ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat > Time elapsed: 9.157 s <<< ERROR! > Jan 11 04:32:39 java.lang.IllegalStateException: MiniCluster is not yet > running or has already been shut down. 
> Jan 11 04:32:39 at > org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) > Jan 11 04:32:39 at > org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:1044) > Jan 11 04:32:39 at > org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:917) > Jan 11 04:32:39 at > org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:841) > Jan 11 04:32:39 at > org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobStatus(MiniClusterJobClient.java:91) > Jan 11 04:32:39 at > org.apache.flink.client.ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat(ClientHeartbeatTest.java:79) > {code}