[jira] [Created] (FLINK-33871) Reduce getTable calls to the hive client and optimize graph generation time
hehuiyuan created FLINK-33871: - Summary: Reduce getTable calls to the hive client and optimize graph generation time Key: FLINK-33871 URL: https://issues.apache.org/jira/browse/FLINK-33871 Project: Flink Issue Type: Improvement Reporter: hehuiyuan

The HiveCatalog.getHiveTable method wastes a lot of time during graph generation because it is called very often. I have a SQL job with over 2000 lines; HiveCatalog.getHiveTable is called 4879 times, even though only six hive tables are used. ![image](https://github.com/apache/flink/assets/18002496/d5f0daf3-f80a-4790-ae21-4e75dff9cfd7) The client.getTable call itself costs a lot of time. ![image](https://github.com/apache/flink/assets/18002496/be0d176f-3915-4b92-a177-f1cfaf6d2927)

These calls are the jobmanager interacting with hive during graph generation. If one call takes approximately 50 milliseconds, the total time spent is 4879 * 50 = 243,950 ms ≈ 244 s ≈ 4 min. With a cache, client.getTable would only be called six times.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
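For illustration, a minimal sketch of the proposed caching (names like {{tableCache}} are hypothetical, not the actual HiveCatalog code; imports elided), assuming the planner resolves the same ObjectPath repeatedly:
{code:java}
// Hypothetical sketch: memoize the metastore lookup per table path.
private final Map<ObjectPath, Table> tableCache = new ConcurrentHashMap<>();

public Table getHiveTable(ObjectPath tablePath) throws TableNotExistException {
    Table cached = tableCache.get(tablePath);
    if (cached != null) {
        return cached; // the planner re-resolves the same six tables thousands of times
    }
    Table table = client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
    tableCache.put(tablePath, table);
    return table;
}
{code}
One caveat of such a cache: entries can go stale if a table's schema changes while the graph is being generated, so scoping the cache to a single planning pass would be the safer design.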
[jira] [Created] (FLINK-31086) [DOC] Update connector labels for blackhole and kafka
hehuiyuan created FLINK-31086: - Summary: [DOC] Update connector labels for blackhole and kafka Key: FLINK-31086 URL: https://issues.apache.org/jira/browse/FLINK-31086 Project: Flink Issue Type: Bug Components: Documentation Reporter: hehuiyuan

Update the connector labels for kafka and blackhole. Blackhole: sink: bounded / unbounded. Kafka: source: bounded. !https://user-images.githubusercontent.com/18002496/216600374-5c9d16db-66ac-42a4-8b21-16245a71f9ef.png|width=468,height=168!

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30679) Cannot load the data of a hive dim table when projection push-down is introduced
hehuiyuan created FLINK-30679: - Summary: Cannot load the data of a hive dim table when projection push-down is introduced Key: FLINK-30679 URL: https://issues.apache.org/jira/browse/FLINK-30679 Project: Flink Issue Type: Bug Reporter: hehuiyuan

Vectorized read:
{code:java}
java.lang.ArrayIndexOutOfBoundsException: 3
    at org.apache.flink.connectors.hive.read.HiveTableInputFormat.useOrcVectorizedRead(HiveTableInputFormat.java:276) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:129) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
    at org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
    at LookupFunction$26.flatMap(Unknown Source) ~[?:?]
{code}
MapReduce read:
{code:java}
java.lang.ArrayIndexOutOfBoundsException: 3
    at org.apache.flink.connectors.hive.read.HiveMapredSplitReader.lambda$new$0(HiveMapredSplitReader.java:139) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250) ~[?:1.8.0_301]
    at java.util.Spliterators$IntArraySpliterator.forEachRemaining(Spliterators.java:1032) ~[?:1.8.0_301]
    at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) ~[?:1.8.0_301]
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_301]
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_301]
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546) ~[?:1.8.0_301]
    at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) ~[?:1.8.0_301]
    at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438) ~[?:1.8.0_301]
    at org.apache.flink.connectors.hive.read.HiveMapredSplitReader.<init>(HiveMapredSplitReader.java:141) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:157) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
    at org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
    at LookupFunction$26.flatMap(Unknown Source) ~[?:?]
    at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
    at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
{code}
The SQL:
{code:java}
CREATE TABLE kafkaTableSource (
  name string,
  age int,
  sex string,
  address string,
  ptime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'hehuiyuan1',
  'scan.startup.mode' = 'latest-offset',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.client.id' = 'test-consumer-group',
  'properties.group.id' = 'test-consumer-group',
  'format' = 'csv'
);

CREATE TABLE printsink (
  name string,
  age int,
  sex string,
  address string,
  score bigint,
  dt string
) WITH (
  'connector' = 'print'
);

CREATE CATALOG myhive WITH (
  'type' = 'hive',
  'default-database' = 'hhy',
  'hive-version' = '2.0.0',
  'hadoop-conf-dir' = '/Users/hehuiyuan/soft/hadoop/hadoop-2.7.3/etc/hadoop'
);

USE CATALOG myhive;
USE hhy;

set table.sql-dialect=hive;
CREATE TABLE IF NOT EXISTS tmp_flink_test_text (
  name STRING,
  age INT,
  score BIGINT
) PARTITIONED BY (dt STRING) STORED AS TEXTFILE TBLPROPERTIES (
  'streaming-source.enable' = 'false',
  'streaming-source.partition.include' = 'all',
  'lookup.join.cache.ttl' = '5 min'
);
set table.sql-dialect=default;

USE CATALOG default_catalog;
INSERT INTO default_catalog.default_database.printsink
SELECT s.name, s.age, s.sex, s.address, r.score, r.dt
FROM default_catalog.default_database.kafkaTableSource as s JOIN
[jira] [Created] (FLINK-29553) Support UNIX_TIMESTAMP in Table API
hehuiyuan created FLINK-29553: - Summary: Support UNIX_TIMESTAMP in Table API Key: FLINK-29553 URL: https://issues.apache.org/jira/browse/FLINK-29553 Project: Flink Issue Type: Improvement Reporter: hehuiyuan -- This message was sent by Atlassian Jira (v8.20.10#820010)
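For context, UNIX_TIMESTAMP is already available as a SQL built-in; the ask here is a Table API counterpart. A small sketch of today's SQL usage (table and field names are made up):
{code:java}
// UNIX_TIMESTAMP() and UNIX_TIMESTAMP(string[, format]) work in SQL,
// but there is no corresponding Table API expression yet.
Table t = tableEnv.sqlQuery(
    "SELECT name, " +
    "       UNIX_TIMESTAMP() AS now_sec, " +
    "       UNIX_TIMESTAMP(ts_str, 'yyyy-MM-dd HH:mm:ss') AS ts_sec " +
    "FROM source_table");
{code}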
[jira] [Created] (FLINK-29052) Support TO_TIMESTAMP built-in function in Table API
hehuiyuan created FLINK-29052: - Summary: Support TO_TIMESTAMP built-in function in Table API Key: FLINK-29052 URL: https://issues.apache.org/jira/browse/FLINK-29052 Project: Flink Issue Type: Improvement Components: Table SQL / API Reporter: hehuiyuan -- This message was sent by Atlassian Jira (v8.20.10#820010)
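Like the UNIX_TIMESTAMP request above, TO_TIMESTAMP already exists as a SQL built-in; the request is to expose it as a Table API expression. A sketch of the SQL form (literals are made up):
{code:java}
// Available in SQL; the issue asks for a Table API equivalent.
Table t = tableEnv.sqlQuery(
    "SELECT TO_TIMESTAMP('2022-08-01 12:00:00') AS ts1, " +
    "       TO_TIMESTAMP('2022/08/01', 'yyyy/MM/dd') AS ts2");
{code}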
[jira] [Created] (FLINK-28750) Whether to add comments for hive tables
hehuiyuan created FLINK-28750: - Summary: Whether to add comments for hive tables Key: FLINK-28750 URL: https://issues.apache.org/jira/browse/FLINK-28750 Project: Flink Issue Type: Improvement Reporter: hehuiyuan

Currently, I have a hive DDL, as follows:
{code:java}
"set table.sql-dialect=hive;\n" +
"CREATE TABLE IF NOT EXISTS myhive.dev.shipu3_test_1125 (\n" +
"   `id` int COMMENT 'ia',\n" +
"   `cartdid` bigint COMMENT 'aaa',\n" +
"   `customer` string COMMENT '',\n" +
"   `product` string COMMENT '',\n" +
"   `price` double COMMENT '',\n" +
"   `dt` STRING COMMENT ''\n" +
") PARTITIONED BY (dt STRING) STORED AS TEXTFILE TBLPROPERTIES (\n" +
"   'streaming-source.enable' = 'false',\n" +
"   'streaming-source.partition.include' = 'all',\n" +
"   'lookup.join.cache.ttl' = '12 h'\n" +
")";
{code}
It is parsed into a SqlCreateHiveTable by the hive dialect parser, but the field comments are lost. !image-2022-07-30-15-21-58-062.png|width=568,height=283!

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28716) Uploading multiple files/form data fails randomly when using the REST API
hehuiyuan created FLINK-28716: - Summary: Uploading multiple files/form data fails randomly when using the REST API Key: FLINK-28716 URL: https://issues.apache.org/jira/browse/FLINK-28716 Project: Flink Issue Type: Bug Reporter: hehuiyuan

Errors can occur randomly when using the `jars/upload` REST API.
{code:java}
java.lang.IndexOutOfBoundsException: index: 1804, length: 1 (expected: range(0, 1804))
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.checkRangeBounds(AbstractByteBuf.java:1390)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.checkIndex0(AbstractByteBuf.java:1397)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1384)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1379)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.getByte(AbstractByteBuf.java:355)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostBodyUtil.findDelimiter(HttpPostBodyUtil.java:238)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.loadDataMultipartOptimized(HttpPostMultipartRequestDecoder.java:1172)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.getFileUpload(HttpPostMultipartRequestDecoder.java:926)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.decodeMultipart(HttpPostMultipartRequestDecoder.java:572)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.findMultipartDisposition(HttpPostMultipartRequestDecoder.java:797)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.decodeMultipart(HttpPostMultipartRequestDecoder.java:511)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.findMultipartDelimiter(HttpPostMultipartRequestDecoder.java:663)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.decodeMultipart(HttpPostMultipartRequestDecoder.java:498)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.parseBodyMultipart(HttpPostMultipartRequestDecoder.java:463)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.parseBody(HttpPostMultipartRequestDecoder.java:432)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.offer(HttpPostMultipartRequestDecoder.java:347)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.offer(HttpPostMultipartRequestDecoder.java:54)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestDecoder.offer(HttpPostRequestDecoder.java:223)
    at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:176)
    at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:71)
{code}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28375) Whether to consider adding other data types to the last_value function
hehuiyuan created FLINK-28375: - Summary: Whether to consider adding other data types to the last_value function Key: FLINK-28375 URL: https://issues.apache.org/jira/browse/FLINK-28375 Project: Flink Issue Type: Improvement Components: Table SQL / API Reporter: hehuiyuan Attachments: image-2022-07-04-16-20-08-661.png, image-2022-07-04-16-21-28-198.png

{code:java}
CREATE TABLE jmqTableSource (
  keyField INTEGER,
  timestampField INTEGER,
  arrayField ARRAY,
  proc as PROCTIME()
) WITH (
  'connector' = 'kafka',
);

insert into kafkaTableSink
select keyField, last_value(arrayField) over (partition by keyField order by proc)
from kafkaTableSource;
{code}
Exception in thread "main" org.apache.flink.table.api.TableException: LAST_VALUE aggregate function does not support type: ''ARRAY''. Please re-check the data type.

I have a modification that supports this, but why does the community not support it? Is there any special reason that I have not considered?

With the modification, the array data type runs. Mock data: !image-2022-07-04-16-21-28-198.png! Result: !image-2022-07-04-16-20-08-661.png|width=626,height=146!

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-27238) The HiveGenericUDTF should support primitive arrays, for example Array, Array, ...
hehuiyuan created FLINK-27238: - Summary: The HiveGenericUDTF should support primitive arrays, for example Array, Array, ... Key: FLINK-27238 URL: https://issues.apache.org/jira/browse/FLINK-27238 Project: Flink Issue Type: Bug Reporter: hehuiyuan Attachments: image-2022-04-14-10-27-50-340.png

!image-2022-04-14-10-27-50-340.png|width=381,height=260!

If argTypes[0] is an Array, it will throw an exception.

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-27194) Whether to consider adding a configuration to ignore records whose deserialization failed
hehuiyuan created FLINK-27194: - Summary: Whether to consider adding a configuration to ignore records whose deserialization failed Key: FLINK-27194 URL: https://issues.apache.org/jira/browse/FLINK-27194 Project: Flink Issue Type: Improvement Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Reporter: hehuiyuan

-- This message was sent by Atlassian Jira (v8.20.1#820001)
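For reference, the JSON format already exposes exactly this kind of switch ({{json.ignore-parse-errors}}); the ask is presumably the same knob for the other formats. A sketch (topic and server values are placeholders):
{code:java}
CREATE TABLE kafkaSource (
  name STRING,
  age INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'mytopic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  -- existing JSON switch: skip rows that fail to deserialize instead of failing the job
  'json.ignore-parse-errors' = 'true'
);
{code}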
[jira] [Created] (FLINK-26848) JDBC doesn't flush data when flush-max-rows and flush-interval are disabled
hehuiyuan created FLINK-26848: - Summary: JDBC doesn't flush data when flush-max-rows and flush-interval are disabled Key: FLINK-26848 URL: https://issues.apache.org/jira/browse/FLINK-26848 Project: Flink Issue Type: Bug Reporter: hehuiyuan

-- This message was sent by Atlassian Jira (v8.20.1#820001)
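A sketch of the configuration the summary refers to, assuming "disabled" means setting both standard JDBC connector options to 0 (URL and table names are placeholders); per the report, buffered rows are then never written:
{code:java}
CREATE TABLE jdbcSink (
  id INT,
  name STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydb',
  'table-name' = 'mytable',
  -- both flush triggers disabled
  'sink.buffer-flush.max-rows' = '0',
  'sink.buffer-flush.interval' = '0'
);
{code}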
[jira] [Created] (FLINK-26565) Use the lateTrigger when the max timestamp of a record's window is less than the current watermark and the record is not discarded because of the allowed lateness parameter
hehuiyuan created FLINK-26565: - Summary: Use the lateTrigger when the max timestamp of a record's window is less than the current watermark and the record is not discarded because of the allowed lateness parameter Key: FLINK-26565 URL: https://issues.apache.org/jira/browse/FLINK-26565 Project: Flink Issue Type: Improvement Reporter: hehuiyuan Attachments: image-2022-03-10-11-27-52-891.png

Use the lateTrigger when the max timestamp of a record's window is less than the current watermark and the record is not discarded because of the allowed lateness parameter. !image-2022-03-10-11-27-52-891.png|width=543,height=318!

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26498) The window result may not be emitted when using the window emit feature with lateTrigger
hehuiyuan created FLINK-26498: - Summary: The window result may not be emitted when using the window emit feature with lateTrigger Key: FLINK-26498 URL: https://issues.apache.org/jira/browse/FLINK-26498 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Reporter: hehuiyuan Attachments: image-2022-03-05-23-53-37-086.png, image-2022-03-05-23-53-44-196.png, image-2022-03-06-00-03-11-670.png

The SQL of the job:
{code:java}
CREATE TABLE tableSource (
  name string,
  age int not null,
  sex string,
  dt TIMESTAMP(3),
  WATERMARK FOR dt AS dt - INTERVAL '0' SECOND
) WITH (
);

CREATE TABLE tableSink (
  windowstart timestamp(3),
  windowend timestamp(3),
  name string,
  age int,
  cou bigint
) WITH (
);

INSERT INTO tablesink
SELECT TUMBLE_START(dt, INTERVAL '1' HOUR), TUMBLE_END(dt, INTERVAL '1' HOUR), name, age, count(sex)
FROM tableSource
GROUP BY TUMBLE(dt, INTERVAL '1' HOUR), name, age
{code}
and the table config:
{code:java}
table.exec.emit.allow-lateness = 1 hour
table.exec.emit.late-fire.delay = 1 min
table.exec.emit.early-fire.delay = 1 min
{code}
The data:
{code:java}
>hehuiyuan1,22,woman,2022-03-05 00:30:22.000
>hehuiyuan1,22,woman,2022-03-05 00:40:22.000
//pause, wait for the earlyTrigger (1 min) to fire the window
>hehuiyuan1,22,woman,2022-03-05 00:50:22.000
>hehuiyuan1,22,woman,2022-03-05 00:56:22.000
//pause, wait for the earlyTrigger (1 min) to fire the window
>hehuiyuan1,22,woman,2022-03-05 01:00:00.000
//pause, wait for the earlyTrigger (1 min) to fire the window
>hehuiyuan1,22,woman,2022-03-05 00:59:20.000   -- late data
//pause, wait for the earlyTrigger (1 min) to fire the window
>hehuiyuan1,22,woman,2022-03-05 00:59:20.100   -- late data
>hehuiyuan1,22,woman,2022-03-05 02:00:00.000   -- window state cleaned for [0:00:00, 1:00:00]
>hehuiyuan1,22,woman,2022-03-05 02:10:00.000
{code}
The result:
{code:java}
> +I(+I[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])
> +I(+I[2022-03-05T01:00, 2022-03-05T02:00, hehuiyuan1, 22, 1])
> -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])
> +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 5])
> +I(+I[2022-03-05T02:00, 2022-03-05T03:00, hehuiyuan1, 22, 2])
{code}
`hehuiyuan1,22,woman,2022-03-05 00:59:20.100` is lost: the lateTrigger is not triggered, and the window [0:00:00, 1:00:00] is cleaned when the record `hehuiyuan1,22,woman,2022-03-05 02:00:00.000` arrives and advances the watermark. The window [0:00:00, 1:00:00] received 6 records, but we got a count of 5. The trigger is AfterEndOfWindowEarlyAndLate, so the WindowOperator may need to emit a result when the window's cleanupTimer calls onEventTime. I think the correct result is as follows:
{code:java}
> +I(+I[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])
> +I(+I[2022-03-05T01:00, 2022-03-05T02:00, hehuiyuan1, 22, 1])
> -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])
> +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 5])
> -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 5])
> +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 6])
> +I(+I[2022-03-05T02:00, 2022-03-05T03:00, hehuiyuan1, 22, 2])
{code}

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26320) Update hive doc: '1 m' -> '1 min'
hehuiyuan created FLINK-26320: - Summary: Update hive doc: '1 m' -> '1 min' Key: FLINK-26320 URL: https://issues.apache.org/jira/browse/FLINK-26320 Project: Flink Issue Type: Improvement Components: Documentation Reporter: hehuiyuan Attachments: image-2022-02-23-15-36-54-649.png

{{Time interval unit label 'm' does not match any of the recognized units: DAYS: (d | day | days), HOURS: (h | hour | hours), MINUTES: (min | minute | minutes), SECONDS: (s | sec | secs | second | seconds), MILLISECONDS: (ms | milli | millis | millisecond | milliseconds), MICROSECONDS: (µs | micro | micros | microsecond | microseconds), NANOSECONDS: (ns | nano | nanos | nanosecond | nanoseconds)}}

The doc's '1 m' is misleading when used; it is not a valid unit. !image-2022-02-23-15-36-54-649.png|width=491,height=219!

-- This message was sent by Atlassian Jira (v8.20.1#820001)
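For clarity, a doc example with one of the accepted unit strings:
{code:java}
-- 'm' is not a recognized unit; 'min', 'minute', or 'minutes' are
'lookup.join.cache.ttl' = '1 min'
{code}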
[jira] [Created] (FLINK-25262) Support sending data to the lookup table in a KeyGroupStreamPartitioner (hash) way for SQL
hehuiyuan created FLINK-25262: - Summary: Support sending data to the lookup table in a KeyGroupStreamPartitioner (hash) way for SQL Key: FLINK-25262 URL: https://issues.apache.org/jira/browse/FLINK-25262 Project: Flink Issue Type: Improvement Reporter: hehuiyuan Attachments: image-2021-12-12-15-15-48-540.png, image-2021-12-12-15-18-08-574.png

Send data to the lookup table by hash on the join key, which can improve the cache hit rate, further improve processing performance, and reduce the size of the cache. Should we consider introducing it? !image-2021-12-12-15-18-08-574.png|width=419,height=193!

-- This message was sent by Atlassian Jira (v8.20.1#820001)
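A minimal DataStream-style sketch of the idea (illustrative only, not the actual planner change; Tuple2's f0 stands in for the lookup-join key): hash-partition the probe stream by the join key before the lookup operator, so each parallel subtask's dimension-table cache holds a disjoint key range.
{code:java}
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LookupPartitioningSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Each element stands for a probe row; f0 is the lookup-join key.
        DataStream<Tuple2<Long, String>> probe =
                env.fromElements(Tuple2.of(1L, "a"), Tuple2.of(2L, "b"), Tuple2.of(1L, "c"));
        // KeyGroup-style hash partitioning on the join key; a lookup operator placed
        // after this keyBy would only need to cache the keys routed to its subtask.
        probe.keyBy(new KeySelector<Tuple2<Long, String>, Long>() {
                    @Override
                    public Long getKey(Tuple2<Long, String> t) {
                        return t.f0;
                    }
                })
             .print();
        env.execute("lookup-partitioning-sketch");
    }
}
{code}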
[jira] [Created] (FLINK-24620) Add partition discovery for the kafka sink when `sink.partitioner` = 'fixed'
hehuiyuan created FLINK-24620: - Summary: Add partition discovery for the kafka sink when `sink.partitioner` = 'fixed' Key: FLINK-24620 URL: https://issues.apache.org/jira/browse/FLINK-24620 Project: Flink Issue Type: Improvement Components: Connectors / Kafka Reporter: hehuiyuan

Add partition discovery for the kafka sink when `sink.partitioner` = 'fixed'.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23810) Print the SQL when parsing fails, which is convenient for finding the error
hehuiyuan created FLINK-23810: - Summary: Print the SQL when parsing fails, which is convenient for finding the error Key: FLINK-23810 URL: https://issues.apache.org/jira/browse/FLINK-23810 Project: Flink Issue Type: Improvement Components: Table SQL / API Reporter: hehuiyuan

Print the SQL when parsing fails, which makes it convenient to find the erroneous SQL.
{code:java}
public SqlNode parse(String sql) {
    try {
        SqlParser parser = SqlParser.create(sql, config);
        return parser.parseStmt();
    } catch (SqlParseException e) {
        throw new SqlParserException("SQL parse failed. " + e.getMessage(), e);
    }
}
{code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
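A sketch of the change the summary proposes, applied to the snippet above (the surrounding class is elided):
{code:java}
public SqlNode parse(String sql) {
    try {
        SqlParser parser = SqlParser.create(sql, config);
        return parser.parseStmt();
    } catch (SqlParseException e) {
        // include the offending statement so multi-statement jobs are easy to debug
        throw new SqlParserException(
                "SQL parse failed. sql: [" + sql + "]. " + e.getMessage(), e);
    }
}
{code}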
[jira] [Created] (FLINK-23604) 'csv.disable-quote-character' cannot take effect during deserialization for the old csv format
hehuiyuan created FLINK-23604: - Summary: 'csv.disable-quote-character' cannot take effect during deserialization for the old csv format Key: FLINK-23604 URL: https://issues.apache.org/jira/browse/FLINK-23604 Project: Flink Issue Type: Improvement Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Reporter: hehuiyuan

Related issue: https://issues.apache.org/jira/browse/FLINK-21207

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23237) Add a log to print data that failed to deserialize when ignore-parse-error=true, and there is an NPE when using `new String(message)` if message is null
hehuiyuan created FLINK-23237: - Summary: Add a log to print data that failed to deserialize when ignore-parse-error=true, and there is an NPE when using `new String(message)` if message is null Key: FLINK-23237 URL: https://issues.apache.org/jira/browse/FLINK-23237 Project: Flink Issue Type: Improvement Reporter: hehuiyuan

(1) Add a log to print the bad record that failed to deserialize when `ignore-parse-error` = `true`. (2) There is an NPE when using `new String(message)` if message is null.
{code:java}
public RowData deserialize(@Nullable byte[] message) throws IOException {
    if (message == null) {
        return null;
    }
    try {
        final JsonNode root = objectReader.readValue(message);
        return (RowData) runtimeConverter.convert(root);
    } catch (Throwable t) {
        if (ignoreParseErrors) {
            return null;
        }
        throw new IOException(
                String.format("Failed to deserialize CSV row '%s'.", new String(message)), t);
    }
}
{code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
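A sketch combining both points against the snippet above (LOG is an assumed SLF4J logger, and the charset choice is an assumption, not the actual patch):
{code:java}
public RowData deserialize(@Nullable byte[] message) throws IOException {
    if (message == null) {
        return null;
    }
    try {
        final JsonNode root = objectReader.readValue(message);
        return (RowData) runtimeConverter.convert(root);
    } catch (Throwable t) {
        // message is guaranteed non-null here thanks to the guard above
        String payload = new String(message, StandardCharsets.UTF_8);
        if (ignoreParseErrors) {
            // (1) log the bad record instead of dropping it silently
            LOG.warn("Failed to deserialize CSV row '{}', skipping it.", payload, t);
            return null;
        }
        throw new IOException(String.format("Failed to deserialize CSV row '%s'.", payload), t);
    }
}
{code}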
[jira] [Created] (FLINK-22976) Whether to consider adding a config option to control whether to exclude the record.key fields from the record.value
hehuiyuan created FLINK-22976: - Summary: Whether to consider adding a config option to control whether to exclude the record.key fields from the record.value Key: FLINK-22976 URL: https://issues.apache.org/jira/browse/FLINK-22976 Project: Flink Issue Type: Improvement Components: Connectors / Kafka Reporter: hehuiyuan

upsert-kafka:
key: {"name":"hehui111","sname":"wman"}
value: {"name":"hehui111","sname":"wman","sno":"wman","sclass":"wman","address":"wman"}

The ProducerRecord's value contains the fields of the ProducerRecord's key. Whether to consider adding a config option to control whether to exclude the record.key fields from the record.value.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
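For what it's worth, the kafka and upsert-kafka connectors document a {{value.fields-include}} option (values ALL / EXCEPT_KEY) that matches this ask; a sketch, assuming it applies to the version in use (topic and server values are placeholders):
{code:java}
CREATE TABLE userSink (
  name STRING,
  sname STRING,
  sno STRING,
  PRIMARY KEY (name, sname) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'users',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json',
  -- exclude the key columns from the record value
  'value.fields-include' = 'EXCEPT_KEY'
);
{code}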
[jira] [Created] (FLINK-22954) Doesn't support consuming update and delete changes when using a table function that does not contain a table field
hehuiyuan created FLINK-22954: - Summary: Doesn't support consuming update and delete changes when using a table function that does not contain a table field Key: FLINK-22954 URL: https://issues.apache.org/jira/browse/FLINK-22954 Project: Flink Issue Type: Bug Components: Table SQL / API Reporter: hehuiyuan

{code:java}
Exception in thread "main" org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.kafkaTableSink' doesn't support consuming update and delete changes which is produced by node Join(joinType=[LeftOuterJoin], where=[true], select=[name, word], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:382)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:265)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:341)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:330)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:329)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.Range.foreach(Range.scala:160)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:329)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:279)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:341)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:330)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:329)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.Range.foreach(Range.scala:160)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:329)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:125)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.optimize(FlinkChangelogModeInferenceProgram.scala:50)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.optimize(FlinkChangelogModeInferenceProgram.scala:39)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
    at
[jira] [Created] (FLINK-22617) Add log when creating the bulk format
hehuiyuan created FLINK-22617: - Summary: Add log when creating the bulk format Key: FLINK-22617 URL: https://issues.apache.org/jira/browse/FLINK-22617 Project: Flink Issue Type: Bug Components: Connectors / Hive Reporter: hehuiyuan

The Hive table sink has logs that tell us whether the native writer or the mapred writer is used:
{code:java}
LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
LOG.info("Hive streaming sink: Use native parquet writer.");
LOG.info(
    "Hive streaming sink: Use MapReduce RecordWriter writer because BulkWriter Factory not available.");
{code}
We could similarly add a log to make it more obvious which format is chosen when reading hive in `createBulkFormatForSplit`. !image-2021-05-10-17-04-15-571.png|width=490,height=198!

-- This message was sent by Atlassian Jira (v8.3.4#803005)
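A sketch of the suggested log line (the method context and variable names are assumed from the screenshot, not actual code):
{code:java}
// Hypothetical addition in createBulkFormatForSplit:
LOG.info("Hive source: use {} bulk format to read split {}.", formatName, split);
{code}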
[jira] [Created] (FLINK-22400) NPE problem when calling HiveInspectors.toFlinkObject for hive-exec 2.0.0
hehuiyuan created FLINK-22400: - Summary: NPE problem when calling HiveInspectors.toFlinkObject for hive-exec 2.0.0 Key: FLINK-22400 URL: https://issues.apache.org/jira/browse/FLINK-22400 Project: Flink Issue Type: Bug Components: Connectors / Hive Reporter: hehuiyuan Attachments: image-2021-04-21-23-18-56-107.png

ENV: flink 1.12, hive 2.0.0.

Error log:
{code:java}
Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:275)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:67)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:399)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:620)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:584)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:844)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:636)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: org.apache.flink.connectors.hive.FlinkHiveException: java.lang.NullPointerException
    at org.apache.flink.connectors.hive.read.HiveMapredSplitReader.nextRecord(HiveMapredSplitReader.java:190)
    at org.apache.flink.connectors.hive.read.HiveBulkFormatAdapter$HiveReader.nextRecord(HiveBulkFormatAdapter.java:336)
    at org.apache.flink.connectors.hive.read.HiveBulkFormatAdapter$HiveReader.readBatch(HiveBulkFormatAdapter.java:319)
    at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67)
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:138)
    ... 6 more
Caused by: java.lang.NullPointerException
    at org.apache.flink.table.functions.hive.conversion.HiveInspectors.toFlinkObject(HiveInspectors.java:335)
    at org.apache.flink.connectors.hive.read.HiveMapredSplitReader.nextRecord(HiveMapredSplitReader.java:180)
    ... 11 more
{code}
{code:java}
Map map = mapInspector.getMap(data); // hive-exec 2.0.0
{code}
!image-2021-04-21-23-18-56-107.png|width=372,height=191!

Null may be returned here, and then an NPE is thrown when calling `map.size` (map == null).

-- This message was sent by Atlassian Jira (v8.3.4#803005)
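A minimal sketch of the guard (context simplified; the real fix would live in HiveInspectors.toFlinkObject):
{code:java}
Map<?, ?> map = mapInspector.getMap(data);
if (map == null) {
    // hive-exec 2.0.0 may return null here; avoid the NPE on map.size()
    return null;
}
// ... continue converting the entries as before
{code}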
[jira] [Created] (FLINK-22263) Using the TIMESTAMPADD function with a partition value has a problem when pushing partitions into the TableSource
hehuiyuan created FLINK-22263: - Summary: Using the TIMESTAMPADD function with a partition value has a problem when pushing partitions into the TableSource Key: FLINK-22263 URL: https://issues.apache.org/jira/browse/FLINK-22263 Project: Flink Issue Type: Bug Reporter: hehuiyuan

SQL (table api):
{code:java}
CREATE CATALOG myhive WITH (
  'type' = 'hive',
  'default-database' = 'hhy'
);

INSERT INTO default_catalog.default_database.table_sink
select * from myhive.hhy.tmp_flink_test
where dt = CAST(TIMESTAMPADD(DAY, -1, CURRENT_DATE) as varchar);
{code}
Error log:
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: Data type 'INTERVAL SECOND(3) NOT NULL' with conversion class 'java.time.Duration' does not support a value literal of class 'java.math.BigDecimal'.
    at org.apache.flink.table.expressions.ValueLiteralExpression.validateValueDataType(ValueLiteralExpression.java:286)
    at org.apache.flink.table.expressions.ValueLiteralExpression.<init>(ValueLiteralExpression.java:79)
    at org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral(ApiExpressionUtils.java:251)
    at org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitLiteral(RexNodeExtractor.scala:432)
    at org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitLiteral(RexNodeExtractor.scala:340)
    at org.apache.calcite.rex.RexLiteral.accept(RexLiteral.java:1173)
    at org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$7.apply(RexNodeExtractor.scala:440)
    at org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$7.apply(RexNodeExtractor.scala:440)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitCall(RexNodeExtractor.scala:439)
    at org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitCall(RexNodeExtractor.scala:340)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
    at org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$7.apply(RexNodeExtractor.scala:440)
    at org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$7.apply(RexNodeExtractor.scala:440)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitCall(RexNodeExtractor.scala:439)
    at org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitCall(RexNodeExtractor.scala:340)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
    at org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$7.apply(RexNodeExtractor.scala:440)
    at org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$7.apply(RexNodeExtractor.scala:440)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitCall(RexNodeExtractor.scala:439)
    at
[jira] [Created] (FLINK-22174) `csv.quote-character` does not work for the csv format on the sink side
hehuiyuan created FLINK-22174: - Summary: `csv.quote-character` does not work for the csv format on the sink side Key: FLINK-22174 URL: https://issues.apache.org/jira/browse/FLINK-22174 Project: Flink Issue Type: Bug Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Reporter: hehuiyuan

{code:java}
CREATE TABLE kafkaTableSource (
  name string,
  age int,
  sex string,
  address string
) WITH (
  'connector' = 'kafka',
  'topic' = 'hehuiyuan1',
  'scan.startup.mode' = 'latest-offset',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.client.id' = 'test-consumer-group',
  'properties.group.id' = 'test-consumer-group',
  'format' = 'csv',
  'csv.quote-character' = '*'
);

CREATE TABLE kafkaTableSink (
  name string,
  age int,
  sex string,
  address string
) WITH (
  'connector' = 'kafka',
  'topic' = 'hehuiyuan2',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'csv',
  'csv.quote-character' = '#'
);

insert into kafkaTableSink select * from kafkaTableSource;
{code}
Test 1: generate data for topic `hehuiyuan1`:
{code:java}
>hehuiyuan,22,man,hengshui
>*hehuiyuan*,*22*,*man*,*hengshui*
>#hehuiyuan#,22,#man#,#hengshui#
>hehuiyuan,22,#man#,#hengshui#
>"hehuiyuan",22,#man#,#hengshui#
{code}
Result data in topic `hehuiyuan2`:
{code:java}
>hehuiyuan,22,man,hengshui
>hehuiyuan,22,man,hengshui
>###hehuiyuan###,22,###man###,###hengshui###
>hehuiyuan,22,###man###,###hengshui###
>#"hehuiyuan"#,22,###man###,###hengshui###
{code}
I think the result should be `#hehuiyuan#,22,#man#,#hengshui#` for the first and second records.

Test 2: add a timestamp field:
{code:java}
CREATE TABLE kafkaTableSource (
  name string,
  age int,
  sex string,
  address string,
  dt timestamp(3)
) WITH (
{code}
Generate data for topic `hehuiyuan1`:
{code:java}
>hehuiyuan,22,man,hengshui,2020-12-12 12:12:12
>hehuiyuan,22,man,hengshui,2020-12-16 10:00:00
{code}
Result data in topic `hehuiyuan2`:
{code:java}
>hehuiyuan,22,man,hengshui,#2020-12-12 12:12:12#
>hehuiyuan,22,man,hengshui,#2020-12-16 10:00:00#
{code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21930) There are some errors in hive_read_writer.md
hehuiyuan created FLINK-21930: - Summary: There are some errors in hive_read_writer.md Key: FLINK-21930 URL: https://issues.apache.org/jira/browse/FLINK-21930 Project: Flink Issue Type: Wish Components: Documentation Reporter: hehuiyuan

!https://user-images.githubusercontent.com/18002496/111795099-db2c0580-8901-11eb-929d-dd40c179a948.png|width=406,height=194!

h1. Fix the hive dim doc: replace `order` with `o`

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21829) Create HiveCatalog with the custom hadoop conf dir first
hehuiyuan created FLINK-21829: - Summary: Create HiveCatalog with the custom hadoop conf dir first Key: FLINK-21829 URL: https://issues.apache.org/jira/browse/FLINK-21829 Project: Flink Issue Type: Wish Components: Connectors / Hive Reporter: hehuiyuan

There is no warning when the configured path to the hadoop conf is unintentionally wrong.
{code:java}
private static HiveConf createHiveConf(
        @Nullable String hiveConfDir, @Nullable String hadoopConfDir) {
    // create HiveConf from hadoop configuration with hadoop conf directory configured.
    Configuration hadoopConf = null;
    if (isNullOrWhitespaceOnly(hadoopConfDir)) {
        for (String possibleHadoopConfPath :
                HadoopUtils.possibleHadoopConfPaths(
                        new org.apache.flink.configuration.Configuration())) {
            hadoopConf = getHadoopConfiguration(possibleHadoopConfPath);
            if (hadoopConf != null) {
                break;
            }
        }
    } else {
        hadoopConf = getHadoopConfiguration(hadoopConfDir);
    }
    if (hadoopConf == null) {
        hadoopConf = new Configuration();
    }
    HiveConf hiveConf = new HiveConf(hadoopConf, HiveConf.class);
{code}
It would be better to fall back to the possible hadoop paths when the configured path is wrong: (1) try to load from the custom hadoop conf path; (2) if {{Configuration hadoopConf}} is still null, try the possible hadoop conf paths; (3) if it is still null, create a new Configuration.
{code:java}
private static HiveConf createHiveConf(
        @Nullable String hiveConfDir, @Nullable String hadoopConfDir) {
    // create HiveConf from hadoop configuration with hadoop conf directory configured.
    Configuration hadoopConf = null;
    if (!isNullOrWhitespaceOnly(hadoopConfDir)) {
        hadoopConf = getHadoopConfiguration(hadoopConfDir);
    }
    if (hadoopConf == null) {
        for (String possibleHadoopConfPath :
                HadoopUtils.possibleHadoopConfPaths(
                        new org.apache.flink.configuration.Configuration())) {
            hadoopConf = getHadoopConfiguration(possibleHadoopConfPath);
            if (hadoopConf != null) {
                break;
            }
        }
    }
    if (hadoopConf == null) {
        hadoopConf = new Configuration();
    }
    HiveConf hiveConf = new HiveConf(hadoopConf, HiveConf.class);
{code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21284) Non-deterministic UDF functions return different values
hehuiyuan created FLINK-21284: - Summary: Non-deterministic UDF functions return different values Key: FLINK-21284 URL: https://issues.apache.org/jira/browse/FLINK-21284 Project: Flink Issue Type: Bug Components: Table SQL / Planner Reporter: hehuiyuan

When a non-deterministic UDF function is used multiple times, the results differ.
{code:java}
tableEnv.registerFunction("sample", new SampleFunction());

Table tm = tableEnv.sqlQuery("select name, RAND_INTEGER(10) as sample , sex from myhive_staff");
tableEnv.registerTable("tmp", tm);
tableEnv.sqlUpdate("insert into sinktable select * from tmp where sample >= 8");

// UDF function
public class SampleFunction extends ScalarFunction {
    public int eval(int pvid) {
        int a = (int) (Math.random() * 10);
        System.out.println("" + a);
        return a;
    }
}
{code}
The non-deterministic expression `RAND_INTEGER(10) as sample` is evaluated again when sinking, which leads to an inconsistent result.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20771) Hive partition is not added when there is a lot of data
hehuiyuan created FLINK-20771: - Summary: Hive partition is not added when there is a lot of data Key: FLINK-20771 URL: https://issues.apache.org/jira/browse/FLINK-20771 Project: Flink Issue Type: Bug Components: Connectors / Hive Reporter: hehuiyuan Attachments: image-2020-12-25-18-09-42-707.png, image-2020-12-25-18-15-07-519.png

The hive partition is not added when the data volume is huge. !image-2020-12-25-18-09-42-707.png|width=437,height=115!

Before the partition commit, *inProgressPart* is reinitialized. But the bucket is still active: !image-2020-12-25-18-15-07-519.png|width=574,height=192!

The bucket stays active, so notifyBucketInactive is not executed.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20318) Fix cast problem for the properties() method in the kafka ConnectorDescriptor
hehuiyuan created FLINK-20318: - Summary: Fix cast problem for the properties() method in the kafka ConnectorDescriptor Key: FLINK-20318 URL: https://issues.apache.org/jira/browse/FLINK-20318 Project: Flink Issue Type: Bug Components: Connectors / Kafka Reporter: hehuiyuan

This Jira fixes the Kafka connector. There is a cast problem when using the properties method.
{code:java}
Properties props = new Properties();
props.put("enable.auto.commit", "false");
props.put("fetch.max.wait.ms", "3000");
props.put("flink.poll-timeout", 5000);
props.put("flink.partition-discovery.interval-millis", false);

kafka = new Kafka()
    .version("0.11")
    .topic(topic)
    .properties(props);
{code}
{code:java}
Exception in thread "main" java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
Exception in thread "main" java.lang.ClassCastException: java.lang.Boolean cannot be cast to java.lang.String
{code}
Proposed change: replace the `(String) v` cast with `String.valueOf(v)` in Kafka.java.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
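A sketch of the proposed one-line fix (the surrounding code is paraphrased, not the verbatim Kafka.java source):
{code:java}
// before: this.properties.put(k, (String) v);  // ClassCastException for Integer/Boolean values
// after:
this.properties.put(String.valueOf(k), String.valueOf(v));
{code}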
[jira] [Created] (FLINK-20301) Flink SQL 1.10: legacy decimal and decimal for Array are not compatible
hehuiyuan created FLINK-20301: - Summary: Flink SQL 1.10: legacy decimal and decimal for Array are not compatible Key: FLINK-20301 URL: https://issues.apache.org/jira/browse/FLINK-20301 Project: Flink Issue Type: Bug Components: Table SQL / Planner Reporter: hehuiyuan Attachments: image-2020-11-23-23-48-02-102.png

The error log:
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: Type ARRAY of table field 'numbers' does not match with the physical type ARRAY of the 'numbers' field of the TableSource return type.
    at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:160)
    at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:185)
    at org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:246)
    at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
    at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:228)
    at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:206)
    at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:110)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:118)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:685)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
{code}
Background: Flink SQL, blink planner, 1.10. The schema for the TableSource is JSON:
{code:java}
"{type:'object',properties:{age:{type:'number'},numbers: { type: 'array', items: { type: 'number' } },name:{type:'string'},dt:{type: 'string', format: 'date-time'},timehour:{type:
[jira] [Created] (FLINK-19686) Cast question for date | time
hehuiyuan created FLINK-19686: - Summary: Cast question for date | time Key: FLINK-19686 URL: https://issues.apache.org/jira/browse/FLINK-19686 Project: Flink Issue Type: Wish Components: Table SQL / API Reporter: hehuiyuan

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18613) How to support a retract & upsert sink for a TableSink?
hehuiyuan created FLINK-18613: - Summary: How to support a retract & upsert sink for a TableSink? Key: FLINK-18613 URL: https://issues.apache.org/jira/browse/FLINK-18613 Project: Flink Issue Type: Wish Components: Table SQL / Planner Reporter: hehuiyuan

Environment: Flink 1.9 / Blink planner.

Hi, I want to ask a question. I have a job that executes multiple SQL statements with one TableSink class: (1) insert into table_0 select count(*) from table1; (2) insert into table_2 select name, sum(score) from table1 group by name. The TableSink implements the UpsertStreamTableSink interface. That is OK for SQL (2), but it is not supported for SQL (1), which has no keys. I want to use a TableSink that can support both upsert and retract; can you give me some advice? Thanks.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
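One common answer, as a minimal sketch against the Flink 1.9 interfaces (not a production sink, and the exact default methods may vary by minor version): a RetractStreamTableSink covers both statements, because updates arrive as {{Tuple2<Boolean, Row>}} add/retract pairs whether or not the query has keys.
{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.types.Row;

// Minimal sketch: works for SQL (1) and (2) alike, since retract mode needs no keys.
public class MyRetractSink implements RetractStreamTableSink<Row> {
    private String[] fieldNames;
    private TypeInformation<?>[] fieldTypes;

    @Override
    public TypeInformation<Row> getRecordType() {
        return new RowTypeInfo(fieldTypes, fieldNames);
    }

    @Override
    public String[] getFieldNames() {
        return fieldNames;
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return fieldTypes;
    }

    @Override
    public MyRetractSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        MyRetractSink copy = new MyRetractSink();
        copy.fieldNames = fieldNames;
        copy.fieldTypes = fieldTypes;
        return copy;
    }

    @Override
    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> stream) {
        // f0 == true is an accumulate message, f0 == false a retraction
        stream.addSink(new PrintSinkFunction<>());
    }
}
{code}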
[jira] [Created] (FLINK-18339) ValidationException: the field TypeInformation in the TableSchema does not match the TableSource return type for blink
hehuiyuan created FLINK-18339: - Summary: ValidationException: the field TypeInformation in the TableSchema does not match the TableSource return type for blink Key: FLINK-18339 URL: https://issues.apache.org/jira/browse/FLINK-18339 Project: Flink Issue Type: Bug Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.9.0 Reporter: hehuiyuan Attachments: image-2020-06-17-10-37-48-166.png, image-2020-06-17-10-53-08-424.png

The type of the `datatime` field is OBJECT_ARRAY. Exception:
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: Type LEGACY(BasicArrayTypeInfo) of table field 'datatime' does not match with type BasicArrayTypeInfo of the field 'datatime' of the TableSource return type.
    at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
    at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlanInternal(StreamExecGroupWindowAggregate.scala:141)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlanInternal(StreamExecGroupWindowAggregate.scala:55)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlan(StreamExecGroupWindowAggregate.scala:55)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:119)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
    at
[jira] [Created] (FLINK-18145) Segment optimization does not work?
hehuiyuan created FLINK-18145: - Summary: Segment optimization does not work? Key: FLINK-18145 URL: https://issues.apache.org/jira/browse/FLINK-18145 Project: Flink Issue Type: Wish Reporter: hehuiyuan DAG Segment Optimization: !image-2020-06-05-14-40-03-123.png|width=569,height=226! Code: {code:java}
StreamExecutionEnvironment env = EnvUtil.getEnv();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);

GeneratorTableSource tableSource = new GeneratorTableSource(2, 1, 70, 0);
tableEnv.registerTableSource("myTble", tableSource);
Table mytable = tableEnv.scan("myTble");
mytable.printSchema();
tableEnv.toAppendStream(mytable, Row.class).addSink(new PrintSinkFunction<>()).setParallelism(2);

Table tableproc = tableEnv.sqlQuery("SELECT key, count(rowtime_string) as countkey, TUMBLE_START(proctime, INTERVAL '30' SECOND) as tumblestart FROM myTble group by TUMBLE(proctime, INTERVAL '30' SECOND), key");
tableproc.printSchema();
tableEnv.registerTable("t4", tableproc);

Table table = tableEnv.sqlQuery("SELECT key, count(rowtime_string) as countkey, TUMBLE_START(proctime, INTERVAL '24' HOUR) as tumblestart FROM myTble group by TUMBLE(proctime, INTERVAL '24' HOUR), key");
table.printSchema();
tableEnv.registerTable("t3", table);

String[] fields = new String[]{"key", "countkey", "tumblestart"};
TypeInformation[] fieldsType = new TypeInformation[3];
fieldsType[0] = Types.INT;
fieldsType[1] = Types.LONG;
fieldsType[2] = Types.SQL_TIMESTAMP;
PrintTableUpsertSink printTableSink = new PrintTableUpsertSink(fields, fieldsType, true);
tableEnv.registerTableSink("inserttable", printTableSink);
tableEnv.sqlUpdate("insert into inserttable select key, countkey, tumblestart from t3");

String[] fieldsproc = new String[]{"key", "countkey", "tumblestart"};
TypeInformation[] fieldsTypeproc = new TypeInformation[3];
fieldsTypeproc[0] = Types.INT;
fieldsTypeproc[1] = Types.LONG;
fieldsTypeproc[2] = Types.SQL_TIMESTAMP;
PrintTableUpsertSink printTableSinkproc = new PrintTableUpsertSink(fieldsproc, fieldsTypeproc, true);
tableEnv.registerTableSink("inserttableproc", printTableSinkproc);
tableEnv.sqlUpdate("insert into inserttableproc select key, countkey, tumblestart from t4");
{code} I have a custom table source, then: (1) convert the table to a DataStream via the `toAppendStream` method, then sink; (2) apply one tumble window, then sink; (3) apply another tumble window, then sink. But the segment optimization didn't work. *!image-2020-06-05-14-50-33-759.png|width=458,height=336!* *The source is executed by 3 threads and generates the same data 3 times.* !image-2020-06-05-14-53-57-056.png|width=1216,height=204! -- This message was sent by Atlassian Jira (v8.3.4#803005)
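For reference, a hedged sketch of the blink planner options that govern plan sharing (the keys exist in OptimizerConfigOptions; whether they cover this job is an assumption, since the `toAppendStream` branch is translated as a separate DataStream pipeline, which may itself explain the duplicated source):
{code:java}
// Hedged sketch, reusing tableEnv from the code above. Both options default
// to true; setting them explicitly only documents the intent here.
tableEnv.getConfig().getConfiguration()
    .setString("table.optimizer.reuse-source-enabled", "true");
tableEnv.getConfig().getConfiguration()
    .setString("table.optimizer.reuse-sub-plan-enabled", "true");
{code}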
[jira] [Created] (FLINK-17117) There is a useless cast operation for SQL on blink when generating code
hehuiyuan created FLINK-17117: - Summary: There is a useless cast operation for SQL on blink when generating code Key: FLINK-17117 URL: https://issues.apache.org/jira/browse/FLINK-17117 Project: Flink Issue Type: Wish Components: Table SQL / Planner Reporter: hehuiyuan Attachments: image-2020-04-13-19-44-19-174.png !image-2020-04-13-19-44-19-174.png|width=641,height=305! The `generateOneInputStreamOperator` method of OperatorCodeGenerator generates SourceConversion like this: {code:java}
@Override
public void processElement($STREAM_RECORD $ELEMENT) throws Exception {
  $inputTypeTerm $inputTerm = ($inputTypeTerm) ${converter(s"$ELEMENT.getValue()")};
  ${ctx.reusePerRecordCode()}
  ${ctx.reuseLocalVariableCode()}
  ${if (lazyInputUnboxingCode) "" else ctx.reuseInputUnboxingCode()}
  $processCode
}
{code} The cast in question is: {code:java}
$inputTypeTerm $inputTerm = ($inputTypeTerm) ${converter(s"$ELEMENT.getValue()")};
{code} ScanUtil calls generateOneInputStreamOperator: {code:java}
val generatedOperator = OperatorCodeGenerator.generateOneInputStreamOperator[Any, BaseRow](
  ctx,
  convertName,
  processCode,
  outputRowType,
  converter = inputTermConverter)

// inputTermConverter
val (inputTermConverter, inputRowType) = {
  val convertFunc = CodeGenUtils.genToInternal(ctx, inputType)
  internalInType match {
    case rt: RowType => (convertFunc, rt)
    case _ => ((record: String) => s"$GENERIC_ROW.of(${convertFunc(record)})",
      RowType.of(internalInType))
  }
}
{code} CodeGenUtils.scala: genToInternal {code:java}
def genToInternal(ctx: CodeGeneratorContext, t: DataType): String => String = {
  val iTerm = boxedTypeTermForType(fromDataTypeToLogicalType(t))
  if (isConverterIdentity(t)) {
    term => s"($iTerm) $term"
  } else {
    val eTerm = boxedTypeTermForExternalType(t)
    val converter = ctx.addReusableObject(
      DataFormatConverters.getConverterForDataType(t), "converter")
    term => s"($iTerm) $converter.toInternal(($eTerm) $term)"
  }
}
{code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
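To make the redundancy concrete, a hand-written illustration (not captured from the code generator): in the identity-converter case, genToInternal already emits "($iTerm) $term", and the processElement template adds its own ($inputTypeTerm) cast on top, so the generated line carries a duplicated cast along these lines:
{code:java}
// Illustrative only: both casts resolve to the same type, so the inner one
// is useless.
BaseRow input = (BaseRow) ((BaseRow) element.getValue());
{code}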
[jira] [Created] (FLINK-16704) Document has some errors in connect.md
hehuiyuan created FLINK-16704: - Summary: Document has some errors in connect.md Key: FLINK-16704 URL: https://issues.apache.org/jira/browse/FLINK-16704 Project: Flink Issue Type: Wish Components: Documentation Affects Versions: 1.10.0 Reporter: hehuiyuan For branches <= 1.10, the document has some errors: *Type.ROW -> Types.ROW* (see the sketch below). !image-2020-03-21-15-09-59-802.png|width=265,height=296! !image-2020-03-21-15-11-00-061.png|width=334,height=193! -- This message was sent by Atlassian Jira (v8.3.4#803005)
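A hedged illustration of the corrected identifier (field names and the exact factory method are assumptions; the snippet in connect.md may use a different Types call):
{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

// "Types" (plural) is the real factory class; "Type.ROW" does not exist.
TypeInformation<?> rowType = Types.ROW_NAMED(
    new String[]{"name", "age"},
    Types.STRING, Types.INT);
{code}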
[jira] [Created] (FLINK-16584) Whether to support a long-typed event time field in the table planner when the source is Kafka
hehuiyuan created FLINK-16584: - Summary: Whether to support a long-typed event time field in the table planner when the source is Kafka Key: FLINK-16584 URL: https://issues.apache.org/jira/browse/FLINK-16584 Project: Flink Issue Type: Wish Components: Table SQL / API Reporter: hehuiyuan For the rowtime function, the field type may be long or timestamp. But the event time field type can only be timestamp when the Kafka connector is used; some validations (for example, when creating the Kafka table source) do not allow long. -- This message was sent by Atlassian Jira (v8.3.4#803005)
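A minimal sketch of the descriptor API in question (field names are assumptions): the Rowtime descriptor reads timestamps from an existing field, but the schema side of the validation expects SQL_TIMESTAMP rather than LONG.
{code:java}
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;

// "ts" must currently be declared as SQL_TIMESTAMP; declaring it as
// Types.LONG (e.g. epoch millis) is what the validation rejects.
Schema schema = new Schema()
    .field("user", Types.STRING)
    .field("ts", Types.SQL_TIMESTAMP)
    .rowtime(new Rowtime()
        .timestampsFromField("ts")
        .watermarksPeriodicBounded(60000));
{code}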
[jira] [Created] (FLINK-16465) Add more detailed instructions for TableSink
hehuiyuan created FLINK-16465: - Summary: Add more detailed instructions for TableSink Key: FLINK-16465 URL: https://issues.apache.org/jira/browse/FLINK-16465 Project: Flink Issue Type: Wish Components: Documentation Reporter: hehuiyuan There is an error when I define an UpsertStreamTableSink without the getTableSchema method or the getFieldNames/getFieldTypes methods. Adding some instructions there may be better. [https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#upsertstreamtablesink] {code:java}
Exception in thread "main" org.apache.flink.table.api.TableException: Table sink does not implement a table schema.
 at org.apache.flink.table.sinks.TableSink.getTableSchema(TableSink.java:75)
 at org.apache.flink.table.api.internal.TableEnvironmentImpl.registerTableSink(TableEnvironmentImpl.java:215)
 at com.jd.flink.sql.writer.console.ConsoleUpsertWriter.initWriter(ConsoleUpsertWriter.java:40)
 at com.jd.flink.sql.writer.Writer.registWriter(Writer.java:33)
 at com.jd.flink.sql.test.MainClassTest_hhy.main(MainClassTest_hhy.java:222)
{code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
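A minimal sketch of the override that avoids the exception above (field names and types are illustrative): without it, the default TableSink#getTableSchema implementation throws.
{code:java}
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

// Inside the custom UpsertStreamTableSink implementation.
@Override
public TableSchema getTableSchema() {
    return TableSchema.builder()
        .field("name", DataTypes.STRING())
        .field("cnt", DataTypes.BIGINT())
        .build();
}
{code}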
[jira] [Created] (FLINK-16463) CodeGenUtils generates code that has two semicolons for GroupingWindowAggsHandler in blink
hehuiyuan created FLINK-16463: - Summary: CodeGenUtils generates code that has two semicolons for GroupingWindowAggsHandler in blink Key: FLINK-16463 URL: https://issues.apache.org/jira/browse/FLINK-16463 Project: Flink Issue Type: Wish Components: Table SQL / Planner Reporter: hehuiyuan Attachments: image-2020-03-06-20-43-20-300.png, image-2020-03-06-20-44-16-446.png !image-2020-03-06-20-43-20-300.png|width=452,height=297! !image-2020-03-06-20-44-16-446.png|width=513,height=282! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15610) How to implement a UDF whose number of return columns is uncertain
hehuiyuan created FLINK-15610: - Summary: How to implement a UDF whose number of return columns is uncertain Key: FLINK-15610 URL: https://issues.apache.org/jira/browse/FLINK-15610 Project: Flink Issue Type: Wish Components: Table SQL / API Reporter: hehuiyuan -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15596) Support key-value messages for the Kafka producer in Flink SQL / Table
hehuiyuan created FLINK-15596: - Summary: Support key-value messages for the Kafka producer in Flink SQL / Table Key: FLINK-15596 URL: https://issues.apache.org/jira/browse/FLINK-15596 Project: Flink Issue Type: Wish Components: Connectors / Kafka Reporter: hehuiyuan -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15404) How to insert into a hive table in a different catalog
hehuiyuan created FLINK-15404: - Summary: How to insert into a hive table in a different catalog Key: FLINK-15404 URL: https://issues.apache.org/jira/browse/FLINK-15404 Project: Flink Issue Type: Wish Components: Table SQL / Planner Reporter: hehuiyuan I have a hive catalog: {code:java} catalog name : myhive database : default {code} and flink has a default catalog: {code:java} catalog name : default_catalog database : default_database {code} For example: I have a source table 'source_table' from kafka that is registered in default_catalog, and I want to insert into the hive table 'hive_table', which is in the myhive catalog. SQL: insert into hive_table select * from source_table; -- This message was sent by Atlassian Jira (v8.3.4#803005)
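A hedged sketch of what I'd expect to work (assuming the catalog and database names above): fully qualify both tables so one statement can span the two catalogs.
{code:java}
// Fully qualified identifiers: catalog.database.table. `default` is quoted
// because it is a reserved keyword.
tableEnv.sqlUpdate(
    "INSERT INTO myhive.`default`.hive_table " +
    "SELECT * FROM default_catalog.default_database.source_table");
{code}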
[jira] [Created] (FLINK-15326) Add document description for DECIMAL(38, 18) when the sink table uses json schema
hehuiyuan created FLINK-15326: - Summary: Add document description for DECIMAL(38, 18) when the sink table uses json schema Key: FLINK-15326 URL: https://issues.apache.org/jira/browse/FLINK-15326 Project: Flink Issue Type: Wish Reporter: hehuiyuan Env: Flink 1.9.1, table-planner-blink Question: If I have a kafka sink table with a json schema: {code:java}
String jsonSchema = "{ type:'object', properties:{ name: { type: 'string' }, age: { type: 'integer' }, sex: { type: 'string' } } }";
JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(jsonSchema.toString());
TypeInformation fieldTypes = deserializationSchema.getProducedType();
Kafka kafka = new Kafka...
Schema schema = new Schema..
tableEnvironment.connect(kafka)
    .withFormat(new Json().jsonSchema(jsonSchema))
    .withSchema(schema)
    .inAppendMode()
    .registerTableSink("sink_table");
String sinksql = "insert into sink_example2 select * from table2";
tableEnvironment.sqlUpdate(sinksql);
{code} Error: {code:java} Query result schema: [name: String, age: BigDecimal, sex: String] TableSink schema: [name: String, age: BigDecimal, sex: String] {code} The table `table2` has this schema: {code:java}
[2019-12-19 18:10:16,937] INFO t2: root
 |-- name: STRING
 |-- age: DECIMAL(10, 0)
 |-- sex: STRING
{code} When I use kafka to read data with a json schema, I understand that the integer type in json is mapped to DECIMAL(38, 18) in the flink table: {code:java}
 |-- name: STRING
 |-- age: DECIMAL(38, 18)
 |-- sex: STRING
{code} That's why I know to set the decimal precision: {code:java} String sinksql = "insert into sink_example2 select name, CAST(age as decimal(38,18)) as age, sex from table2"; {code} The JSON format documentation needs to call out this problem. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15159) Is the string type of json mapped to VARCHAR or STRING?
hehuiyuan created FLINK-15159: - Summary: Is the string type of json mapped to VARCHAR or STRING? Key: FLINK-15159 URL: https://issues.apache.org/jira/browse/FLINK-15159 Project: Flink Issue Type: Wish Components: Documentation Reporter: hehuiyuan Attachments: image-2019-12-09-21-14-08-183.png !image-2019-12-09-21-14-08-183.png|width=356,height=180! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15158) Why is integer converted to BigDecimal for format-json when kafka is used
hehuiyuan created FLINK-15158: - Summary: Why is integer converted to BigDecimal for format-json when kafka is used Key: FLINK-15158 URL: https://issues.apache.org/jira/browse/FLINK-15158 Project: Flink Issue Type: Wish Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Reporter: hehuiyuan For example, I have a table `table1`: root |-- name: STRING |-- age: INT |-- sex: STRING Then I want to `insert into kafka select * from table1`: jsonschema: {type:'object',properties:\{name: { type: 'string' },age: \{ type: 'integer' },sex: \{ type: 'string' }}} {code:java} descriptor.withFormat(new Json().jsonSchema(jsonSchema)).withSchema(schema); {code} Exception in thread "main" org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink [sink_example2] do not match. *Query result schema: [name: String, age: Integer, sex: String]* *TableSink schema: [name: String, age: BigDecimal, sex: String]* at org.apache.flink.table.sinks.TableSinkUtils$.validateSink(TableSinkUtils.scala:65) at org.apache.flink.table.planner.StreamPlanner$$anonfun$2.apply(StreamPlanner.scala:156) at org.apache.flink.table.planner.StreamPlanner$$anonfun$2.apply(StreamPlanner.scala:155) at scala.Option.map(Option.scala:146) I know that the integer type in the jsonschema is converted to BigDecimal. But for the above scenario, does this have to be forced to decimal? (See the workaround sketch below.) -- This message was sent by Atlassian Jira (v8.3.4#803005)
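A hedged workaround sketch (table and column names taken from the report): explicitly cast the INT column to the DECIMAL(38, 18) that the json-schema-derived sink declares, so the query result schema matches the TableSink schema.
{code:java}
// Casting age aligns the two schemas and avoids the ValidationException.
tableEnv.sqlUpdate(
    "INSERT INTO sink_example2 " +
    "SELECT name, CAST(age AS DECIMAL(38, 18)) AS age, sex FROM table1");
{code}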
[jira] [Created] (FLINK-15146) The value of `cleanupSize` should be greater than 0 for `IncrementalCleanupStrategy`
hehuiyuan created FLINK-15146: - Summary: The value of `cleanupSize` should be greater than 0 for `IncrementalCleanupStrategy` Key: FLINK-15146 URL: https://issues.apache.org/jira/browse/FLINK-15146 Project: Flink Issue Type: Wish Reporter: hehuiyuan Attachments: image-2019-12-09-17-03-59-014.png, image-2019-12-09-17-09-18-062.png !image-2019-12-09-17-03-59-014.png|width=615,height=108! !image-2019-12-09-17-09-18-062.png|width=491,height=309! Hi, the value of cleanupSize is currently only checked to be greater than or equal to 0. Requiring it to be strictly greater than 0 would be more practical. -- This message was sent by Atlassian Jira (v8.3.4#803005)
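For context, a minimal sketch of the API whose argument check is being discussed (TTL and batch size values are illustrative):
{code:java}
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

// cleanupIncrementally(cleanupSize, runCleanupForEveryRecord): the first
// argument is the cleanupSize whose lower bound this issue questions;
// a value of 0 would make the incremental cleanup a no-op.
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.hours(1))
    .cleanupIncrementally(10, true)
    .build();
{code}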
[jira] [Created] (FLINK-14639) Fix the document of Metrics that has an error for `User Scope`
hehuiyuan created FLINK-14639: - Summary: Fix the document of Metrics that has an error for `User Scope` Key: FLINK-14639 URL: https://issues.apache.org/jira/browse/FLINK-14639 Project: Flink Issue Type: Wish Components: Documentation Environment: Hi, I think it should be `MetricGroup{{#addGroup(String key, String value)}}` Reporter: hehuiyuan Attachments: image-2019-11-07-10-42-37-862.png !image-2019-11-07-10-42-37-862.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
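A minimal sketch of the corrected call (group names are illustrative; the snippet assumes it runs inside a RichFunction, where getRuntimeContext() is available):
{code:java}
import org.apache.flink.metrics.MetricGroup;

// The two-argument addGroup(String key, String value) form that the
// `User Scope` section should show.
MetricGroup group = getRuntimeContext()
    .getMetricGroup()
    .addGroup("MyMetricsKey", "MyMetricsValue");
{code}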
[jira] [Created] (FLINK-14550) Can't use the proctime attribute when registering a DataStream as a table with nested fields
hehuiyuan created FLINK-14550: - Summary: Can't use the proctime attribute when registering a DataStream as a table with nested fields Key: FLINK-14550 URL: https://issues.apache.org/jira/browse/FLINK-14550 Project: Flink Issue Type: Improvement Components: Table SQL / API Reporter: hehuiyuan *_The data schema:_* final String schemaString = "{\"type\":\"record\",\"name\":\"GenericUser\",\"namespace\":\"org.apache.flink.formats.avro.generated\"," + "\"fields\": [\{\"name\":\"name\",\"type\":\"string\"},\{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]}," + "\{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]},\{\"name\":\"type_long_test\",\"type\":[\"long\",\"null\"]}" + ",\{\"name\":\"type_double_test\",\"type\":\"double\"},\{\"name\":\"type_null_test\",\"type\":[\"null\",\"string\"]}," + "\{\"name\":\"type_bool_test\",\"type\":[\"boolean\"]},{\"name\":\"type_array_string\",\"type\":" + "\{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"type_array_boolean\",\"type\":{\"type\":\"array\"," + "\"items\":\"boolean\"}},{\"name\":\"type_nullable_array\",\"type\":[\"null\",{\"type\":\"array\"," + "\"items\":\"string\"}],\"default\":null},{\"name\":\"type_enum\",\"type\":{\"type\":\"enum\"," + "\"name\":\"Colors\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}},{\"name\":\"type_map\",\"type\":{\"type\":\"map\"," + "\"values\":\"long\"}},{\"name\":\"type_fixed\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"Fixed16\"," + "\"size\":16}],\"size\":16},\{\"name\":\"type_union\",\"type\":[\"null\",\"boolean\",\"long\",\"double\"]}," + *"{\"name\":\"type_nested\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Address\",\"fields\":[{\"name\":\"num\"," +* *"\"type\":\"int\"},\{\"name\":\"street\",\"type\":\"string\"},\{\"name\":\"city\",\"type\":\"string\"}," +* *"\{\"name\":\"state\",\"type\":\"string\"},\{\"name\":\"zip\",\"type\":\"string\"}]}]},*{\"name\":\"type_bytes\"," + "\"type\":\"bytes\"},\{\"name\":\"type_date\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}}," + "\{\"name\":\"type_time_millis\",\"type\":{\"type\":\"int\",\"logicalType\":\"time-millis\"}},{\"name\":\"type_time_micros\"," + "\"type\":\{\"type\":\"int\",\"logicalType\":\"time-micros\"}},{\"name\":\"type_timestamp_millis\",\"type\":{\"type\":\"long\"," + "\"logicalType\":\"timestamp-millis\"}},{\"name\":\"type_timestamp_micros\",\"type\":{\"type\":\"long\"," + "\"logicalType\":\"timestamp-micros\"}},{\"name\":\"type_decimal_bytes\",\"type\":{\"type\":\"bytes\"," + "\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}},{\"name\":\"type_decimal_fixed\",\"type\":{\"type\":\"fixed\"," + "\"name\":\"Fixed2\",\"size\":2,\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}"; *_The code:_* tableEnv.registerDataStream("source_table",deserilizeDataStreamRows,"type_nested$Address$street,userActionTime.proctime"); *_The error is as follows:_* Exception in thread "main" org.apache.flink.table.api.TableException: The proctime attribute can only be appended to the table schema and not replace an existing field. Please move 'userActionTime' to the end of the schema.
at org.apache.flink.table.api.StreamTableEnvironment.org$apache$flink$table$api$StreamTableEnvironment$$extractProctime$1(StreamTableEnvironment.scala:649)
 at org.apache.flink.table.api.StreamTableEnvironment$$anonfun$validateAndExtractTimeAttributes$1.apply(StreamTableEnvironment.scala:676)
 at org.apache.flink.table.api.StreamTableEnvironment$$anonfun$validateAndExtractTimeAttributes$1.apply(StreamTableEnvironment.scala:668)
 at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
 at org.apache.flink.table.api.StreamTableEnvironment.validateAndExtractTimeAttributes(StreamTableEnvironment.scala:668)
 at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:549)
 at org.apache.flink.table.api.java.StreamTableEnvironment.registerDataStream(StreamTableEnvironment.scala:136)
 at com.jd.flink.sql.demo.validate.schema.avro.AvroQuickStartMain.main(AvroQuickStartMain.java:145)
This code is OK: tableEnv.registerDataStream("source_table",deserilizeDataStreamRows,"type_nested$Address$street"); This code is also OK: tableEnv.registerDataStream("source_table",deserilizeDataStreamRows,"type_nested,userActionTime.proctime"); -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-13810) Update `Elasticsearch Connector` label
hehuiyuan created FLINK-13810: - Summary: Update `Elasticsearch Connector` label Key: FLINK-13810 URL: https://issues.apache.org/jira/browse/FLINK-13810 Project: Flink Issue Type: Wish Components: Documentation Reporter: hehuiyuan The labels currently shown are: Sink: Streaming Append Mode, Sink: Streaming Upsert Mode, Format: JSON-only. The first table should also be labeled Source! -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (FLINK-13757) Document error for `logical functions`
hehuiyuan created FLINK-13757: - Summary: Document error for `logical functions` Key: FLINK-13757 URL: https://issues.apache.org/jira/browse/FLINK-13757 Project: Flink Issue Type: Wish Components: Documentation Reporter: hehuiyuan Attachments: image-2019-08-17-11-58-53-247.png [https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/functions.html#logical-functions] Incorrect (current): |{{boolean IS NOT TRUE}}|Returns TRUE if _boolean_ is FALSE or UNKNOWN; returns FALSE if _boolean_ is FALSE.| Correct: |{{boolean IS NOT TRUE}}|Returns TRUE if _boolean_ is FALSE or UNKNOWN; returns FALSE if _boolean_ is TRUE.| [!image-2019-08-17-11-58-53-247.png!|https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/functions.html#logical-functions] -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (FLINK-13756) Modify Code Annotations for findAndCreateTableSource in TableFactoryUtil
hehuiyuan created FLINK-13756: - Summary: Modify Code Annotations for findAndCreateTableSource in TableFactoryUtil Key: FLINK-13756 URL: https://issues.apache.org/jira/browse/FLINK-13756 Project: Flink Issue Type: Wish Components: Table SQL / API Reporter: hehuiyuan /** * Returns a *table sink* matching the {@link org.apache.flink.table.catalog.CatalogTable}. */ public static TableSource findAndCreateTableSource(CatalogTable table) { return findAndCreateTableSource(table.toProperties()); } Hi, this method `findAndCreateTableSource` is used for returning a `TableSource`, but the annotation says *`Returns a table sink`*. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
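The corrected annotation would read (a one-word fix, as the report suggests):
{code:java}
/**
 * Returns a table source matching the {@link org.apache.flink.table.catalog.CatalogTable}.
 */
public static TableSource findAndCreateTableSource(CatalogTable table) {
    return findAndCreateTableSource(table.toProperties());
}
{code}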
[jira] [Created] (FLINK-13183) Add PrintTableSink for Table & SQL API
hehuiyuan created FLINK-13183: - Summary: Add PrintTableSink for Table & SQL API Key: FLINK-13183 URL: https://issues.apache.org/jira/browse/FLINK-13183 Project: Flink Issue Type: Wish Components: Table SQL / API Reporter: hehuiyuan -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-13181) Add a constructor function to CsvTableSink
hehuiyuan created FLINK-13181: - Summary: Add a constructor function to CsvTableSink Key: FLINK-13181 URL: https://issues.apache.org/jira/browse/FLINK-13181 Project: Flink Issue Type: Improvement Components: Table SQL / API Reporter: hehuiyuan Add a constructor function for the parameters: @param path The output path to write the Table to. @param fieldDelim The field delimiter. @param writeMode The write mode to specify whether existing files are overwritten or not. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
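A hedged sketch of the proposed constructor (the delegation target exists on CsvTableSink today; the numFiles default of 1 is an assumption for illustration):
{code:java}
import org.apache.flink.core.fs.FileSystem.WriteMode;

// Proposed convenience constructor; delegates to the existing
// (path, fieldDelim, numFiles, writeMode) constructor.
public CsvTableSink(String path, String fieldDelim, WriteMode writeMode) {
    this(path, fieldDelim, 1, writeMode); // assumed default: single output file
}
{code}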
[jira] [Created] (FLINK-13180) Add a constructor function to CsvTableSink
hehuiyuan created FLINK-13180: - Summary: Add a constructor function to CsvTableSink Key: FLINK-13180 URL: https://issues.apache.org/jira/browse/FLINK-13180 Project: Flink Issue Type: Improvement Components: Table SQL / API Reporter: hehuiyuan Add a constructor function to CsvTableSink. For the parameters: @param path The output path to write the Table to. @param fieldDelim The field delimiter. @param writeMode The write mode to specify whether existing files are overwritten or not. -- This message was sent by Atlassian JIRA (v7.6.3#76005)