hehuiyuan created FLINK-30679: --------------------------------- Summary: Cannot load the data of a Hive dim table when projection push-down is introduced Key: FLINK-30679 URL: https://issues.apache.org/jira/browse/FLINK-30679 Project: Flink Issue Type: Bug Reporter: hehuiyuan
Vectorized read: {code:java} java.lang.ArrayIndexOutOfBoundsException: 3 at org.apache.flink.connectors.hive.read.HiveTableInputFormat.useOrcVectorizedRead(HiveTableInputFormat.java:276) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:129) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] at org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] at LookupFunction$26.flatMap(Unknown Source) ~[?:?] {code} MapReduce read: {code:java} java.lang.ArrayIndexOutOfBoundsException: 3 at org.apache.flink.connectors.hive.read.HiveMapredSplitReader.lambda$new$0(HiveMapredSplitReader.java:139) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250) ~[?:1.8.0_301] at java.util.Spliterators$IntArraySpliterator.forEachRemaining(Spliterators.java:1032) ~[?:1.8.0_301] at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) ~[?:1.8.0_301] at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_301] at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_301] at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546) ~[?:1.8.0_301] at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) ~[?:1.8.0_301] at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438) ~[?:1.8.0_301] at 
org.apache.flink.connectors.hive.read.HiveMapredSplitReader.<init>(HiveMapredSplitReader.java:141) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:157) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] at org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] at LookupFunction$26.flatMap(Unknown Source) ~[?:?] at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] {code} The SQL: {code:java} CREATE TABLE kafkaTableSource ( name string, age int, sex string, address string, ptime AS PROCTIME() ) WITH ( 'connector' = 'kafka', 'topic' = 'hehuiyuan1', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.client.id' = 'test-consumer-group', 'properties.group.id' = 'test-consumer-group', 'format' = 'csv' ); CREATE TABLE printsink ( name string, age int, sex string, address string, score bigint, dt string ) WITH ( 'connector' = 'print' ); CREATE CATALOG myhive WITH ( 'type' = 'hive', 'default-database' = 'hhy', 'hive-version' = '2.0.0', 'hadoop-conf-dir'='/Users/hehuiyuan/soft/hadoop/hadoop-2.7.3/etc/hadoop' ); USE CATALOG myhive; USE hhy; set table.sql-dialect=hive; CREATE TABLE IF NOT EXISTS tmp_flink_test_text ( name STRING, age INT, 
score BIGINT ) PARTITIONED BY (dt STRING) STORED AS TEXTFILE TBLPROPERTIES ( 'streaming-source.enable' = 'false', 'streaming-source.partition.include' = 'all', 'lookup.join.cache.ttl' = '5 min' ); set table.sql-dialect=default; USE CATALOG default_catalog; INSERT INTO default_catalog.default_database.printsink SELECT s.name, s.age, s.sex, s.address, r.score, r.dt FROM default_catalog.default_database.kafkaTableSource as s JOIN myhive.hhy.tmp_flink_test_text FOR SYSTEM_TIME AS OF s.ptime AS r ON r.name = s.name; {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)