[jira] [Created] (FLINK-33871) Reduce getTable call for hive client and optimize graph generation time

2023-12-18 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-33871:
-

 Summary: Reduce getTable call for hive client and optimize graph 
generation time
 Key: FLINK-33871
 URL: https://issues.apache.org/jira/browse/FLINK-33871
 Project: Flink
  Issue Type: Improvement
Reporter: hehuiyuan


The HiveCatalog.getHiveTable method wastes a lot of time during graph generation, 
because it is called a very large number of times.


I have a SQL job with over 2000 lines; the HiveCatalog.getHiveTable method 
is called 4879 times, although only six Hive tables are used.

![image](https://github.com/apache/flink/assets/18002496/d5f0daf3-f80a-4790-ae21-4e75dff9cfd7)

The client.getTable call itself takes a lot of time.

![image](https://github.com/apache/flink/assets/18002496/be0d176f-3915-4b92-a177-f1cfaf6d2927)
The screenshot above is a statistic of how often the JobManager interacts with Hive while generating the graph.

If one call takes approximately 50 milliseconds, the total time spent is
4879 * 50 = 243,950 ms ≈ 244 s ≈ 4 min.

We can cache the results so that client.getTable is only called six times (once per distinct table).
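
A minimal sketch of the caching idea, assuming the planning-time lookup is safe to memoize. The names below (CachingTableLookup, HiveTable, loadTableFromMetastore) are hypothetical placeholders, not Flink APIs:
{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: memoize table metadata per table path so the Hive client is hit
// at most once per distinct table during graph generation.
public class CachingTableLookup {

    private final Map<String, HiveTable> cache = new ConcurrentHashMap<>();

    public HiveTable getTable(String dbName, String tableName) {
        String key = dbName + "." + tableName;
        // each distinct table triggers exactly one metastore call; repeated lookups hit the cache
        return cache.computeIfAbsent(key, k -> loadTableFromMetastore(dbName, tableName));
    }

    private HiveTable loadTableFromMetastore(String dbName, String tableName) {
        // placeholder for the expensive Hive client call (client.getTable(...))
        throw new UnsupportedOperationException("metastore access not shown in this sketch");
    }

    /** Placeholder for the Hive table metadata object. */
    public static class HiveTable {}
}
{code}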



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31086) [DOC]update connector label for blackhole and kafka

2023-02-15 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-31086:
-

 Summary: [DOC]update connector label for blackhole and kafka
 Key: FLINK-31086
 URL: https://issues.apache.org/jira/browse/FLINK-31086
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Reporter: hehuiyuan


Update the connector labels for kafka and blackhole.

Blackhole: sink:bounded unbounded

Kafka: source bounded

!https://user-images.githubusercontent.com/18002496/216600374-5c9d16db-66ac-42a4-8b21-16245a71f9ef.png|width=468,height=168!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30679) Can not load the data of hive dim table when project-push-down is introduced

2023-01-13 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-30679:
-

 Summary: Can not load the data of hive dim table when 
project-push-down is introduced
 Key: FLINK-30679
 URL: https://issues.apache.org/jira/browse/FLINK-30679
 Project: Flink
  Issue Type: Bug
Reporter: hehuiyuan


Vectorized read:

 
{code:java}
java.lang.ArrayIndexOutOfBoundsException: 3
    at 
org.apache.flink.connectors.hive.read.HiveTableInputFormat.useOrcVectorizedRead(HiveTableInputFormat.java:276)
 ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at 
org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:129)
 ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at 
org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86)
 ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at 
org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132)
 ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
    at 
org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105)
 ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
    at LookupFunction$26.flatMap(Unknown Source) ~[?:?] {code}
 

 

MapReduce read:

 
{code:java}
java.lang.ArrayIndexOutOfBoundsException: 3
    at 
org.apache.flink.connectors.hive.read.HiveMapredSplitReader.lambda$new$0(HiveMapredSplitReader.java:139)
 ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250) 
~[?:1.8.0_301]
    at 
java.util.Spliterators$IntArraySpliterator.forEachRemaining(Spliterators.java:1032)
 ~[?:1.8.0_301]
    at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) 
~[?:1.8.0_301]
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) 
~[?:1.8.0_301]
    at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) 
~[?:1.8.0_301]
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546) 
~[?:1.8.0_301]
    at 
java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
 ~[?:1.8.0_301]
    at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438) 
~[?:1.8.0_301]
    at 
org.apache.flink.connectors.hive.read.HiveMapredSplitReader.(HiveMapredSplitReader.java:141)
 ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at 
org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:157)
 ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at 
org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86)
 ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
    at 
org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132)
 ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
    at 
org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105)
 ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
    at LookupFunction$26.flatMap(Unknown Source) ~[?:?]
    at 
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81)
 ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
    at 
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34)
 ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] {code}
 

 

The SQL:

 
{code:java}
CREATE TABLE kafkaTableSource (
name string,
age int,
sex string,
address string,
ptime AS PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'hehuiyuan1',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.client.id' = 'test-consumer-group',
'properties.group.id' = 'test-consumer-group',
'format' = 'csv'
);

CREATE TABLE printsink (
name string,
age int,
sex string,
address string,
score bigint,
dt string
) WITH (
'connector' = 'print'
);

CREATE CATALOG myhive
WITH (
'type' = 'hive',
'default-database' = 'hhy',
'hive-version' = '2.0.0',
'hadoop-conf-dir'='/Users/hehuiyuan/soft/hadoop/hadoop-2.7.3/etc/hadoop'
);

USE CATALOG myhive;
USE hhy;

set table.sql-dialect=hive;
CREATE TABLE IF NOT EXISTS tmp_flink_test_text (
name STRING,
age INT,
score BIGINT
) PARTITIONED BY (dt STRING) STORED AS TEXTFILE TBLPROPERTIES (
'streaming-source.enable' = 'false',
'streaming-source.partition.include' = 'all',
'lookup.join.cache.ttl' = '5 min'
);
set table.sql-dialect=default;

USE CATALOG default_catalog;
INSERT INTO default_catalog.default_database.printsink
SELECT s.name, s.age, s.sex, s.address, r.score, r.dt
FROM default_catalog.default_database.kafkaTableSource  as s
JOIN 

[jira] [Created] (FLINK-29553) Support UNIX_TIMESTAMP in Table API

2022-10-09 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-29553:
-

 Summary: Support UNIX_TIMESTAMP in Table API 
 Key: FLINK-29553
 URL: https://issues.apache.org/jira/browse/FLINK-29553
 Project: Flink
  Issue Type: Improvement
Reporter: hehuiyuan






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29052) Support TO_TIMESTAMP built-in function in Table API

2022-08-21 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-29052:
-

 Summary: Support TO_TIMESTAMP  built-in function in Table API
 Key: FLINK-29052
 URL: https://issues.apache.org/jira/browse/FLINK-29052
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: hehuiyuan






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28750) Whether to add comment for hive table

2022-07-30 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-28750:
-

 Summary: Whether to add comment for hive table
 Key: FLINK-28750
 URL: https://issues.apache.org/jira/browse/FLINK-28750
 Project: Flink
  Issue Type: Improvement
Reporter: hehuiyuan


Currently, I have a Hive DDL as follows:
{code:java}
"set table.sql-dialect=hive;\n" +
"CREATE TABLE IF NOT EXISTS myhive.dev.shipu3_test_1125 (\n" +
"   `id` int COMMENT 'ia',\n" +
"   `cartdid` bigint COMMENT 'aaa',\n" +
"   `customer` string COMMENT '',\n" +
"   `product` string COMMENT '',\n" +
"   `price` double COMMENT '',\n" +
"   `dt` STRING COMMENT ''\n" +
") PARTITIONED BY (dt STRING) STORED AS TEXTFILE TBLPROPERTIES (\n" +
"  'streaming-source.enable' = 'false',\n" +
"  'streaming-source.partition.include' = 'all',\n" +
"  'lookup.join.cache.ttl' = '12 h'\n" +
")"; {code}
It is parsed as SqlCreateHiveTable by the Hive dialect parser, but the field comments 
are lost.

!image-2022-07-30-15-21-58-062.png|width=568,height=283!


--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28716) uploading multiple files/form datas fail randomly when use rest api

2022-07-27 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-28716:
-

 Summary:  uploading multiple files/form datas fail randomly when 
use rest api
 Key: FLINK-28716
 URL: https://issues.apache.org/jira/browse/FLINK-28716
 Project: Flink
  Issue Type: Bug
Reporter: hehuiyuan


Errors can occur randomly when using the `jars/upload` REST API.
{code:java}
java.lang.IndexOutOfBoundsException: index: 1804, length: 1 (expected: range(0, 
1804))
    at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.checkRangeBounds(AbstractByteBuf.java:1390)
    at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.checkIndex0(AbstractByteBuf.java:1397)
    at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1384)
    at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1379)
    at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.getByte(AbstractByteBuf.java:355)
    at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostBodyUtil.findDelimiter(HttpPostBodyUtil.java:238)
    at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.loadDataMultipartOptimized(HttpPostMultipartRequestDecoder.java:1172)
    at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.getFileUpload(HttpPostMultipartRequestDecoder.java:926)
    at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.decodeMultipart(HttpPostMultipartRequestDecoder.java:572)
    at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.findMultipartDisposition(HttpPostMultipartRequestDecoder.java:797)
    at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.decodeMultipart(HttpPostMultipartRequestDecoder.java:511)
    at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.findMultipartDelimiter(HttpPostMultipartRequestDecoder.java:663)
    at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.decodeMultipart(HttpPostMultipartRequestDecoder.java:498)
    at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.parseBodyMultipart(HttpPostMultipartRequestDecoder.java:463)
    at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.parseBody(HttpPostMultipartRequestDecoder.java:432)
    at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.offer(HttpPostMultipartRequestDecoder.java:347)
    at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.offer(HttpPostMultipartRequestDecoder.java:54)
    at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestDecoder.offer(HttpPostRequestDecoder.java:223)
    at 
org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:176)
    at 
org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:71)
 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28375) Whether to consider adding other data type to support for last_value function

2022-07-04 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-28375:
-

 Summary: Whether to consider adding other data type to support for 
last_value function
 Key: FLINK-28375
 URL: https://issues.apache.org/jira/browse/FLINK-28375
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: hehuiyuan
 Attachments: image-2022-07-04-16-20-08-661.png, 
image-2022-07-04-16-21-28-198.png

 
{code:java}
CREATE TABLE jmqTableSource (
  keyField INTEGER,
  timestampField INTEGER,
  arrayField ARRAY,
  proc AS PROCTIME()
) WITH (
  'connector' = 'kafka'
);

insert into kafkaTableSink
select keyField, last_value(arrayField) over (partition by keyField order by proc)
from jmqTableSource; {code}
Exception in thread "main" org.apache.flink.table.api.TableException: 
LAST_VALUE aggregate function does not support type: ''ARRAY''.
Please re-check the data type.

 

I have a modification to support this, but why does the community not support it?

Is there any special reason that I have not considered?

 

A test with the array data type runs successfully:

Mock data:

!image-2022-07-04-16-21-28-198.png!

result:

!image-2022-07-04-16-20-08-661.png|width=626,height=146!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-27238) The HiveGenericUDTF should support primitive array,for example Array Array ...

2022-04-13 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-27238:
-

 Summary: The HiveGenericUDTF should support primitive array,for 
example Array Array ...
 Key: FLINK-27238
 URL: https://issues.apache.org/jira/browse/FLINK-27238
 Project: Flink
  Issue Type: Bug
Reporter: hehuiyuan
 Attachments: image-2022-04-14-10-27-50-340.png

!image-2022-04-14-10-27-50-340.png|width=381,height=260!

 

If argTypes[0] is an Array type, it throws an exception.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-27194) Whether to consider adding the configuration of ignore when data deserialization failed.

2022-04-12 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-27194:
-

 Summary: Whether to consider adding the configuration of ignore 
when data deserialization failed.
 Key: FLINK-27194
 URL: https://issues.apache.org/jira/browse/FLINK-27194
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: hehuiyuan






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26848) JDBC don't flush data when disable flush-max-rows and flush-interval

2022-03-24 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-26848:
-

 Summary: JDBC don't flush data when disable flush-max-rows and 
flush-interval
 Key: FLINK-26848
 URL: https://issues.apache.org/jira/browse/FLINK-26848
 Project: Flink
  Issue Type: Bug
Reporter: hehuiyuan






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26565) Use lateTrigger when the window maxtimestap of data is less than currentwatermark and it is not discarded because the allow latency parameter

2022-03-09 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-26565:
-

 Summary: Use lateTrigger when the window maxtimestap of data is 
less than currentwatermark and  it is not discarded because the allow latency 
parameter 
 Key: FLINK-26565
 URL: https://issues.apache.org/jira/browse/FLINK-26565
 Project: Flink
  Issue Type: Improvement
Reporter: hehuiyuan
 Attachments: image-2022-03-10-11-27-52-891.png

Use the lateTrigger when a record's window max timestamp is less than the current 
watermark and the record is not discarded because of the allowed-lateness parameter.

!image-2022-03-10-11-27-52-891.png|width=543,height=318!



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26498) The window result may not have been emitted when use window emit feature and use lateTrigger.

2022-03-05 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-26498:
-

 Summary: The window result may not have been  emitted when use 
window emit feature and use lateTrigger. 
 Key: FLINK-26498
 URL: https://issues.apache.org/jira/browse/FLINK-26498
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: hehuiyuan
 Attachments: image-2022-03-05-23-53-37-086.png, 
image-2022-03-05-23-53-44-196.png, image-2022-03-06-00-03-11-670.png

The SQL of the job:

 
{code:java}
CREATE TABLE tableSource(
name string,
age int not null,
sex string,
dt TIMESTAMP(3),
WATERMARK FOR dt AS dt - INTERVAL '0' SECOND
) WITH (

);



CREATE TABLE tableSink(
windowstart timestamp(3),
windowend timestamp(3),
name string,
age int,
cou bigint
)
WITH (

);
INSERT INTO tablesink
  SELECT
TUMBLE_START(dt, INTERVAL '1' HOUR),
TUMBLE_END(dt, INTERVAL '1' HOUR),
name,
age,
count(sex)
FROM tableSource
GROUP BY TUMBLE(dt, INTERVAL '1' HOUR), name,age {code}
 and table config:

 

 
{code:java}
table.exec.emit.allow-lateness = 1 hour 
table.exec.emit.late-fire.delay = 1 min
table.exec.emit.early-fire.delay = 1min{code}
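For reference, a sketch of how these options can be set programmatically on the TableEnvironment; the `*-fire.enabled` keys are my assumption and are not shown in the config above:
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Sketch only: mirrors the experimental emit options listed above.
public class EmitConfigSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        Configuration conf = tEnv.getConfig().getConfiguration();
        conf.setString("table.exec.emit.allow-lateness", "1 h");
        conf.setString("table.exec.emit.early-fire.enabled", "true");   // assumption
        conf.setString("table.exec.emit.early-fire.delay", "1 min");
        conf.setString("table.exec.emit.late-fire.enabled", "true");    // assumption
        conf.setString("table.exec.emit.late-fire.delay", "1 min");
    }
}
{code}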
The data:

 

 
{code:java}
>hehuiyuan1,22,woman,2022-03-05 00:30:22.000
>hehuiyuan1,22,woman,2022-03-05 00:40:22.000
 //pause ,wait for the window trigger for earlyTrigger 1 min
>hehuiyuan1,22,woman,2022-03-05 00:50:22.000
>hehuiyuan1,22,woman,2022-03-05 00:56:22.000
//pause ,wait for the window trigger for earlyTrigger 1 min 
>hehuiyuan1,22,woman,2022-03-05 01:00:00.000
//pause ,wait for the window trigger for earlyTrigger 1 min 
>hehuiyuan1,22,woman,2022-03-05 00:59:20.000 --latency data
//pause ,wait for the window trigger for earlyTrigger 1 min 
>hehuiyuan1,22,woman,2022-03-05 00:59:20.100 --latency data 
>>hehuiyuan1,22,woman,2022-03-05 02:00:00.000 -- window state clean for 
>[0:00:00 1:00:00]
>hehuiyuan1,22,woman,2022-03-05 02:10:00.000 {code}
The result:
{code:java}
> +I(+I[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])

> -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2]) 
> +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4]) 

> +I(+I[2022-03-05T01:00, 2022-03-05T02:00, hehuiyuan1, 22, 1])

> -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4]) 
> +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 5])
 
 
> +I(+I[2022-03-05T02:00, 2022-03-05T03:00, hehuiyuan1, 22, 2]) {code}
 

`hehuiyuan1,22,woman,2022-03-05 00:59:20.100` is lost: the lateTrigger has not fired yet, 
and the window [0:00:00, 1:00:00] is cleaned up when the record 
`hehuiyuan1,22,woman,2022-03-05 02:00:00.000` arrives and advances the watermark.

 

The window [0:00:00, 1:00:00] received 6 records, but the final count we got is 5.

The trigger is AfterEndOfWindowEarlyAndLate .

So the WindowOperator may need to emit the result when the window's cleanup timer 
fires in onEventTime.

I think the correct result is as follows:

 

 
{code:java}
> +I(+I[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])

> -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])

> +I(+I[2022-03-05T01:00, 2022-03-05T02:00, hehuiyuan1, 22, 1])

> -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])
> +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 5])

> -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 5])
> +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 6])
> +I(+I[2022-03-05T02:00, 2022-03-05T03:00, hehuiyuan1, 22, 2]) {code}
 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26320) Update hive doc for 1 m->1 min

2022-02-22 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-26320:
-

 Summary: Update hive doc for 1 m->1 min
 Key: FLINK-26320
 URL: https://issues.apache.org/jira/browse/FLINK-26320
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: hehuiyuan
 Attachments: image-2022-02-23-15-36-54-649.png

{{Time interval unit label 'm' does not match any of the recognized units: 
DAYS: (d | day | days), HOURS: (h | hour | hours), MINUTES: (min | minute | 
minutes), SECONDS: (s | sec | secs | second | seconds), MILLISECONDS: (ms | 
milli | millis | millisecond | milliseconds), MICROSECONDS: (µs | micro | 
micros | microsecond | microseconds), NANOSECONDS: (ns | nano | nanos | 
nanosecond | nanoseconds)}}

The '1 m' used in the doc is misleading; it is not a valid unit.

 

!image-2022-02-23-15-36-54-649.png|width=491,height=219!



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25262) Support to send data to lookup table for KeyGroupStreamPartitioner way for SQL

2021-12-11 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-25262:
-

 Summary: Support to send data to  lookup table for 
KeyGroupStreamPartitioner way for SQL
 Key: FLINK-25262
 URL: https://issues.apache.org/jira/browse/FLINK-25262
 Project: Flink
  Issue Type: Improvement
Reporter: hehuiyuan
 Attachments: image-2021-12-12-15-15-48-540.png, 
image-2021-12-12-15-18-08-574.png

Sending data to the lookup table by hash can improve the cache hit rate, further 
improving processing performance and reducing the cache size.

Should we consider introducing it?
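
A minimal DataStream-level sketch of the idea, under my own assumptions (this is not the SQL planner change being proposed): key the stream by the lookup key so that records with the same key are processed by the same subtask and reuse that subtask's cache.
{code:java}
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.HashMap;
import java.util.Map;

public class KeyedLookupSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> keys = env.fromElements("user_1", "user_2", "user_1", "user_3");

        keys
                // KeyGroupStreamPartitioner: identical keys always go to the same subtask
                .keyBy(key -> key)
                // stand-in for the lookup step with a per-subtask cache
                .map(new CachedLookup())
                .print();

        env.execute("keyed lookup sketch");
    }

    /** Stand-in lookup with a small local cache; the real dim-table access is omitted. */
    private static class CachedLookup extends RichMapFunction<String, String> {
        private transient Map<String, String> cache;

        @Override
        public void open(Configuration parameters) {
            cache = new HashMap<>();
        }

        @Override
        public String map(String key) {
            // the hit rate improves because each subtask only ever sees its own key range
            return cache.computeIfAbsent(key, k -> "dim-value-for-" + k);
        }
    }
}
{code}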

 

!image-2021-12-12-15-18-08-574.png|width=419,height=193!



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-24620) Add partition discovery for kafka sink when `sink.partitioner`= 'fix'

2021-10-22 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-24620:
-

 Summary: Add partition discovery for kafka sink when 
`sink.partitioner`= 'fix'
 Key: FLINK-24620
 URL: https://issues.apache.org/jira/browse/FLINK-24620
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Reporter: hehuiyuan


Add partition discovery for kafka sink when `sink.partitioner`= 'fix'



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23810) Print sql when parse failed , which is convenient to find error

2021-08-16 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-23810:
-

 Summary: Print  sql when parse failed  , which is convenient to 
find error 
 Key: FLINK-23810
 URL: https://issues.apache.org/jira/browse/FLINK-23810
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: hehuiyuan


Print the SQL when parsing fails, which makes it convenient to find the erroneous statement.

 
{code:java}

public SqlNode parse(String sql) {
    try {
        SqlParser parser = SqlParser.create(sql, config);
        return parser.parseStmt();
    } catch (SqlParseException e) {
        throw new SqlParserException("SQL parse failed. " + e.getMessage(), e);
    }
}
{code}
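
A minimal sketch of the proposed change (the message wording is illustrative, not a final patch): append the offending statement to the exception so it shows up in the logs.
{code:java}
public SqlNode parse(String sql) {
    try {
        SqlParser parser = SqlParser.create(sql, config);
        return parser.parseStmt();
    } catch (SqlParseException e) {
        // include the original statement so the failing SQL is visible in the error message
        throw new SqlParserException(
                "SQL parse failed. " + e.getMessage() + " Failed sql:\n" + sql, e);
    }
}
{code}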
 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23604) 'csv.disable-quote-character' can not take effect during deserialization for old csv format

2021-08-03 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-23604:
-

 Summary: 'csv.disable-quote-character' can not take effect during 
deserialization for old csv format
 Key: FLINK-23604
 URL: https://issues.apache.org/jira/browse/FLINK-23604
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: hehuiyuan


https://issues.apache.org/jira/browse/FLINK-21207

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23237) Add log to print data that failed to deserialize when ignore-parse-error=true and threre is NPE question when use `new String(message) ` if message = null

2021-07-04 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-23237:
-

 Summary: Add log to print data that failed to deserialize when  
ignore-parse-error=true and threre is NPE question when use `new 
String(message) ` if message = null  
 Key: FLINK-23237
 URL: https://issues.apache.org/jira/browse/FLINK-23237
 Project: Flink
  Issue Type: Improvement
Reporter: hehuiyuan


(1) Add a log to print the data that failed to deserialize when `ignore-parse-error` = `true`.

(2) There is an NPE risk when using `new String(message)` if message is null.
{code:java}
public RowData deserialize(@Nullable byte[] message) throws IOException {
    if (message == null) {
        return null;
    }
    try {
        final JsonNode root = objectReader.readValue(message);
        return (RowData) runtimeConverter.convert(root);
    } catch (Throwable t) {
        if (ignoreParseErrors) {
            return null;
        }
        throw new IOException(
                String.format("Failed to deserialize CSV row '%s'.", new String(message)), t);
    }
}
{code}
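
A sketch of the two suggested changes (the LOG field is an assumption, it is not shown in the snippet above):
{code:java}
public RowData deserialize(@Nullable byte[] message) throws IOException {
    if (message == null) {
        return null;
    }
    try {
        final JsonNode root = objectReader.readValue(message);
        return (RowData) runtimeConverter.convert(root);
    } catch (Throwable t) {
        if (ignoreParseErrors) {
            // (1) make the dropped record visible instead of discarding it silently
            LOG.warn("Failed to deserialize record, skipping it: {}", new String(message), t);
            return null;
        }
        // (2) message is known to be non-null here because of the early return above
        throw new IOException(
                String.format("Failed to deserialize CSV row '%s'.", new String(message)), t);
    }
}
{code}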



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22976) Whether to consider adding config-option to control whether to exclude record.key value from record.value value

2021-06-11 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-22976:
-

 Summary: Whether to consider adding config-option to control  
whether to exclude record.key value from  record.value value 
 Key: FLINK-22976
 URL: https://issues.apache.org/jira/browse/FLINK-22976
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Reporter: hehuiyuan


upsert-kafka:

key :

{"name":"hehui111","sname":"wman"}

 

value :

{"name":"hehui111","sname":"wman","sno":"wman","sclass":"wman","address":"wman"}

 

The ProducerRecord's value contains the fields that are already in the ProducerRecord's key.

Should we consider adding a config option to control whether the record.key fields are 
excluded from the record.value?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22954) Don't support consuming update and delete changes when use table function that does not contain table field

2021-06-09 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-22954:
-

 Summary: Don't support consuming update and delete changes when 
use table function that does not contain table field
 Key: FLINK-22954
 URL: https://issues.apache.org/jira/browse/FLINK-22954
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Reporter: hehuiyuan


{code:java}
Exception in thread "main" org.apache.flink.table.api.TableException: Table 
sink 'default_catalog.default_database.kafkaTableSink' doesn't support 
consuming update and delete changes which is produced by node 
Join(joinType=[LeftOuterJoin], where=[true], select=[name, word], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])Exception in thread 
"main" org.apache.flink.table.api.TableException: Table sink 
'default_catalog.default_database.kafkaTableSink' doesn't support consuming 
update and delete changes which is produced by node 
Join(joinType=[LeftOuterJoin], where=[true], select=[name, word], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:382)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:265)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:341)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:330)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:329)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.immutable.Range.foreach(Range.scala:160) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:329)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:279)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:341)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:330)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:329)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.immutable.Range.foreach(Range.scala:160) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:329)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:125)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.optimize(FlinkChangelogModeInferenceProgram.scala:50)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.optimize(FlinkChangelogModeInferenceProgram.scala:39)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
 at 

[jira] [Created] (FLINK-22617) Add log when create bulk format

2021-05-10 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-22617:
-

 Summary: Add log when create bulk format 
 Key: FLINK-22617
 URL: https://issues.apache.org/jira/browse/FLINK-22617
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Reporter: hehuiyuan


The Hive table sink has some logs that tell us whether the native or the mapred writer is used.

 

 
{code:java}
LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
LOG.info("Hive streaming sink: Use native parquet writer.");
LOG.info(
        "Hive streaming sink: Use MapReduce RecordWriter writer because BulkWriter Factory not available.");
{code}
 

I think we could add similar logs on the read path, in `createBulkFormatForSplit`, to make it 
more obvious which reader is used, as sketched below.
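
A sketch of the kind of log that could be added (the exact wording and the placement inside `createBulkFormatForSplit` are assumptions):
{code:java}
// Sketch only: illustrative log statements, not the actual Flink code.
private static void logChosenReader(org.slf4j.Logger LOG, Object split, boolean useBulkFormat) {
    if (useBulkFormat) {
        LOG.info("Hive source: use native bulk format for split {}.", split);
    } else {
        LOG.info("Hive source: use MapRed record reader for split {} (no bulk format available).", split);
    }
}
{code}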

 

!image-2021-05-10-17-04-15-571.png|width=490,height=198!  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22400) NPE problem when call HiveInspectors.toFlinkObject for hive-exec 2.0.0

2021-04-21 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-22400:
-

 Summary: NPE problem when call HiveInspectors.toFlinkObject for 
hive-exec 2.0.0
 Key: FLINK-22400
 URL: https://issues.apache.org/jira/browse/FLINK-22400
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Reporter: hehuiyuan
 Attachments: image-2021-04-21-23-18-56-107.png

ENV:

Flink 1.12, Hive 2.0.0

ERROR LOG:
{code:java}
Caused by: java.lang.RuntimeException: One or more fetchers have encountered 
exception
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199)
at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:275)
at 
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:67)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:399)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:620)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:584)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:844)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:636)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
unexpected exception while polling the records
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: org.apache.flink.connectors.hive.FlinkHiveException: 
java.lang.NullPointerException
at 
org.apache.flink.connectors.hive.read.HiveMapredSplitReader.nextRecord(HiveMapredSplitReader.java:190)
at 
org.apache.flink.connectors.hive.read.HiveBulkFormatAdapter$HiveReader.nextRecord(HiveBulkFormatAdapter.java:336)
at 
org.apache.flink.connectors.hive.read.HiveBulkFormatAdapter$HiveReader.readBatch(HiveBulkFormatAdapter.java:319)
at 
org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67)
at 
org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56)
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:138)
... 6 more
Caused by: java.lang.NullPointerException
at 
org.apache.flink.table.functions.hive.conversion.HiveInspectors.toFlinkObject(HiveInspectors.java:335)
at 
org.apache.flink.connectors.hive.read.HiveMapredSplitReader.nextRecord(HiveMapredSplitReader.java:180)
... 11 more

{code}
{code:java}
// hive-exec 2.0.0
Map map = mapInspector.getMap(data);
{code}
 

!image-2021-04-21-23-18-56-107.png|width=372,height=191!

 

Null may be returned here, and then an NPE is thrown when `map.size` is called 
(map == null).
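
A sketch of a possible guard (the exact placement inside HiveInspectors.toFlinkObject is assumed):
{code:java}
Map<?, ?> map = mapInspector.getMap(data);
if (map == null) {
    // hive-exec 2.0.0 may return null here; avoid the NPE on map.size()
    return null;
}
// ... convert the map entries as before
{code}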

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22263) Using TIMESTAMPADD function with partition value has some problem when push partition into TableSource

2021-04-13 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-22263:
-

 Summary: Using TIMESTAMPADD function with partition value has some 
problem  when push partition into  TableSource
 Key: FLINK-22263
 URL: https://issues.apache.org/jira/browse/FLINK-22263
 Project: Flink
  Issue Type: Bug
Reporter: hehuiyuan


SQL (table api):
{code:java}
CREATE CATALOG myhive
WITH (
'type' = 'hive',
'default-database' = 'hhy'
);

INSERT INTO  default_catalog.default_database.table_sink select * from  
myhive.hhy.tmp_flink_test where dt=CAST(TIMESTAMPADD(DAY, -1, CURRENT_DATE) as 
varchar);

{code}
 

Error log:
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: Data 
type 'INTERVAL SECOND(3) NOT NULL' with conversion class 'java.time.Duration' 
does not support a value literal of class 'java.math.BigDecimal'.Exception in 
thread "main" org.apache.flink.table.api.ValidationException: Data type 
'INTERVAL SECOND(3) NOT NULL' with conversion class 'java.time.Duration' does 
not support a value literal of class 'java.math.BigDecimal'. at 
org.apache.flink.table.expressions.ValueLiteralExpression.validateValueDataType(ValueLiteralExpression.java:286)
 at 
org.apache.flink.table.expressions.ValueLiteralExpression.(ValueLiteralExpression.java:79)
 at 
org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral(ApiExpressionUtils.java:251)
 at 
org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitLiteral(RexNodeExtractor.scala:432)
 at 
org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitLiteral(RexNodeExtractor.scala:340)
 at org.apache.calcite.rex.RexLiteral.accept(RexLiteral.java:1173) at 
org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$7.apply(RexNodeExtractor.scala:440)
 at 
org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$7.apply(RexNodeExtractor.scala:440)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitCall(RexNodeExtractor.scala:439)
 at 
org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitCall(RexNodeExtractor.scala:340)
 at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) at 
org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$7.apply(RexNodeExtractor.scala:440)
 at 
org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$7.apply(RexNodeExtractor.scala:440)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitCall(RexNodeExtractor.scala:439)
 at 
org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitCall(RexNodeExtractor.scala:340)
 at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) at 
org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$7.apply(RexNodeExtractor.scala:440)
 at 
org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter$$anonfun$7.apply(RexNodeExtractor.scala:440)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter.visitCall(RexNodeExtractor.scala:439)
 at 

[jira] [Created] (FLINK-22174) `csv.quote-character` does not work for csv format when sink

2021-04-09 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-22174:
-

 Summary: `csv.quote-character`  does not  work for  csv format  
when sink
 Key: FLINK-22174
 URL: https://issues.apache.org/jira/browse/FLINK-22174
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: hehuiyuan


{code:java}
CREATE TABLE kafkaTableSource (
name string,
age int,
sex string,
address string
) WITH (
'connector' = 'kafka',
'topic' = 'hehuiyuan1',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.client.id' = 'test-consumer-group',
'properties.group.id' = 'test-consumer-group',
'format' = 'csv',
'csv.quote-character' = '*'
);

CREATE TABLE kafkaTableSink (
name string,
age int,
sex string,
address string
) WITH (
'connector' = 'kafka',
'topic' = 'hehuiyuan2',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'csv',
'csv.quote-character' = '#'
);

insert into kafkaTableSink select * from kafkaTableSource;

{code}
 

Test 1 :

Generate data for topic `hehuiyuan1`
{code:java}
>hehuiyuan,22,man,hengshui
>*hehuiyuan*,*22*,*man*,*hengshui*
>#hehuiyuan#,22,#man#,#hengshui#
>hehuiyuan,22,#man#,#hengshui#
>"hehuiyuan",22,#man#,#hengshui#
{code}
 

Result data for topic `hehuiyuan2`
{code:java}
>hehuiyuan,22,man,hengshui
>hehuiyuan,22,man,hengshui
>###hehuiyuan###,22,###man###,###hengshui###
>hehuiyuan,22,###man###,###hengshui###
>#"hehuiyuan"#,22,###man###,###hengshui###
{code}
I think the result should be `#hehuiyuan#,22,#man#,#hengshui#` for the first 
and second records.

 

Test 2:

Add Timestamp field:
{code:java}
CREATE TABLE kafkaTableSource (
name string,
age int,
sex string,
address string,
dt timestamp(3)
) WITH (
{code}
Generate data for topic `hehuiyuan1`
{code:java}
>hehuiyuan,22,man,hengshui,2020-12-12 12:12:12
>hehuiyuan,22,man,hengshui,2020-12-16 10:00:00{code}
Result data for topic `hehuiyuan2`
{code:java}
>hehuiyuan,22,man,hengshui,#2020-12-12 12:12:12#
>hehuiyuan,22,man,hengshui,#2020-12-16 10:00:00#
{code}


--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21930) There are some error for hive_read_writer.md

2021-03-23 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-21930:
-

 Summary: There are some error for hive_read_writer.md
 Key: FLINK-21930
 URL: https://issues.apache.org/jira/browse/FLINK-21930
 Project: Flink
  Issue Type: Wish
  Components: Documentation
Reporter: hehuiyuan


!https://user-images.githubusercontent.com/18002496/111795099-db2c0580-8901-11eb-929d-dd40c179a948.png|width=406,height=194!

 

 
h1. Fix hive dim doc: replace `order` with `o`



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21829) create HiveCatalog with custom hadoopconfdir first

2021-03-16 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-21829:
-

 Summary: create HiveCatalog with custom hadoopconfdir first
 Key: FLINK-21829
 URL: https://issues.apache.org/jira/browse/FLINK-21829
 Project: Flink
  Issue Type: Wish
  Components: Connectors / Hive
Reporter: hehuiyuan


There is no prompt when the configured path to the Hadoop conf directory is 
unintentionally wrong.
{code:java}
private static HiveConf createHiveConf(
        @Nullable String hiveConfDir, @Nullable String hadoopConfDir) {
    // create HiveConf from hadoop configuration with hadoop conf directory configured.
    Configuration hadoopConf = null;
    if (isNullOrWhitespaceOnly(hadoopConfDir)) {
        for (String possibleHadoopConfPath :
                HadoopUtils.possibleHadoopConfPaths(
                        new org.apache.flink.configuration.Configuration())) {
            hadoopConf = getHadoopConfiguration(possibleHadoopConfPath);
            if (hadoopConf != null) {
                break;
            }
        }
    } else {
        hadoopConf = getHadoopConfiguration(hadoopConfDir);
    }
    if (hadoopConf == null) {
        hadoopConf = new Configuration();
    }
    HiveConf hiveConf = new HiveConf(hadoopConf, HiveConf.class);

{code}

It would be better to fall back to loading the Hadoop conf from the possible Hadoop paths 
when the configured path is wrong.

(1) try to load from the custom Hadoop conf path
(2) try to load from the possible Hadoop conf paths if {{Configuration hadoopConf}} is null
(3) create a new Configuration if {{Configuration hadoopConf}} is still null
{code:java}

private static HiveConf createHiveConf(
        @Nullable String hiveConfDir, @Nullable String hadoopConfDir) {
    // create HiveConf from hadoop configuration with hadoop conf directory configured.
    Configuration hadoopConf = null;
    // (1) try the custom hadoop conf path first
    if (!isNullOrWhitespaceOnly(hadoopConfDir)) {
        hadoopConf = getHadoopConfiguration(hadoopConfDir);
    }
    // (2) fall back to the possible hadoop conf paths
    if (hadoopConf == null) {
        for (String possibleHadoopConfPath :
                HadoopUtils.possibleHadoopConfPaths(
                        new org.apache.flink.configuration.Configuration())) {
            hadoopConf = getHadoopConfiguration(possibleHadoopConfPath);
            if (hadoopConf != null) {
                break;
            }
        }
    }
    // (3) otherwise start from an empty configuration
    if (hadoopConf == null) {
        hadoopConf = new Configuration();
    }
    HiveConf hiveConf = new HiveConf(hadoopConf, HiveConf.class);

{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21284) Non-deterministic UDF functions return different values

2021-02-04 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-21284:
-

 Summary: Non-deterministic UDF functions return different values
 Key: FLINK-21284
 URL: https://issues.apache.org/jira/browse/FLINK-21284
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: hehuiyuan


When a non-deterministic UDF is used multiple times, the results differ.

 
{code:java}
tableEnv.registerFunction("sample", new SampleFunction());

Table tm = tableEnv.sqlQuery(
        "select name, RAND_INTEGER(10) as sample, sex from myhive_staff");
tableEnv.registerTable("tmp", tm);

tableEnv.sqlUpdate("insert into sinktable select * from tmp where sample >= 8");


// the UDF
public class SampleFunction extends ScalarFunction {
    public int eval(int pvid) {
        int a = (int) (Math.random() * 10);
        System.out.println("" + a);
        return a;
    }
}{code}
 

The non-deterministic expression (`RAND_INTEGER(10) as sample`) is evaluated again on the 
sink path, which leads to inconsistent results.
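
For reference, a ScalarFunction can declare its non-determinism by overriding isDeterministic(); below is a minimal sketch of the sample function with that override (whether the planner then avoids re-evaluating the expression is exactly what this issue is about):
{code:java}
import org.apache.flink.table.functions.ScalarFunction;

public class SampleFunction extends ScalarFunction {

    public int eval(int pvid) {
        // non-deterministic: a new random value on every call
        return (int) (Math.random() * 10);
    }

    @Override
    public boolean isDeterministic() {
        // tell the planner this function must not be assumed to return stable values
        return false;
    }
}
{code}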

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20771) Hive partition is not added when there is a lot of data

2020-12-25 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-20771:
-

 Summary: Hive partition is not added when there is a lot of data
 Key: FLINK-20771
 URL: https://issues.apache.org/jira/browse/FLINK-20771
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Reporter: hehuiyuan
 Attachments: image-2020-12-25-18-09-42-707.png, 
image-2020-12-25-18-15-07-519.png

The Hive partition is not added when the amount of data is huge.

!image-2020-12-25-18-09-42-707.png|width=437,height=115!

Before the partition commit, *inProgressPart* will be reinitialized.

But the bucket is still active; the partition is:

!image-2020-12-25-18-15-07-519.png|width=574,height=192!

The bucket is active, so notifyBucketInactive is not executed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20318) Fix cast question for properies() method in kafka ConnectorDescriptor

2020-11-24 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-20318:
-

 Summary: Fix cast question for properies() method in kafka 
ConnectorDescriptor
 Key: FLINK-20318
 URL: https://issues.apache.org/jira/browse/FLINK-20318
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Reporter: hehuiyuan


 

This Jira fixes the Kafka connector descriptor. There is a cast problem when using the 
properties() method.
{code:java}
Properties props = new Properties();
props.put("enable.auto.commit", "false");
props.put("fetch.max.wait.ms", "3000");
props.put("flink.poll-timeout", 5000);
props.put("flink.partition-discovery.interval-millis", false);

kafka = new Kafka()
        .version("0.11")
        .topic(topic)
        .properties(props);
{code}
{code:java}
Exception in thread "main" java.lang.ClassCastException: java.lang.Integer 
cannot be cast to java.lang.String
Exception in thread "main" java.lang.ClassCastException: java.lang.Boolean 
cannot be cast to java.lang.String
{code}
Change: replace the `(String) v` cast with `String.valueOf(v)` in Kafka.java.
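
A small self-contained illustration of the difference (not the actual Kafka.java code):
{code:java}
import java.util.Properties;

public class CastVsValueOf {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("flink.poll-timeout", 5000); // Integer value, not String

        Object v = props.get("flink.poll-timeout");
        System.out.println(String.valueOf(v)); // "5000" - works for any value type
        try {
            String s = (String) v;             // this is what the descriptor effectively did
            System.out.println(s);
        } catch (ClassCastException e) {
            System.out.println("cast failed: " + e.getMessage());
        }
    }
}
{code}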

 

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20301) Flink sql 1.10 : Legacy Decimal and decimal for Array that is not Compatible

2020-11-23 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-20301:
-

 Summary: Flink sql 1.10 : Legacy Decimal and decimal  for Array  
that is not Compatible
 Key: FLINK-20301
 URL: https://issues.apache.org/jira/browse/FLINK-20301
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: hehuiyuan
 Attachments: image-2020-11-23-23-48-02-102.png

The error log:

 
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: Type 
ARRAY of table field 'numbers' does not match with the 
physical type ARRAY of the 'numbers' field of the 
TableSource return type.Exception in thread "main" 
org.apache.flink.table.api.ValidationException: Type ARRAY of 
table field 'numbers' does not match with the physical type 
ARRAY of the 'numbers' field of the TableSource 
return type. at 
org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:160)
 at 
org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:185)
 at 
org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:246)
 at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321) at 
java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) 
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) at 
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at 
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at 
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at 
org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:228)
 at 
org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:206)
 at 
org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:110)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:118)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
 at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
 at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:685)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
{code}
 

 

 

Background:

Flink SQL, blink planner, 1.10.

The schema for the TableSource is JSON:

 
{code:java}
 "{type:'object',properties:{age:{type:'number'},numbers: { type: 'array', 
items: { type: 'number' } },name:{type:'string'},dt:{type: 'string', format: 
'date-time'},timehour:{type: 

[jira] [Created] (FLINK-19686) Cast question for data | time

2020-10-16 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-19686:
-

 Summary: Cast question for data | time 
 Key: FLINK-19686
 URL: https://issues.apache.org/jira/browse/FLINK-19686
 Project: Flink
  Issue Type: Wish
  Components: Table SQL / API
Reporter: hehuiyuan






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18613) How to support retract & upsert sink for a TableSink ?

2020-07-16 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-18613:
-

 Summary: How to support retract & upsert sink for a TableSink ?
 Key: FLINK-18613
 URL: https://issues.apache.org/jira/browse/FLINK-18613
 Project: Flink
  Issue Type: Wish
  Components: Table SQL / Planner
Reporter: hehuiyuan


Environment: Flink 1.9 / Blink planner

Hi, I want to ask a question:

I have a job that executes multiple SQL statements with one TableSink class:

(1) insert into table_0 select count(*) from table1;

(2) insert into table_2 select name, sum(score) from table1 group by name;

 

The TableSink implements the UpsertStreamTableSink interface.

That is OK for SQL (2), but it is not supported for SQL (1), which has no keys.

 

But I want to use a TableSink that can support both upsert and retract; can you give me 
some advice? Thanks.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18339) ValidationException exception that field typeinformation in TableSchema and in TableSource return type for blink

2020-06-16 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-18339:
-

 Summary: ValidationException exception that  field typeinformation 
in TableSchema and in TableSource return type for blink
 Key: FLINK-18339
 URL: https://issues.apache.org/jira/browse/FLINK-18339
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.9.0
Reporter: hehuiyuan
 Attachments: image-2020-06-17-10-37-48-166.png, 
image-2020-06-17-10-53-08-424.png

The type of the `datatime` field is OBJECT_ARRAY.

 

Exception:

 
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: Type 
LEGACY(BasicArrayTypeInfo) of table field 'datatime' does not match 
with type BasicArrayTypeInfo of the field 'datatime' of the TableSource 
return type.Exception in thread "main" 
org.apache.flink.table.api.ValidationException: Type 
LEGACY(BasicArrayTypeInfo) of table field 'datatime' does not match 
with type BasicArrayTypeInfo of the field 'datatime' of the TableSource 
return type. at 
org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
 at 
org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) at 
org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlanInternal(StreamExecGroupWindowAggregate.scala:141)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlanInternal(StreamExecGroupWindowAggregate.scala:55)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlan(StreamExecGroupWindowAggregate.scala:55)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:119)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
 at ...
{code}

[jira] [Created] (FLINK-18145) Segment optimization does not work ?

2020-06-05 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-18145:
-

 Summary: Segment optimization does not work ?
 Key: FLINK-18145
 URL: https://issues.apache.org/jira/browse/FLINK-18145
 Project: Flink
  Issue Type: Wish
Reporter: hehuiyuan


DAG Segment Optimization: 
!image-2020-06-05-14-40-03-123.png|width=569,height=226!

Code:
{code:java}
  StreamExecutionEnvironment env = EnvUtil.getEnv();
env.setParallelism(1);
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
  StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(env,bsSettings);

  GeneratorTableSource tableSource = new GeneratorTableSource(2, 1, 70, 0);
  tableEnv.registerTableSource("myTble",tableSource);
  Table mytable = tableEnv.scan("myTble");
  mytable.printSchema();
  tableEnv.toAppendStream(mytable,Row.class).addSink(new 
PrintSinkFunction<>()).setParallelism(2);

  Table tableproc = tableEnv.sqlQuery("SELECT key, count(rowtime_string) as 
countkey,TUMBLE_START(proctime, INTERVAL '30' SECOND) as tumblestart FROM 
myTble group by TUMBLE(proctime, INTERVAL '30' SECOND) ,key");
  tableproc.printSchema();
  tableEnv.registerTable("t4",tableproc);


  Table table = tableEnv.sqlQuery("SELECT key,count(rowtime_string) as 
countkey,TUMBLE_START(proctime,  INTERVAL '24' HOUR) as tumblestart FROM myTble 
group by TUMBLE(proctime,  INTERVAL '24' HOUR) ,key");
  table.printSchema();
  tableEnv.registerTable("t3",table);

  String[] fields = new String[]{"key","countkey","tumblestart"};
 TypeInformation[] fieldsType = new TypeInformation[3];
fieldsType[0] = Types.INT;
fieldsType[1] = Types.LONG;
  fieldsType[2] = Types.SQL_TIMESTAMP;
  PrintTableUpsertSink printTableSink = new 
PrintTableUpsertSink(fields,fieldsType,true);
tableEnv.registerTableSink("inserttable",printTableSink);
tableEnv.sqlUpdate("insert into inserttable  select key,countkey,tumblestart 
from t3");


  String[] fieldsproc = new String[]{"key","countkey","tumblestart"};
  TypeInformation[] fieldsTypeproc = new TypeInformation[3];
  fieldsTypeproc[0] = Types.INT;
  fieldsTypeproc[1] = Types.LONG;
  fieldsTypeproc[2] = Types.SQL_TIMESTAMP;
  PrintTableUpsertSink printTableSinkproc = new 
PrintTableUpsertSink(fieldsproc,fieldsTypeproc,true);
  tableEnv.registerTableSink("inserttableproc",printTableSinkproc);
  tableEnv.sqlUpdate("insert into inserttableproc  select 
key,countkey,tumblestart from t4");

{code}
I have a custom table source, then:

    (1) transform the datastream with the `toAppendStream` method, then sink

    (2) use a tumble window, then sink

    (3) use another tumble window, then sink

but the segment optimization didn't work.

*!image-2020-06-05-14-50-33-759.png|width=458,height=336!*

 

*The source is executed by 3 threads and generates duplicate data 3 times*

!image-2020-06-05-14-53-57-056.png|width=1216,height=204!
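
A hedged check (just a sketch; I have not verified that these options affect this particular plan) is to make sure the Blink planner's sub-plan/source reuse options are enabled before the plan is translated:
{code:java}
import org.apache.flink.configuration.Configuration;

// Blink planner reuse options (both default to true); set explicitly here only
// while investigating why the source is translated three times.
Configuration conf = tableEnv.getConfig().getConfiguration();
conf.setBoolean("table.optimizer.reuse-sub-plan-enabled", true);
conf.setBoolean("table.optimizer.reuse-source-enabled", true);
{code}
If reuse is already on, one possible explanation (not verified) is that the `toAppendStream` conversion and each `sqlUpdate` are translated in separate optimization passes, so common sub-plans across them are not deduplicated.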



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17117) There is a useless cast operation for SQL on Blink when generating code

2020-04-13 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-17117:
-

 Summary: There is a useless cast operation for SQL on Blink 
when generating code
 Key: FLINK-17117
 URL: https://issues.apache.org/jira/browse/FLINK-17117
 Project: Flink
  Issue Type: Wish
  Components: Table SQL / Planner
Reporter: hehuiyuan
 Attachments: image-2020-04-13-19-44-19-174.png

!image-2020-04-13-19-44-19-174.png|width=641,height=305!

 

This method `generateOneInputStreamOperator` in OperatorCodeGenerator 
generates the SourceConversion code:
{code:java}
@Override
public void processElement($STREAM_RECORD $ELEMENT) throws Exception {
  $inputTypeTerm $inputTerm = ($inputTypeTerm) 
${converter(s"$ELEMENT.getValue()")};
  ${ctx.reusePerRecordCode()}
  ${ctx.reuseLocalVariableCode()}
  ${if (lazyInputUnboxingCode) "" else ctx.reuseInputUnboxingCode()}
  $processCode
}
{code}
 
{code:java}
 $inputTypeTerm $inputTerm = ($inputTypeTerm) 
${converter(s"$ELEMENT.getValue()")};
{code}
ScanUtil calls generateOneInputStreamOperator
{code:java}

val generatedOperator = 
OperatorCodeGenerator.generateOneInputStreamOperator[Any, BaseRow](
  ctx,
  convertName,
  processCode,
  outputRowType,
  converter = inputTermConverter)

//inputTermConverter
val (inputTermConverter, inputRowType) = {
  val convertFunc = CodeGenUtils.genToInternal(ctx, inputType)
  internalInType match {
case rt: RowType => (convertFunc, rt)
case _ => ((record: String) => s"$GENERIC_ROW.of(${convertFunc(record)})",
RowType.of(internalInType))
  }
}

{code}
CodeGenUtils.scala  :  genToInternal
{code:java}
def genToInternal(ctx: CodeGeneratorContext, t: DataType): String => String = {
  val iTerm = boxedTypeTermForType(fromDataTypeToLogicalType(t))
  if (isConverterIdentity(t)) {
term => s"($iTerm) $term"
  } else {
val eTerm = boxedTypeTermForExternalType(t)
val converter = ctx.addReusableObject(
  DataFormatConverters.getConverterForDataType(t),
  "converter")
term => s"($iTerm) $converter.toInternal(($eTerm) $term)"
  }
}
{code}
Putting these together: when `isConverterIdentity(t)` is true, the generated assignment ends up with two casts back to back (the `($iTerm)` cast produced by `genToInternal` plus the `($inputTypeTerm)` cast from `generateOneInputStreamOperator`), so one of the casts is useless.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16704) Document has some errors in connect.md

2020-03-21 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-16704:
-

 Summary: Document has some errors in connect.md
 Key: FLINK-16704
 URL: https://issues.apache.org/jira/browse/FLINK-16704
 Project: Flink
  Issue Type: Wish
  Components: Documentation
Affects Versions: 1.10.0
Reporter: hehuiyuan
 Attachments: image-2020-03-21-15-09-59-802.png, 
image-2020-03-21-15-11-00-061.png

For branches <= 1.10, there are some errors.

*Type.ROW ->Types.ROW*

!image-2020-03-21-15-09-59-802.png|width=265,height=296!

 

 

!image-2020-03-21-15-11-00-061.png|width=334,height=193!

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16584) Whether to support a long-typed event time field in the table planner when the source is Kafka

2020-03-13 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-16584:
-

 Summary: Whether to support a long-typed event time field in the table 
planner when the source is Kafka
 Key: FLINK-16584
 URL: https://issues.apache.org/jira/browse/FLINK-16584
 Project: Flink
  Issue Type: Wish
  Components: Table SQL / API
Reporter: hehuiyuan


For the rowtime function, the field type may be long or timestamp.

But the event time field type can only be timestamp when using the Kafka connector.

Some validations (for example, when creating the Kafka table source) do not allow long.
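
A possible workaround (a sketch only, assuming Flink 1.10 DDL; the table name, field names and connector properties below are made up for illustration) is to keep the long field as-is and derive a TIMESTAMP rowtime from it with a computed column:
{code:java}
// Hypothetical table: 'event_time' is the long epoch-millis field from Kafka,
// 'ts' is a computed TIMESTAMP(3) column that is used as the rowtime attribute.
tableEnv.sqlUpdate(
    "CREATE TABLE kafka_source (" +
    "  name STRING," +
    "  event_time BIGINT," +
    "  ts AS TO_TIMESTAMP(FROM_UNIXTIME(event_time / 1000))," +
    "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
    ") WITH (" +
    "  'connector.type' = 'kafka'," +
    "  'connector.version' = 'universal'," +
    "  'connector.topic' = 'events'," +
    "  'connector.properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format.type' = 'json'" +
    ")");
{code}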

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16465) Add more detailed instructions for TableSink

2020-03-06 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-16465:
-

 Summary: Add more detailed instructions for TableSink
 Key: FLINK-16465
 URL: https://issues.apache.org/jira/browse/FLINK-16465
 Project: Flink
  Issue Type: Wish
  Components: Documentation
Reporter: hehuiyuan


There is an error when I define an UpsertStreamTableSink without the getTableSchema 
method or the getFieldNames/getFieldTypes methods.

Adding some instructions to the documentation may be better.

[https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#upsertstreamtablesink]
{code:java}

Exception in thread "main" org.apache.flink.table.api.TableException: Table 
sink does not implement a table schema.
at org.apache.flink.table.sinks.TableSink.getTableSchema(TableSink.java:75)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.registerTableSink(TableEnvironmentImpl.java:215)
at 
com.jd.flink.sql.writer.console.ConsoleUpsertWriter.initWriter(ConsoleUpsertWriter.java:40)
at com.jd.flink.sql.writer.Writer.registWriter(Writer.java:33)
at com.jd.flink.sql.test.MainClassTest_hhy.main(MainClassTest_hhy.java:222)
{code}
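
A minimal sketch (just a fragment of the sink class; it assumes the field names and types captured in configure() are kept in two hypothetical fields) of the override whose absence triggers the exception above:
{code:java}
// Hypothetical fields, populated in configure(fieldNames, fieldTypes).
private String[] fieldNames;
private TypeInformation<?>[] fieldTypes;

@Override
public TableSchema getTableSchema() {
    return new TableSchema(fieldNames, fieldTypes);
}
{code}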
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16463) CodeGenUtils generates code that has two semicolons for GroupingWindowAggsHandler in blink

2020-03-06 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-16463:
-

 Summary: CodeGenUtils generates code that has two semicolons for 
GroupingWindowAggsHandler in blink 
 Key: FLINK-16463
 URL: https://issues.apache.org/jira/browse/FLINK-16463
 Project: Flink
  Issue Type: Wish
  Components: Table SQL / Planner
Reporter: hehuiyuan
 Attachments: image-2020-03-06-20-43-20-300.png, 
image-2020-03-06-20-44-16-446.png

!image-2020-03-06-20-43-20-300.png|width=452,height=297!

 

!image-2020-03-06-20-44-16-446.png|width=513,height=282!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15610) How to implement a UDF whose number of return columns is uncertain

2020-01-15 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-15610:
-

 Summary: How to implement a UDF whose number of return columns 
is uncertain
 Key: FLINK-15610
 URL: https://issues.apache.org/jira/browse/FLINK-15610
 Project: Flink
  Issue Type: Wish
  Components: Table SQL / API
Reporter: hehuiyuan






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15596) Support key-value messages for the Kafka producer for Flink SQL / Table

2020-01-14 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-15596:
-

 Summary: Support key-value messages for the Kafka producer for Flink 
SQL / Table
 Key: FLINK-15596
 URL: https://issues.apache.org/jira/browse/FLINK-15596
 Project: Flink
  Issue Type: Wish
  Components: Connectors / Kafka
Reporter: hehuiyuan






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15404) How to insert into a Hive table from a different catalog

2019-12-26 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-15404:
-

 Summary: How to insert into a Hive table from a different catalog
 Key: FLINK-15404
 URL: https://issues.apache.org/jira/browse/FLINK-15404
 Project: Flink
  Issue Type: Wish
  Components: Table SQL / Planner
Reporter: hehuiyuan


I have a hive catalog :

 
{code:java}
    catalog name : myhive 
    database : default
{code}
 

and  the flink has a default catalog :     

 
{code:java}
    catalog name : default_catalog
    database : default_database
{code}
 

For example:

I have a source table 'source_table' from Kafka, which is registered in 
default_catalog.

I want to insert into the Hive table 'hive_table', which belongs to the myhive catalog.

SQL:

insert into hive_table select * from source_table;
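
One way that might work (a sketch only, not verified against this setup; the catalog and database names are the ones listed above) is to fully qualify both tables so the statement resolves regardless of the current catalog:
{code:java}
// Fully qualified names: catalog.database.table
tableEnv.sqlUpdate(
    "insert into myhive.`default`.hive_table " +
    "select * from default_catalog.default_database.source_table");

// Alternatively, switch the current catalog first and keep unqualified names:
// tableEnv.useCatalog("myhive");
{code}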

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15326) Add document description for DECIMAL(38, 18) when the sink table uses json schema

2019-12-19 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-15326:
-

 Summary: Add document description for DECIMAL(38, 18) when the 
sink table uses json schema 
 Key: FLINK-15326
 URL: https://issues.apache.org/jira/browse/FLINK-15326
 Project: Flink
  Issue Type: Wish
Reporter: hehuiyuan


Env : 

        Flink 1.9.1 

        table-planner-blink

 

Question:

If i have a kafka sink table with json schema:

 
{code:java}
String jsonSchema = 
"{
    type:'object',
    properties:{
        name: { type: 'string' },
        age: { type: 'integer' },
        sex: { type: 'string' }
    }
}";
JsonRowDeserializationSchema deserializationSchema = new 
JsonRowDeserializationSchema(jsonSchema);
TypeInformation fieldTypes = deserializationSchema.getProducedType();
Kafka kafka = new Kafka...
Schema schema = new Schema..
tableEnvironment.connect(kafka)
 .withFormat( new Json().jsonSchema(jsonSchema))
 .withSchema( schema )
 .inAppendMode()
 .registerTableSink("sink_example2")
;
String sinksql = "insert into sink_example2 select * from table2"
tableEnvironment.sqlUpdate(sinksql);
{code}
 

 

Error:
{code:java}

Query result schema: [name: String, age: BigDecimal, sex: String]
TableSink schema:[name: String, age: BigDecimal, sex: String]

{code}
The schema of table `table2`:

 
{code:java}
[2019-12-19 18:10:16,937] INFO t2: root
 |-- name: STRING
 |-- age: DECIMAL(10, 0)
 |-- sex: STRING
 
{code}
 

When I use Kafka to read data with a JSON schema, I understand the integer type 
in JSON is mapped to DECIMAL(38, 18) in the Flink table:

 
{code:java}
|-- name: STRING
 |-- age: DECIMAL(38, 18)
 |-- sex: STRING
{code}
 

That's why I know to set the decimal precision explicitly:
{code:java}

String sinksql = "insert into sink_example2 select name, CAST(age as decimal(38,18)) as age, sex from table2";
{code}
The JSON format documentation needs to call out this problem.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15159) Is the string type of JSON mapped to VARCHAR or STRING?

2019-12-09 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-15159:
-

 Summary: Is the string type of JSON mapped to VARCHAR or STRING?
 Key: FLINK-15159
 URL: https://issues.apache.org/jira/browse/FLINK-15159
 Project: Flink
  Issue Type: Wish
  Components: Documentation
Reporter: hehuiyuan
 Attachments: image-2019-12-09-21-14-08-183.png

!image-2019-12-09-21-14-08-183.png|width=356,height=180!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15158) Why is integer converted to BigDecimal for format-json when Kafka is used

2019-12-09 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-15158:
-

 Summary: Why is integer converted to BigDecimal for format-json when 
Kafka is used
 Key: FLINK-15158
 URL: https://issues.apache.org/jira/browse/FLINK-15158
 Project: Flink
  Issue Type: Wish
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: hehuiyuan


For example , 

I have a table  `table1` :

root
 |-- name: STRING
 |-- age: INT
 |-- sex: STRING

 

Then I want to `insert into kafka select * from table1`:

jsonschema: 

{type:'object',properties:\{name: { type: 'string' },age: \{ type: 'integer' 
},sex: \{ type: 'string' }}}

 

{code:java}
descriptor.withFormat(new Json().jsonSchema(jsonSchema)).withSchema(schema);
{code}

 

Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Field types of query result and registered TableSink [sink_example2] do not 
match.

*Query result schema: [name: String, age: Integer, sex: String]*

*TableSink schema:    [name: String, age: BigDecimal, sex: String]* at 
org.apache.flink.table.sinks.TableSinkUtils$.validateSink(TableSinkUtils.scala:65)
 at 
org.apache.flink.table.planner.StreamPlanner$$anonfun$2.apply(StreamPlanner.scala:156)
 at 
org.apache.flink.table.planner.StreamPlanner$$anonfun$2.apply(StreamPlanner.scala:155)
 at scala.Option.map(Option.scala:146) 

 

I know that the integer type in the JSON schema is converted to BigDecimal. But 
for the above scenario, does this have to be forced to decimal?

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15146) The value of `cleanupSize` should be greater than 0 for `IncrementalCleanupStrategy`

2019-12-09 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-15146:
-

 Summary: The value of `cleanupSize` should be greater than 0 for 
`IncrementalCleanupStrategy`
 Key: FLINK-15146
 URL: https://issues.apache.org/jira/browse/FLINK-15146
 Project: Flink
  Issue Type: Wish
Reporter: hehuiyuan
 Attachments: image-2019-12-09-17-03-59-014.png, 
image-2019-12-09-17-09-18-062.png

!image-2019-12-09-17-03-59-014.png|width=615,height=108!

!image-2019-12-09-17-09-18-062.png|width=491,height=309!

Hi, the value of cleanupSize is currently only required to be greater than or equal 
to 0. Requiring the value to be greater than 0 would be more practical.
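
For context, a short sketch of where this parameter appears in the API (the TTL value and flags below are arbitrary):
{code:java}
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

// cleanupIncrementally(cleanupSize, runCleanupForEveryRecord):
// the first argument is the `cleanupSize` discussed here; a value of 0 makes
// the incremental cleanup a no-op, which is why requiring > 0 seems more practical.
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.hours(1))
        .cleanupIncrementally(10, true)
        .build();
{code}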



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14639) Fix the document of Metrics that has an error for `User Scope`

2019-11-06 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-14639:
-

 Summary: Fix the document of Metrics that has an error for `User 
Scope` 
 Key: FLINK-14639
 URL: https://issues.apache.org/jira/browse/FLINK-14639
 Project: Flink
  Issue Type: Wish
  Components: Documentation
  Environment: Hi, I think it should be `MetricGroup{{#addGroup(String key, String value)}}`
Reporter: hehuiyuan
 Attachments: image-2019-11-07-10-42-37-862.png

!image-2019-11-07-10-42-37-862.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14550) Can't use proctime attribute when registering a DataStream as a table that has nested fields

2019-10-28 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-14550:
-

 Summary: Can't use proctime attribute when registering a DataStream as 
a table that has nested fields
 Key: FLINK-14550
 URL: https://issues.apache.org/jira/browse/FLINK-14550
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: hehuiyuan


*_The data schema:_*

 

final String schemaString =
 
"{\"type\":\"record\",\"name\":\"GenericUser\",\"namespace\":\"org.apache.flink.formats.avro.generated\","
 +
 "\"fields\": 
[\{\"name\":\"name\",\"type\":\"string\"},\{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},"
 +
 
"\{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]},\{\"name\":\"type_long_test\",\"type\":[\"long\",\"null\"]}"
 +
 
",\{\"name\":\"type_double_test\",\"type\":\"double\"},\{\"name\":\"type_null_test\",\"type\":[\"null\",\"string\"]},"
 +
 
"\{\"name\":\"type_bool_test\",\"type\":[\"boolean\"]},{\"name\":\"type_array_string\",\"type\":"
 +
 
"\{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"type_array_boolean\",\"type\":{\"type\":\"array\","
 +
 
"\"items\":\"boolean\"}},{\"name\":\"type_nullable_array\",\"type\":[\"null\",{\"type\":\"array\","
 +
 
"\"items\":\"string\"}],\"default\":null},{\"name\":\"type_enum\",\"type\":{\"type\":\"enum\","
 +
 
"\"name\":\"Colors\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}},{\"name\":\"type_map\",\"type\":{\"type\":\"map\","
 +
 
"\"values\":\"long\"}},{\"name\":\"type_fixed\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"Fixed16\","
 +
 
"\"size\":16}],\"size\":16},\{\"name\":\"type_union\",\"type\":[\"null\",\"boolean\",\"long\",\"double\"]},"
 +
 
*"{\"name\":\"type_nested\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Address\",\"fields\":[{\"name\":\"num\","
 +*
 
*"\"type\":\"int\"},\{\"name\":\"street\",\"type\":\"string\"},\{\"name\":\"city\",\"type\":\"string\"},"
 +*
 
*"\{\"name\":\"state\",\"type\":\"string\"},\{\"name\":\"zip\",\"type\":\"string\"}]}]},*{\"name\":\"type_bytes\","
 +
 
"\"type\":\"bytes\"},\{\"name\":\"type_date\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}},"
 +
 
"\{\"name\":\"type_time_millis\",\"type\":{\"type\":\"int\",\"logicalType\":\"time-millis\"}},{\"name\":\"type_time_micros\","
 +
 
"\"type\":\{\"type\":\"int\",\"logicalType\":\"time-micros\"}},{\"name\":\"type_timestamp_millis\",\"type\":{\"type\":\"long\","
 +
 
"\"logicalType\":\"timestamp-millis\"}},{\"name\":\"type_timestamp_micros\",\"type\":{\"type\":\"long\","
 +
 
"\"logicalType\":\"timestamp-micros\"}},{\"name\":\"type_decimal_bytes\",\"type\":{\"type\":\"bytes\","
 +
 
"\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}},{\"name\":\"type_decimal_fixed\",\"type\":{\"type\":\"fixed\","
 +
 
"\"name\":\"Fixed2\",\"size\":2,\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}";

 

*_The code :_*

tableEnv.registerDataStream("source_table",deserilizeDataStreamRows,"type_nested$Address$street,userActionTime.proctime");

 

_*The error is as follows:*_

{code:java}
Exception in thread "main" org.apache.flink.table.api.TableException: The proctime attribute can only be appended to the table schema and not replace an existing field. Please move 'userActionTime' to the end of the schema.
 at org.apache.flink.table.api.StreamTableEnvironment.org$apache$flink$table$api$StreamTableEnvironment$$extractProctime$1(StreamTableEnvironment.scala:649)
 at org.apache.flink.table.api.StreamTableEnvironment$$anonfun$validateAndExtractTimeAttributes$1.apply(StreamTableEnvironment.scala:676)
 at org.apache.flink.table.api.StreamTableEnvironment$$anonfun$validateAndExtractTimeAttributes$1.apply(StreamTableEnvironment.scala:668)
 at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
 at org.apache.flink.table.api.StreamTableEnvironment.validateAndExtractTimeAttributes(StreamTableEnvironment.scala:668)
 at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:549)
 at org.apache.flink.table.api.java.StreamTableEnvironment.registerDataStream(StreamTableEnvironment.scala:136)
 at com.jd.flink.sql.demo.validate.schema.avro.AvroQuickStartMain.main(AvroQuickStartMain.java:145)
{code}

 

 

The code is ok.

tableEnv.registerDataStream("source_table",deserilizeDataStreamRows,"type_nested$Address$street");

 

The code is ok.

tableEnv.registerDataStream("source_table",deserilizeDataStreamRows,"type_nested,userActionTime.proctime");

 

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-13810) Update `Elasticsearch Connector` label

2019-08-21 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-13810:
-

 Summary: Update `Elasticsearch Connector` label 
 Key: FLINK-13810
 URL: https://issues.apache.org/jira/browse/FLINK-13810
 Project: Flink
  Issue Type: Wish
  Components: Documentation
Reporter: hehuiyuan


Sink: Streaming Append Mode
Sink: Streaming Upsert Mode
Format: JSON-only

 

The first table should be Source!



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13757) Document error for `logical functions`

2019-08-16 Thread hehuiyuan (JIRA)
hehuiyuan created FLINK-13757:
-

 Summary: Document error for  `logical functions`
 Key: FLINK-13757
 URL: https://issues.apache.org/jira/browse/FLINK-13757
 Project: Flink
  Issue Type: Wish
  Components: Documentation
Reporter: hehuiyuan
 Attachments: image-2019-08-17-11-58-53-247.png

[https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/functions.html#logical-functions]

Incorrect (current documentation):
|{{boolean IS NOT TRUE}}|Returns TRUE if _boolean_ is FALSE or UNKNOWN; returns FALSE if _boolean_ is FALSE.|

Correct:
|{{boolean IS NOT TRUE}}|Returns TRUE if _boolean_ is FALSE or UNKNOWN; returns FALSE if _boolean_ is TRUE.|

[!image-2019-08-17-11-58-53-247.png!|https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/functions.html#logical-functions]



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13756) Fix the Javadoc comment of findAndCreateTableSource in TableFactoryUtil

2019-08-16 Thread hehuiyuan (JIRA)
hehuiyuan created FLINK-13756:
-

 Summary: Fix the Javadoc comment of findAndCreateTableSource in 
TableFactoryUtil
 Key: FLINK-13756
 URL: https://issues.apache.org/jira/browse/FLINK-13756
 Project: Flink
  Issue Type: Wish
  Components: Table SQL / API
Reporter: hehuiyuan


 

{code:java}
/**
 * Returns a table sink matching the {@link org.apache.flink.table.catalog.CatalogTable}.
 */
public static <T> TableSource<T> findAndCreateTableSource(CatalogTable table) {
    return findAndCreateTableSource(table.toProperties());
}
{code}

 

Hi, this method `findAndCreateTableSource` is used for returning a 
`TableSource`, but the Javadoc comment says *`Returns a table sink`*.

 



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13183) Add PrintTableSink for Table & SQL API

2019-07-09 Thread hehuiyuan (JIRA)
hehuiyuan created FLINK-13183:
-

 Summary: Add PrintTableSink for Table & SQL API
 Key: FLINK-13183
 URL: https://issues.apache.org/jira/browse/FLINK-13183
 Project: Flink
  Issue Type: Wish
  Components: Table SQL / API
Reporter: hehuiyuan






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-13181) Add a constructor function to CsvTableSink

2019-07-09 Thread hehuiyuan (JIRA)
hehuiyuan created FLINK-13181:
-

 Summary: Add a constructor function to CsvTableSink
 Key: FLINK-13181
 URL: https://issues.apache.org/jira/browse/FLINK-13181
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: hehuiyuan


Add a constructor function for the parameters:
  @param path       The output path to write the Table to.
  @param fieldDelim The field delimiter.
  @param writeMode  The write mode to specify whether existing files are overwritten or not.
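
A possible shape for the requested constructor (a sketch only; delegating to the existing (path, fieldDelim, numFiles, writeMode) constructor with numFiles left unset is an assumption, not the final design):
{code:java}
// Proposed convenience constructor: path + field delimiter + write mode,
// leaving the number of output files unspecified (-1).
public CsvTableSink(String path, String fieldDelim, FileSystem.WriteMode writeMode) {
    this(path, fieldDelim, -1, writeMode);
}
{code}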
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-13180) Add a constructor function to CsvTableSink

2019-07-09 Thread hehuiyuan (JIRA)
hehuiyuan created FLINK-13180:
-

 Summary: Add a constructor function to CsvTableSink
 Key: FLINK-13180
 URL: https://issues.apache.org/jira/browse/FLINK-13180
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: hehuiyuan


Add a constructor function to CsvTableSink.

For the parameters:
 @param path       The output path to write the Table to.
 @param fieldDelim The field delimiter.
 @param writeMode  The write mode to specify whether existing files are overwritten or not.
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)