JunRuiLee commented on code in PR #21977:
URL: https://github.com/apache/flink/pull/21977#discussion_r111259
##
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableCompactSinkTest.java:
##
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
+import org.apache.flink.connector.file.table.batch.BatchSink;
+import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamNode;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.CatalogTest;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link HiveTableSink} with auto-compaction enabled. */
+@ExtendWith(TestLoggerExtension.class)
+class HiveTableCompactSinkTest {
+
+private static HiveCatalog catalog;
+
+private Configuration tableConf;
+
+@BeforeAll
+static void before() {
+catalog = HiveTestUtils.createHiveCatalog();
+catalog.open();
+}
+
+@AfterAll
+static void after() {
+catalog.close();
+}
+
+@BeforeEach
+void resetTableConf() {
+tableConf = new Configuration();
+tableConf.set(FactoryUtil.CONNECTOR, SqlCreateHiveTable.IDENTIFIER);
+tableConf.set(FileSystemConnectorOptions.AUTO_COMPACTION, true);
+}
+
+/** If only sink parallelism is set, the compact operator should follow this setting. */
+@Test
+void testOnlySetSinkParallelism() throws Exception {
+final int sinkParallelism = 4;
+
+tableConf.set(FileSystemConnectorOptions.SINK_PARALLELISM, sinkParallelism);
+
+assertSinkAndCompactOperatorParallelism(
+tableConf, true, true, sinkParallelism, sinkParallelism);
+}
+
+@Test
+void testOnlySetCompactParallelism() throws Exception {
+final int compactParallelism = 4;
+
+tableConf.set(FileSystemConnectorOptions.COMPACTION_PARALLELISM, compactParallelism);
+
+assertSinkAndCompactOperatorParallelism(tableConf, false, true, -1, compactParallelism);
+}
+
+@Test
+void testSetBothSinkAndCompactParallelism() throws Exception {
+final int sinkParallelism = 8;
+final int compactParallelism = 4;
+
+tableConf.set(FileSystemConnectorOptions.SINK_PARALLELISM, sinkParallelism);
+tableConf.set(FileSystemConnectorOptions.COMPACTION_PARALLELISM, compactParallelism);
+
+