This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new c94a0799016 [FLINK-31124][Connectors/Hive] Add IT case for HiveTableSink speculative execution
c94a0799016 is described below

commit c94a0799016ae7f8d6e348930124936bc71a61aa
Author: shuiqiangchen <acqua....@gmail.com>
AuthorDate: Sat Feb 18 21:24:58 2023 +0800

    [FLINK-31124][Connectors/Hive] Add IT case for HiveTableSink speculative execution
    
    This closes #21962.
---
 .../hive/HiveTableSpeculativeSinkITCase.java       | 190 +++++++++++++++++++++
 1 file changed, 190 insertions(+)

diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSpeculativeSinkITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSpeculativeSinkITCase.java
new file mode 100644
index 00000000000..c4a1ca36cd2
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSpeculativeSinkITCase.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.BatchExecutionOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SlowTaskDetectorOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.operations.ModifyOperation;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests {@link HiveTableSink} with {@link
+ * org.apache.flink.connector.file.table.FileSystemOutputFormat} and speculative execution enabled.
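+ *
+ * <p>The test blocks the first attempt of a map task that is chained with the sink, so the job
+ * can only finish after a speculative attempt is started and its output is committed.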
+ */
+class HiveTableSpeculativeSinkITCase {
+
+    private static final int PARALLELISM = 3;
+
+    @RegisterExtension
+    private static final MiniClusterExtension MINI_CLUSTER =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(configure(new Configuration()))
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .build());
+
+    private HiveCatalog hiveCatalog;
+
+    @BeforeEach
+    void createCatalog() {
+        hiveCatalog = HiveTestUtils.createHiveCatalog();
+        hiveCatalog.open();
+    }
+
+    @AfterEach
+    void closeCatalog() {
+        if (hiveCatalog != null) {
+            hiveCatalog.close();
+        }
+    }
+
+    @Test
+    void testBatchWritingWithoutCompactionWithSpeculativeSink()
+            throws ExecutionException, InterruptedException {
+        StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(configure(new Configuration()));
+        StreamTableEnvironment tEnv =
+                StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode());
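+        // keep the default parallelism at 1 so that only the explicitly configured slow map
+        // and sink run with multiple subtasks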
+        tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+
+        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+        tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+        tEnv.useCatalog(hiveCatalog.getName());
+        tEnv.executeSql("create database db1");
+        tEnv.useDatabase("db1");
+
+        tEnv.executeSql(
+                "create table append_table("
+                        + "i int, "
+                        + "j int) TBLPROPERTIES ("
+                        + "'sink.parallelism' = '"
+                        + PARALLELISM
+                        + "')");
+        DataStream<Row> slowStream = tEnv.toChangelogStream(tEnv.sqlQuery("select 0, 0"));
+        slowStream =
+                slowStream
+                        .map(
+                                new RichMapFunction<Row, Row>() {
+                                    @Override
+                                    public Row map(Row value) throws Exception {
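+                                        // the initial attempt (attempt number 0) blocks
+                                        // forever, so only a speculative attempt can
+                                        // finish this task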
+                                        if (getRuntimeContext().getAttemptNumber() <= 0) {
+                                            Thread.sleep(Integer.MAX_VALUE);
+                                        }
+                                        assert getRuntimeContext().getAttemptNumber() > 0;
+                                        value.setField(1, getRuntimeContext().getAttemptNumber());
+                                        return value;
+                                    }
+                                })
+                        .name("slowMap")
+                        .returns(Types.ROW_NAMED(new String[] {"i", "j"}, Types.INT, Types.INT))
+                        .setParallelism(PARALLELISM);
+
+        Table t =
+                tEnv.fromChangelogStream(
+                        slowStream,
+                        Schema.newBuilder()
+                                .column("i", DataTypes.INT())
+                                .column("j", DataTypes.INT())
+                                .build(),
+                        ChangelogMode.insertOnly());
+
+        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+        tEnv.createTemporaryView("mappedTable", t);
+        String insertQuery = "insert into append_table select * from mappedTable";
+
+        // assert that the slowMap operator is chained with the Hive table sink, which makes
+        // the sink slow as well and thus a target for speculative execution
+        StreamTableEnvironmentImpl tEnvImpl = (StreamTableEnvironmentImpl) tEnv;
+        JobGraph jobGraph =
+                tEnvImpl.execEnv()
+                        .generateStreamGraph(
+                                tEnvImpl.getPlanner()
+                                        .translate(
+                                                Collections.singletonList(
+                                                        (ModifyOperation)
+                                                                tEnvImpl.getParser()
+                                                                        .parse(insertQuery)
+                                                                        .get(0))))
+                        .getJobGraph();
+
+        for (JobVertex jobVertex : jobGraph.getVertices()) {
+            if (jobVertex.getName().contains("slowMap")) {
+                assertThat(jobVertex.getName()).contains("Sink");
+            }
+        }
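+        // the job can only finish once a speculative attempt of the slow sink succeeds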
+        tEnv.executeSql(insertQuery).await();
+
+        List<Row> rows =
+                CollectionUtil.iteratorToList(
+                        tEnv.executeSql("select * from append_table").collect());
+        rows.sort(Comparator.comparingInt(o -> (int) o.getField(0)));
+
+        // finally, the result row carries an attempt number larger than 0, i.e. it was
+        // produced by a speculative attempt
+        assertThat(rows).isEqualTo(Collections.singletonList(Row.of(0, 1)));
+        tEnv.executeSql("drop database db1 cascade");
+    }
+
+    private static Configuration configure(Configuration configuration) {
+        // for speculative execution
+        configuration.set(BatchExecutionOptions.SPECULATIVE_ENABLED, true);
+        // for testing: do not block the slow node, otherwise speculative attempts could not
+        // be deployed on the single-TaskManager mini cluster
+        configuration.set(BatchExecutionOptions.BLOCK_SLOW_NODE_DURATION, Duration.ZERO);
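+        // make slow task detection aggressive so that a speculative attempt is started quickly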
+        configuration.set(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_MULTIPLIER, 1.0);
+        configuration.set(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_RATIO, 0.2);
+        configuration.set(
+                SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_LOWER_BOUND, Duration.ofMillis(0));
+        return configuration;
+    }
+}
