[jira] [Created] (FLINK-28741) Unexpected result if insert 'false' to boolean column

2022-07-29 Thread Jing Zhang (Jira)
Jing Zhang created FLINK-28741:
--

 Summary: Unexpected result if insert 'false' to boolean column
 Key: FLINK-28741
 URL: https://issues.apache.org/jira/browse/FLINK-28741
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Reporter: Jing Zhang


When the Hive dialect is used to insert the string 'false' into a boolean
column, the stored value is true.

The error can be reproduced with the following ITCase.
{code:java}
@Test
public void testUnExpectedResult() throws ExecutionException, InterruptedException {
    HiveModule hiveModule = new HiveModule(hiveCatalog.getHiveVersion());
    CoreModule coreModule = CoreModule.INSTANCE;
    for (String loaded : tableEnv.listModules()) {
        tableEnv.unloadModule(loaded);
    }
    tableEnv.loadModule("hive", hiveModule);
    tableEnv.loadModule("core", coreModule);
    // create the source table
    tableEnv.executeSql(
            "CREATE TABLE test_table (params string) PARTITIONED BY (`p_date` string)");
    // prepare one row whose value is the string 'false'
    tableEnv.executeSql(
                    "insert overwrite test_table partition(p_date = '20220612') values ('false')")
            .await();
    // create the target table, which contains a single boolean column
    tableEnv.executeSql(
            "CREATE TABLE target_table (flag boolean) PARTITIONED BY (`p_date` string)");
    // copy the string value into the boolean column
    tableEnv.executeSql(
                    "insert overwrite table target_table partition(p_date = '20220724') "
                            + "SELECT params FROM test_table WHERE p_date='20220612'")
            .await();
    TableImpl flinkTable =
            (TableImpl)
                    tableEnv.sqlQuery("select flag from target_table where p_date = '20220724'");
    List<Row> results = CollectionUtil.iteratorToList(flinkTable.execute().collect());
    assertEquals("[false]", results.toString());
}
{code}
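
The report does not say where the conversion goes wrong. One way the string
'false' can end up as true is a cast rule that treats every non-empty string as
true, as opposed to a rule that parses the text. The sketch below only
contrasts these two rules. Both helper names are hypothetical, and neither is
taken from Flink or Hive code.

```java
// Hypothetical helpers for illustration only; these are not Flink or Hive APIs.
final class StringToBooleanSketch {

    // Rule A (assumed): any non-empty string is treated as true.
    static boolean nonEmptyIsTrue(String value) {
        return value != null && !value.isEmpty();
    }

    // Rule B: interpret the text; only "true" (case-insensitive) yields true.
    static boolean parseText(String value) {
        return Boolean.parseBoolean(value);
    }

    public static void main(String[] args) {
        System.out.println(nonEmptyIsTrue("false")); // prints "true"
        System.out.println(parseText("false"));      // prints "false"
    }
}
```

If rule A is what happens on the insert path, the ITCase above would read back
true, which matches the reported symptom.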
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28397) Source Supports Speculative Execution For Batch Job

2022-07-05 Thread Jing Zhang (Jira)
Jing Zhang created FLINK-28397:
--

 Summary: Source Supports Speculative Execution For Batch Job
 Key: FLINK-28397
 URL: https://issues.apache.org/jira/browse/FLINK-28397
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.16.0
Reporter: Jing Zhang
Assignee: Jing Zhang








[jira] [Created] (FLINK-28003) 'pos ' field would be updated to 'POSITION' when use SqlClient

2022-06-10 Thread Jing Zhang (Jira)
Jing Zhang created FLINK-28003:
--

 Summary: 'pos  ' field would be updated to 'POSITION' when use 
SqlClient
 Key: FLINK-28003
 URL: https://issues.apache.org/jira/browse/FLINK-28003
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Affects Versions: 1.15.0
Reporter: Jing Zhang
 Attachments: zj_test.sql

When I run the following SQL in SqlClient using 'sql-client.sh -f zj_test.sql':
{code:sql}
create table if not exists db.zj_test(
  pos                   int,
  rank_cmd              string
)
partitioned by (
  `p_date` string,
  `p_hourmin` string);

INSERT OVERWRITE TABLE db.zj_test PARTITION (p_date='20220605', p_hourmin = '0100')
SELECT
  pos ,
  rank_cmd
FROM db.sourceT
where p_date = '20220605' and p_hourmin = '0100';
{code}
An error is thrown because the 'pos' field is changed to 'POSITION'.
I suspect that `SqlCompleter` in the SQL Client module is responsible.

The error can be reproduced using the attached file.

 

 





[jira] [Created] (FLINK-27086) Add a QA about how to handle exception when use hive parser in hive dialect document

2022-04-06 Thread Jing Zhang (Jira)
Jing Zhang created FLINK-27086:
--

 Summary: Add a QA about how to handle exception when use hive 
parser in hive dialect document
 Key: FLINK-27086
 URL: https://issues.apache.org/jira/browse/FLINK-27086
 Project: Flink
  Issue Type: Technical Debt
  Components: Documentation
Affects Versions: 1.15.0
Reporter: Jing Zhang
 Fix For: 1.15.0








[jira] [Created] (FLINK-27020) use hive dialect in SqlClient would thrown an error based on 1.15 version

2022-04-02 Thread Jing Zhang (Jira)
Jing Zhang created FLINK-27020:
--

 Summary: use hive dialect in SqlClient would thrown an error based 
on 1.15 version
 Key: FLINK-27020
 URL: https://issues.apache.org/jira/browse/FLINK-27020
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Affects Versions: 1.15.0
Reporter: Jing Zhang
 Attachments: image-2022-04-02-20-28-01-335.png

I am using 1.15 rc0 and have run into a problem.
An error is thrown when I use the Hive dialect in SqlClient.
 !image-2022-04-02-20-28-01-335.png! 
I have already added flink-sql-connector-hive-2.3.6_2.12-1.15-SNAPSHOT.jar.
Loading and using the Hive module works fine, but using the Hive dialect
fails.





[jira] [Created] (FLINK-27019) use hive dialect in SqlClient would thrown an error based on 1.15 version

2022-04-02 Thread Jing Zhang (Jira)
Jing Zhang created FLINK-27019:
--

 Summary: use hive dialect in SqlClient would thrown an error based 
on 1.15 version
 Key: FLINK-27019
 URL: https://issues.apache.org/jira/browse/FLINK-27019
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Affects Versions: 1.15.0
Reporter: Jing Zhang
 Attachments: image-2022-04-02-20-25-25-169.png

I am using 1.15 rc0 and have run into a problem.
An error is thrown when I use the Hive dialect in SqlClient.
 !image-2022-04-02-20-25-25-169.png! 

I have already added flink-sql-connector-hive-2.3.6_2.12-1.15-SNAPSHOT.jar.
Loading and using the Hive module works fine, but using the Hive dialect
fails.





[jira] [Created] (FLINK-25645) UnsupportedOperationException would thrown out when hash shuffle by a field with array type

2022-01-13 Thread Jing Zhang (Jira)
Jing Zhang created FLINK-25645:
--

 Summary: UnsupportedOperationException would thrown out when hash 
shuffle by a field with array type
 Key: FLINK-25645
 URL: https://issues.apache.org/jira/browse/FLINK-25645
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Jing Zhang
 Attachments: image-2022-01-13-19-12-40-756.png, 
image-2022-01-13-19-15-28-395.png

Currently, array types are not supported as hash shuffle keys because CodeGen
does not support them yet.
 !image-2022-01-13-19-15-28-395.png! 

An UnsupportedOperationException is thrown when hash shuffling by a field of
array type:
 !image-2022-01-13-19-12-40-756.png! 





[jira] [Created] (FLINK-25641) Unexpected aggregate plan after load hive module

2022-01-13 Thread Jing Zhang (Jira)
Jing Zhang created FLINK-25641:
--

 Summary: Unexpected aggregate plan after load hive module
 Key: FLINK-25641
 URL: https://issues.apache.org/jira/browse/FLINK-25641
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Jing Zhang
 Attachments: image-2022-01-13-15-52-27-783.png, 
image-2022-01-13-15-55-40-958.png

When using Flink batch SQL to run Hive SQL queries, we load the Hive module in
order to use Hive built-in functions.
However, some query plans are unexpected after the Hive module is loaded.
For example, take the following SQL:

{code:sql}
load module hive;
use modules hive,core;
set table.sql-dialect=hive;

select
  account_id,
  sum(impression)
from test_db.test_table where dt = '2022-01-10' and hi = '0100' group by account_id
{code}
The plan is:

 !image-2022-01-13-15-55-40-958.png! 

After removing 'load module hive; use modules hive,core;', the plan is:

 !image-2022-01-13-15-52-27-783.png! 

After the Hive module is loaded, hash aggregate is not chosen in the final plan
because the aggregate buffer is not fixed-length. Its type is as follows.
{code:java}
LEGACY('RAW', 'ANY')
{code}








[jira] [Created] (FLINK-25605) Batch get statistics of multiple partitions instead of get one by one

2022-01-11 Thread Jing Zhang (Jira)
Jing Zhang created FLINK-25605:
--

 Summary: Batch get statistics of multiple partitions instead of 
get one by one
 Key: FLINK-25605
 URL: https://issues.apache.org/jira/browse/FLINK-25605
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Jing Zhang
 Attachments: image-2022-01-11-15-59-55-894.png, 
image-2022-01-11-16-00-28-002.png

Currently, `PushPartitionIntoTableSourceScanRule` fetches the statistics of
matched partitions one by one.
 !image-2022-01-11-15-59-55-894.png! 
If there are many matched partitions, waiting for all the statistics takes a
long time.
We could improve this by fetching the statistics of multiple partitions in one
batch.
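
The difference between the two access patterns can be sketched as follows. The
client class and its two methods are hypothetical stand-ins for a metastore
client, not the actual catalog API; the sketch only counts round trips.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a metastore client; it only counts round trips.
final class CountingStatsClient {
    private final Map<String, Long> rowCounts = new LinkedHashMap<>();
    int roundTrips = 0;

    CountingStatsClient(Map<String, Long> rowCounts) {
        this.rowCounts.putAll(rowCounts);
    }

    // One (simulated) remote call per partition.
    long getRowCount(String partition) {
        roundTrips++;
        return rowCounts.get(partition);
    }

    // One (simulated) remote call for all requested partitions.
    Map<String, Long> getRowCounts(List<String> partitions) {
        roundTrips++;
        Map<String, Long> result = new LinkedHashMap<>();
        for (String partition : partitions) {
            result.put(partition, rowCounts.get(partition));
        }
        return result;
    }
}

final class BatchStatsSketch {
    // Current behavior as described above: fetch statistics one by one.
    static long totalOneByOne(CountingStatsClient client, List<String> partitions) {
        long total = 0;
        for (String partition : partitions) {
            total += client.getRowCount(partition);
        }
        return total;
    }

    // Proposed behavior: fetch statistics for all matched partitions at once.
    static long totalBatched(CountingStatsClient client, List<String> partitions) {
        long total = 0;
        for (long count : client.getRowCounts(partitions).values()) {
            total += count;
        }
        return total;
    }
}
```

Both methods return the same total; only the number of round trips differs (one
per partition versus one overall).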





[jira] [Created] (FLINK-25604) Remove useless aggregate function

2022-01-10 Thread Jing Zhang (Jira)
Jing Zhang created FLINK-25604:
--

 Summary: Remove useless aggregate function 
 Key: FLINK-25604
 URL: https://issues.apache.org/jira/browse/FLINK-25604
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Jing Zhang


We expect a useless aggregate call to be removed after projection push-down.
But sometimes the plan is not what we expect. For example,

{code:sql}
SELECT
  d
FROM (
  SELECT
    d,
    c,
    row_number() OVER (PARTITION BY d ORDER BY e desc)  review_rank
  FROM (
    SELECT e, d, max(f) AS c FROM Table5 GROUP BY e, d)
  )
WHERE review_rank = 1
{code}

The plan is 

{code:java}
Calc(select=[d], where=[=(w0$o0, 1:BIGINT)])
+- OverAggregate(partitionBy=[d], orderBy=[e DESC], window#0=[ROW_NUMBER(*) AS w0$o0 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[e, d, c, w0$o0])
   +- Sort(orderBy=[d ASC, e DESC])
      +- Exchange(distribution=[hash[d]])
         +- HashAggregate(isMerge=[true], groupBy=[e, d], select=[e, d, Final_MAX(max$0) AS c])
            +- Exchange(distribution=[hash[e, d]])
               +- LocalHashAggregate(groupBy=[e, d], select=[e, d, Partial_MAX(f) AS max$0])
                  +- Calc(select=[e, d, f])
                     +- BoundedStreamScan(table=[[default_catalog, default_database, Table5]], fields=[d, e, f, g, h])
{code}

In the above SQL, max(f) (aliased as c) could be removed because c is projected
out before the sink.
 





[jira] [Created] (FLINK-25596) Specify hash/sortmerge join in SQL hint

2022-01-10 Thread Jing Zhang (Jira)
Jing Zhang created FLINK-25596:
--

 Summary: Specify hash/sortmerge join in SQL hint
 Key: FLINK-25596
 URL: https://issues.apache.org/jira/browse/FLINK-25596
 Project: Flink
  Issue Type: Sub-task
Reporter: Jing Zhang








[jira] [Created] (FLINK-25595) Specify hash/sort aggregate strategy in SQL hint

2022-01-10 Thread Jing Zhang (Jira)
Jing Zhang created FLINK-25595:
--

 Summary: Specify hash/sort aggregate strategy in SQL hint
 Key: FLINK-25595
 URL: https://issues.apache.org/jira/browse/FLINK-25595
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Jing Zhang








[jira] [Created] (FLINK-25594) Take parquet metadata into consideration when source is parquet files

2022-01-10 Thread Jing Zhang (Jira)
Jing Zhang created FLINK-25594:
--

 Summary: Take parquet metadata into consideration when source is 
parquet files
 Key: FLINK-25594
 URL: https://issues.apache.org/jira/browse/FLINK-25594
 Project: Flink
  Issue Type: Sub-task
Reporter: Jing Zhang








[jira] [Created] (FLINK-25593) A redundant scan could be skipped if it is an input of join and the other input is empty after partition prune

2022-01-10 Thread Jing Zhang (Jira)
Jing Zhang created FLINK-25593:
--

 Summary: A redundant scan could be skipped if it is an input of 
join and the other input is empty after partition prune
 Key: FLINK-25593
 URL: https://issues.apache.org/jira/browse/FLINK-25593
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Jing Zhang


A redundant scan can be skipped if it is one input of a join and the other
input is empty after partition pruning.
For example:
ltable has two partitions, pt=0 and pt=1; rtable has one partition, pt1=0.
The schema of ltable is (lkey string, value int).
The schema of rtable is (rkey string, value int).

{code:java}
SELECT * FROM ltable, rtable WHERE pt=2 and pt1=0 and `lkey`=rkey
{code}

The plan is as follows.

{code:java}
Calc(select=[lkey, value, CAST(2 AS INTEGER) AS pt, rkey, value1, CAST(0 AS INTEGER) AS pt1])
+- HashJoin(joinType=[InnerJoin], where=[=(lkey, rkey)], select=[lkey, value, rkey, value1], build=[right])
   :- Exchange(distribution=[hash[lkey]])
   :  +- TableSourceScan(table=[[hive, source_db, ltable, partitions=[], project=[lkey, value]]], fields=[lkey, value])
   +- Exchange(distribution=[hash[rkey]])
      +- TableSourceScan(table=[[hive, source_db, rtable, partitions=[{pt1=0}], project=[rkey, value1]]], fields=[rkey, value1])
{code}

There is no need to scan the right side because the left input of the join has
0 partitions after partition pruning.
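
The idea can be sketched as below for the inner join shown in the plan. The
classes are a hypothetical, simplified model of pruned scans and are not
planner classes; other join types would need their own rules.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified model of a partitioned table scan after partition pruning.
final class PrunedScan {
    final String table;
    final List<String> partitions;

    PrunedScan(String table, List<String> partitions) {
        this.table = table;
        this.partitions = partitions;
    }
}

final class EmptyJoinInputSketch {
    // Returns the tables that still have to be scanned for an INNER join.
    // If either input has no partitions left, the join result is empty,
    // so neither side needs to be read.
    static List<String> tablesToScan(PrunedScan left, PrunedScan right) {
        if (left.partitions.isEmpty() || right.partitions.isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.asList(left.table, right.table);
    }
}
```

With ltable pruned to no partitions (pt=2 matches nothing) and rtable pruned to
pt1=0, `tablesToScan` returns an empty list.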






[jira] [Created] (FLINK-25592) Improvement of parser, optimizer and execution for Flink Batch SQL

2022-01-10 Thread Jing Zhang (Jira)
Jing Zhang created FLINK-25592:
--

 Summary: Improvement of parser, optimizer and execution for Flink 
Batch SQL
 Key: FLINK-25592
 URL: https://issues.apache.org/jira/browse/FLINK-25592
 Project: Flink
  Issue Type: Improvement
Reporter: Jing Zhang


This is a parent JIRA to track all improvements to Flink Batch SQL, including
the parser, optimizer and execution.
For example,
1. with the Hive dialect and the default dialect, some SQL queries are
translated into different plans
2. specify the hash/sort aggregate strategy and the hash/sort-merge join
strategy in a SQL hint
3. take Parquet metadata into consideration during optimization
4. 





[jira] [Created] (FLINK-25258) Update log4j2 version to avoid

2021-12-10 Thread Jing Zhang (Jira)
Jing Zhang created FLINK-25258:
--

 Summary: Update log4j2 version to avoid 
 Key: FLINK-25258
 URL: https://issues.apache.org/jira/browse/FLINK-25258
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.14.0, 1.13.0, 1.12.0, 1.11.0
Reporter: Jing Zhang
 Fix For: 1.15.0


Apache Log4j 2 versions 2.0 through 2.14.1 have an RCE zero-day vulnerability.

https://www.cyberkendra.com/2021/12/worst-log4j-rce-zeroday-dropped-on.html

https://www.lunasec.io/docs/blog/log4j-zero-day/

Flink has used Log4j 2 since version 1.11.





[jira] [Created] (FLINK-24772) Update user document for individual window table-valued function

2021-11-04 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-24772:
--

 Summary: Update user document for individual window table-valued 
function
 Key: FLINK-24772
 URL: https://issues.apache.org/jira/browse/FLINK-24772
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Affects Versions: 1.15.0
Reporter: JING ZHANG








[jira] [Created] (FLINK-24760) Update user document for batch window tvf

2021-11-04 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-24760:
--

 Summary: Update user document for batch window tvf
 Key: FLINK-24760
 URL: https://issues.apache.org/jira/browse/FLINK-24760
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Affects Versions: 1.15.0
Reporter: JING ZHANG








[jira] [Created] (FLINK-24708) `ConvertToNotInOrInRule` has a bug which leads to wrong result

2021-10-29 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-24708:
--

 Summary: `ConvertToNotInOrInRule` has a bug which leads to wrong 
result
 Key: FLINK-24708
 URL: https://issues.apache.org/jira/browse/FLINK-24708
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: JING ZHANG


A user reported this bug on the mailing list; I have pasted the content here.

We are in the process of upgrading from Flink 1.9.3 to 1.13.3.  We have noticed 
that statements whose WHERE clause combines UPPER(field) or LOWER(field) with 
an IN do not always evaluate correctly. 

 

The following test case highlights this problem.

 


import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TestCase {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        TestData testData = new TestData();
        testData.setField1("bcd");
        DataStream<TestData> stream = env.fromElements(testData);
        stream.print();  // To prevent 'No operators' error

        final StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
        tableEnvironment.createTemporaryView("testTable", stream, Schema.newBuilder().build());

        // Fails because abcd is larger than abc
        tableEnvironment.executeSql("select *, '1' as run from testTable WHERE "
                + "lower(field1) IN ('abcd', 'abc', 'bcd', 'cde')").print();
        // Succeeds because lower was removed
        tableEnvironment.executeSql("select *, '2' as run from testTable WHERE "
                + "field1 IN ('abcd', 'abc', 'bcd', 'cde')").print();
        // These 4 succeed because the smallest literal is before abcd
        tableEnvironment.executeSql("select *, '3' as run from testTable WHERE "
                + "lower(field1) IN ('abc', 'abcd', 'bcd', 'cde')").print();
        tableEnvironment.executeSql("select *, '4' as run from testTable WHERE "
                + "lower(field1) IN ('abc', 'bcd', 'abhi', 'cde')").print();
        tableEnvironment.executeSql("select *, '5' as run from testTable WHERE "
                + "lower(field1) IN ('cde', 'abcd', 'abc', 'bcd')").print();
        tableEnvironment.executeSql("select *, '6' as run from testTable WHERE "
                + "lower(field1) IN ('cde', 'abc', 'abcd', 'bcd')").print();
        // Fails because smallest is not first
        tableEnvironment.executeSql("select *, '7' as run from testTable WHERE "
                + "lower(field1) IN ('cdef', 'abce', 'abcd', 'ab', 'bcd')").print();
        // Succeeds
        tableEnvironment.executeSql("select *, '8' as run from testTable WHERE "
                + "lower(field1) IN ('ab', 'cdef', 'abce', 'abcdefgh', 'bcd')").print();

        env.execute("TestCase");
    }

    public static class TestData {
        private String field1;

        public String getField1() {
            return field1;
        }

        public void setField1(String field1) {
            this.field1 = field1;
        }
    }
}

 

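The job produces the following output:

Empty set
+----+--------+-----+
| op | field1 | run |
+----+--------+-----+
| +I |    bcd |   2 |
+----+--------+-----+
1 row in set
+----+--------+-----+
| op | field1 | run |
+----+--------+-----+
| +I |    bcd |   3 |
+----+--------+-----+
1 row in set
+----+--------+-----+
| op | field1 | run |
+----+--------+-----+
| +I |    bcd |   4 |
+----+--------+-----+
1 row in set
+----+--------+-----+
| op | field1 | run |
+----+--------+-----+
| +I |    bcd |   5 |
+----+--------+-----+
1 row in set
+----+--------+-----+
| op | field1 | run |
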

[jira] [Created] (FLINK-24656) Add user document for Window Deduplication

2021-10-26 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-24656:
--

 Summary: Add user document for Window Deduplication
 Key: FLINK-24656
 URL: https://issues.apache.org/jira/browse/FLINK-24656
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Affects Versions: 1.15.0
Reporter: JING ZHANG








[jira] [Created] (FLINK-24501) Unexpected behavior of cumulate window aggregate for late event after recover from sp/cp

2021-10-11 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-24501:
--

 Summary: Unexpected behavior of cumulate window aggregate for late 
event after recover from sp/cp
 Key: FLINK-24501
 URL: https://issues.apache.org/jira/browse/FLINK-24501
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Reporter: JING ZHANG


*Problem description*

After a job recovers from a savepoint or checkpoint, the cumulate window
aggregate may handle late events in an unexpected way.

*Bug analysis*

Currently, for a cumulate window aggregate, a late event that belongs to an
already cleaned slice is merged into the merged window state and is counted in
later slices.

For example, take a CUMULATE window with a step of 1 minute and a size of 1 day.
{code:sql}
SELECT window_start, window_end, COUNT(USER_ID)
  FROM TABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '1' MINUTES, INTERVAL '1' DAY))
  GROUP BY window_start, window_end;
{code}
When the watermark reaches 11:01, the result of window [00:00, 11:01) is
emitted. Let's assume the result is INSERT (00:00, 11:01, 4).

If a late record with event time 11:00 then arrives, it is merged into the
merged state and counted in the later windows, for example [00:00, 11:02),
[00:00, 11:03)... But the already emitted window result INSERT
(00:00, 11:01, 4) is not retracted or updated.

The behavior is different if the job recovers from a savepoint/checkpoint.
Suppose we take a savepoint after the watermark reaches 11:01 and
(00:00, 11:01, 4) has been emitted.

Then we recover the job from the savepoint. Watermarks are not checkpointed, so
they have to be repopulated. After recovery the watermark may therefore roll
back to 11:00. If a record with event time 11:00 arrives now, it is not treated
as a late event, and when the watermark reaches 11:01 again, a window result
INSERT (00:00, 11:01, 5) is emitted downstream.

So the downstream operator receives two INSERT records for window (00:00,
11:01), which may lead to wrong results.
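
The two runs can be replayed with a small model. This is a simplified,
hypothetical simulation of the behavior described above, not Flink's operator:
times are minutes since midnight, the step is 1 minute, and the watermark is
deliberately kept outside the restored state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

// Simplified, hypothetical model of a cumulate window count; not Flink's operator.
final class CumulateLateEventSketch {
    long mergedCount;                                          // state: count of all fired slices
    final TreeMap<Integer, Long> openSlices = new TreeMap<>(); // state: slice end -> count
    int watermark;                                             // NOT restored from a savepoint
    final List<String> emitted = new ArrayList<>();

    CumulateLateEventSketch(long restoredMergedCount, int initialWatermark) {
        this.mergedCount = restoredMergedCount;
        this.watermark = initialWatermark;
    }

    void onEvent(int eventTime) {
        int sliceEnd = eventTime + 1; // 1-minute slices
        if (sliceEnd <= watermark) {
            mergedCount++; // late: the slice is already cleaned, so merge into window state
        } else {
            openSlices.merge(sliceEnd, 1L, Long::sum);
        }
    }

    void onWatermark(int newWatermark) {
        watermark = newWatermark;
        // Fire every open slice whose end the watermark has reached.
        while (!openSlices.isEmpty() && openSlices.firstKey() <= watermark) {
            Map.Entry<Integer, Long> slice = openSlices.pollFirstEntry();
            mergedCount += slice.getValue();
            emitted.add("INSERT (00:00, " + format(slice.getKey()) + ", " + mergedCount + ")");
        }
    }

    static String format(int minutes) {
        return String.format(Locale.ROOT, "%02d:%02d", minutes / 60, minutes % 60);
    }
}
```

Without a restart, four events at 11:00 followed by `onWatermark(661)` emit
(00:00, 11:01, 4), and a later event at 11:00 only increases the merged count.
After a restore with merged count 4 and the watermark back at 11:00 (660), the
same event lands in an open slice, and `onWatermark(661)` emits a second result
for the same window, (00:00, 11:01, 5).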

 

*Solution*

There are two solutions for the problem:
 # Save the watermark to state in the slice shared operator. (Preferred)
 # Change the behavior for late events, for example by retracting the emitted
result and sending the updated result. This requires changing the slice state
cleanup mechanism, because currently the slice state is cleaned once the
watermark exceeds the slice end.

 





[jira] [Created] (FLINK-24168) Rowtime type is not correct for windowTableFunction or OverAggregate on Match because the output type does not updated after input rowtime attribute changed from rowtime

2021-09-06 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-24168:
--

 Summary: Rowtime type is not correct for windowTableFunction or 
OverAggregate on Match because the output type does not updated after input 
rowtime attribute changed from rowtime to rowtime_ltz
 Key: FLINK-24168
 URL: https://issues.apache.org/jira/browse/FLINK-24168
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.14.0
Reporter: JING ZHANG


The rowtime type is not correct for windowTableFunction or OverAggregate on
Match because the output type is not updated after the input rowtime attribute
changes from rowtime to rowtime_ltz in `RelTimeIndicator`.

The bug can be reproduced by the following two cases:
{code:scala}
@Test
def testWindowTVFOnMatchRecognizeOnRowtimeLTZ(): Unit = {
  val sqlQuery =
    s"""
       |SELECT
       |  *
       |FROM Ticker
       |MATCH_RECOGNIZE (
       |  PARTITION BY symbol
       |  ORDER BY ts_ltz
       |  MEASURES
       |    A.price as price,
       |    A.tax as tax,
       |    MATCH_ROWTIME() as matchRowtime
       |  ONE ROW PER MATCH
       |  PATTERN (A)
       |  DEFINE
       |    A AS A.price > 0
       |) AS T
       |""".stripMargin
  val table = util.tableEnv.sqlQuery(sqlQuery)
  util.tableEnv.registerTable("T", table)
  val sqlQuery1 =
    s"""
       |SELECT *
       |FROM TABLE(TUMBLE(TABLE T, DESCRIPTOR(matchRowtime), INTERVAL '3' second))
       |""".stripMargin
  util.verifyRelPlanWithType(sqlQuery1)
}

@Test
def testOverWindowOnMatchRecognizeOnRowtimeLTZ(): Unit = {
  val sqlQuery =
    s"""
       |SELECT
       |  *
       |FROM Ticker
       |MATCH_RECOGNIZE (
       |  PARTITION BY symbol
       |  ORDER BY ts_ltz
       |  MEASURES
       |    A.price as price,
       |    A.tax as tax,
       |    MATCH_ROWTIME() as matchRowtime
       |  ONE ROW PER MATCH
       |  PATTERN (A)
       |  DEFINE
       |    A AS A.price > 0
       |) AS T
       |""".stripMargin
  val table = util.tableEnv.sqlQuery(sqlQuery)
  util.tableEnv.registerTable("T", table)
  val sqlQuery1 =
    """
      |SELECT
      |  symbol,
      |  price,
      |  tax,
      |  matchRowtime,
      |  SUM(price) OVER (
      |    PARTITION BY symbol ORDER BY matchRowtime RANGE UNBOUNDED PRECEDING) as price_sum
      |FROM T
    """.stripMargin
  util.verifyRelPlanWithType(sqlQuery1)
}
{code}
 





[jira] [Created] (FLINK-24024) Fix syntax mistake in session Window TVF

2021-08-27 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-24024:
--

 Summary: Fix syntax mistake in session Window TVF 
 Key: FLINK-24024
 URL: https://issues.apache.org/jira/browse/FLINK-24024
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.14.0
Reporter: JING ZHANG


There is a syntax mistake in the session Window TVF introduced in FLINK-23543.

For example, the following SQL uses the mistaken syntax.

 
{code:java}
"""
  |SELECT
  |   a,
  |   window_start,
  |   window_end,
  |   count(*),
  |   sum(d),
  |   max(d) filter (where b > 1000),
  |   count(distinct c) AS uv
  |FROM TABLE(
  |  SESSION(
  |    TABLE MyTable,
  |    DESCRIPTOR(proctime),
  |    INTERVAL '5' MINUTE))
  |GROUP BY a, window_start, window_end
""".stripMargin
{code}
 

It should be updated to the following SQL, in which the partition key (a) is
moved into the SESSION Window TVF, following Calcite's [SESSION window
TVF|https://calcite.apache.org/docs/reference.html#session].

 
{code:java}
val sql =
  """
    |SELECT
    |   a,
    |   window_start,
    |   window_end,
    |   count(*),
    |   sum(d),
    |   max(d) filter (where b > 1000),
    |   count(distinct c) AS uv
    |FROM TABLE(
    |  SESSION(
    |    TABLE MyTable,
    |    DESCRIPTOR(proctime),
    |    DESCRIPTOR(a),
    |    INTERVAL '5' MINUTE))
    |GROUP BY a, window_start, window_end
  """.stripMargin
{code}
 

To fix the bug, we only need to update the Session Window TVF syntax; the
operator part does not need to change.

Besides, we should check that the group keys of the window aggregate contain
only window_start, window_end and the partition key. The group keys must not
contain other fields.
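
A minimal sketch of such a check, assuming the group keys and partition keys are
available as plain column names. The class and method names are hypothetical
and not part of the planner.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration; not a planner class.
final class SessionGroupKeyCheckSketch {
    // Returns the group keys that are neither window columns nor partition keys.
    // An empty result means the group keys are acceptable.
    static Set<String> unexpectedGroupKeys(List<String> groupKeys, List<String> partitionKeys) {
        Set<String> allowed = new LinkedHashSet<>(Arrays.asList("window_start", "window_end"));
        allowed.addAll(partitionKeys);
        Set<String> unexpected = new LinkedHashSet<>(groupKeys);
        unexpected.removeAll(allowed);
        return unexpected;
    }
}
```

For the query above, group keys (a, window_start, window_end) with partition
key (a) yield an empty set, while an extra group key such as b would be
reported.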





[jira] [Created] (FLINK-24002) Support count window with the window TVF in planner

2021-08-26 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-24002:
--

 Summary: Support count window with the window TVF in planner
 Key: FLINK-24002
 URL: https://issues.apache.org/jira/browse/FLINK-24002
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Affects Versions: 1.15.0
Reporter: JING ZHANG


Count windows have long been supported in the Table API but not in SQL.

With the new window TVF syntax, we can also introduce a new window function for
count windows.

For example, the following TUMBLE_ROW assigns rows to windows of 10 rows each.

SELECT *
FROM TABLE(
   TUMBLE_ROW(
     data => TABLE inputTable,
     timecol => DESCRIPTOR(timecol),
     size => 10));
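
The intended assignment can be sketched on an in-memory list. This is only an
illustration of tumbling by row count, with a hypothetical helper name; it
assumes the rows are already in the desired order, and it simply returns a
trailing partial window, which the ticket does not specify.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not the proposed TUMBLE_ROW implementation.
final class TumbleRowSketch {
    // Splits rows, in their given order, into consecutive windows of `size` rows.
    static <T> List<List<T>> tumbleByCount(List<T> rows, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> windows = new ArrayList<>();
        for (int start = 0; start < rows.size(); start += size) {
            int end = Math.min(start + size, rows.size());
            windows.add(new ArrayList<>(rows.subList(start, end)));
        }
        return windows;
    }
}
```

With 25 rows and a size of 10, this produces windows of 10, 10 and 5 rows.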

 





[jira] [Created] (FLINK-24001) Support evaluating individual window table-valued function in runtime

2021-08-26 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-24001:
--

 Summary: Support evaluating individual window table-valued 
function in runtime
 Key: FLINK-24001
 URL: https://issues.apache.org/jira/browse/FLINK-24001
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Runtime
Affects Versions: 1.15.0
Reporter: JING ZHANG


Currently, a window table-valued function has to be used together with another
window operation, such as window aggregate, window topN or window join.

In this ticket, we aim to support evaluating an individual window table-valued
function in the runtime, which means introducing an operator to handle it.





[jira] [Created] (FLINK-24000) window aggregate support allow lateness

2021-08-26 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-24000:
--

 Summary: window aggregate support allow lateness
 Key: FLINK-24000
 URL: https://issues.apache.org/jira/browse/FLINK-24000
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Affects Versions: 1.15.0
Reporter: JING ZHANG


Currently, [Window aggregate|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/window-agg]
does not support allow-lateness the way [Group Window Aggregate|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/window-agg/#group-window-aggregation/]
does.

We aim to support allow-lateness in this ticket.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23999) Support evaluating individual window table-valued function

2021-08-26 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23999:
--

 Summary: Support evaluating individual window table-valued function
 Key: FLINK-23999
 URL: https://issues.apache.org/jira/browse/FLINK-23999
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: JING ZHANG
 Fix For: 1.15.0


Currently, a window table-valued function has to be used together with another
window operation, such as window aggregate, window topN or window join.

In this ticket, we aim to support evaluating an individual window table-valued
function.





[jira] [Created] (FLINK-23997) SQL windowing table-valued function improvement

2021-08-26 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23997:
--

 Summary: SQL windowing table-valued function improvement
 Key: FLINK-23997
 URL: https://issues.apache.org/jira/browse/FLINK-23997
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.15.0
Reporter: JING ZHANG


This is an umbrella issue for follow-up issues related to the windowing
table-valued function.

FLIP-145: 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-145%3A+Support+SQL+windowing+table-valued+function#FLIP145:SupportSQLwindowingtablevaluedfunction-CumulatingWindows]





[jira] [Created] (FLINK-23886) An exception is thrown out when recover job timers from savepoint file

2021-08-19 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23886:
--

 Summary: An exception is thrown out when recover job timers from 
savepoint file
 Key: FLINK-23886
 URL: https://issues.apache.org/jira/browse/FLINK-23886
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Reporter: JING ZHANG


Setup Specifics:
Version: 1.6.2
RocksDB Map State
Timers stored in rocksdb
 
When this job has been running for a long period of time (> 30 days) and for
some reason restarts, we encounter "Error while deserializing the element". Is
this a known issue fixed in later versions? I see some changes to the code for
FLINK-10175, but we don't use any queryable state.
 
Below is the stack trace:
 
org.apache.flink.util.FlinkRuntimeException: Error while deserializing the 
element.

at 
org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:389)

at 
org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:146)

at 
org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:56)

at 
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:274)

at 
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:261)

at 
org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:164)

at 
org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:121)

at 
org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:85)

at 
org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73)

at 
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.<init>(KeyGroupPartitionedPriorityQueue.java:89)

at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBPriorityQueueSetFactory.create(RocksDBKeyedStateBackend.java:2792)

at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:450)

at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.createTimerPriorityQueue(InternalTimeServiceManager.java:121)

at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.registerOrGetTimerService(InternalTimeServiceManager.java:106)

at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.getInternalTimerService(InternalTimeServiceManager.java:87)

at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:764)

at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:61)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.io.EOFException

at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)

at org.apache.flink.types.StringValue.readString(StringValue.java:769)

at 
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)

at 
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)

at 
org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:179)

at 
org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:46)

at 
org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:168)

at 
org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:45)

at 
org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:387)

... 20 more





[jira] [Created] (FLINK-23751) Testing Window Top-N after Windowing TVF

2021-08-12 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23751:
--

 Summary: Testing Window Top-N after Windowing TVF
 Key: FLINK-23751
 URL: https://issues.apache.org/jira/browse/FLINK-23751
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Reporter: JING ZHANG
 Fix For: 1.14.0








[jira] [Created] (FLINK-23750) Add document for Window Top-N after Windowing TVF

2021-08-12 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23750:
--

 Summary: Add document for Window Top-N after Windowing TVF 
 Key: FLINK-23750
 URL: https://issues.apache.org/jira/browse/FLINK-23750
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: JING ZHANG
 Fix For: 1.14.0








[jira] [Created] (FLINK-23749) Testing Window Join

2021-08-12 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23749:
--

 Summary: Testing Window Join
 Key: FLINK-23749
 URL: https://issues.apache.org/jira/browse/FLINK-23749
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Reporter: JING ZHANG
 Fix For: 1.14.0








[jira] [Created] (FLINK-23748) Testing session window

2021-08-12 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23748:
--

 Summary: Testing session window
 Key: FLINK-23748
 URL: https://issues.apache.org/jira/browse/FLINK-23748
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Reporter: JING ZHANG
 Fix For: 1.14.0








[jira] [Created] (FLINK-23747) Testing Window TVF offset

2021-08-12 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23747:
--

 Summary: Testing Window TVF offset
 Key: FLINK-23747
 URL: https://issues.apache.org/jira/browse/FLINK-23747
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Reporter: JING ZHANG
 Fix For: 1.14.0








[jira] [Created] (FLINK-23687) Add Sql query hint to enable LookupJoin shuffle by join key of left input

2021-08-09 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23687:
--

 Summary: Add Sql query hint to enable LookupJoin shuffle by join 
key of left input
 Key: FLINK-23687
 URL: https://issues.apache.org/jira/browse/FLINK-23687
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: JING ZHANG


Add Sql query hint to enable LookupJoin shuffle by join key of left input





[jira] [Created] (FLINK-23650) A sql contains 'Case when' could not run successfully when choose Hive Dialect

2021-08-05 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23650:
--

 Summary: A sql contains 'Case when' could not run successfully 
when choose Hive Dialect
 Key: FLINK-23650
 URL: https://issues.apache.org/jira/browse/FLINK-23650
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Reporter: JING ZHANG


{code:java}
tableEnv.sqlQuery(
"select x,CASE WHEN x > 1 THEN 'aaa' WHEN x >1 AND x < 3 THEN 'bbb' 
ELSE 'ccc' END as y from src"){code}
With the default dialect, the above code runs successfully. With the Hive 
dialect, however, the following exception is thrown.
{code:java}
org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call: 
when(BOOLEAN, STRING NOT NULL, BOOLEAN, STRING NOT NULL, STRING NOT NULL) 
org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call: 
when(BOOLEAN, STRING NOT NULL, BOOLEAN, STRING NOT NULL, STRING NOT NULL) If 
you think this function should be supported, you can create an issue and start 
a discussion for it.
 at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$7$$anonfun$apply$2.apply(ExprCodeGenerator.scala:837)
 at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$7$$anonfun$apply$2.apply(ExprCodeGenerator.scala:837)
 at scala.Option.getOrElse(Option.scala:121) at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$7.apply(ExprCodeGenerator.scala:836)
 at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$7.apply(ExprCodeGenerator.scala:841)
 at scala.Option.getOrElse(Option.scala:121) at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:829)
 at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:503)
 at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:58)
 at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:157)
 at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$2.apply(CalcCodeGenerator.scala:137)
 at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$2.apply(CalcCodeGenerator.scala:137)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:137)
 at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:162)
 at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:48)
 at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator.generateCalcOperator(CalcCodeGenerator.scala)
 at 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:94)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
 at 
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.java:58)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
 at 
org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$1.apply(BatchPlanner.scala:81)
 at 
org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$1.apply(BatchPlanner.scala:80)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.Iterator$class.foreach(Iterator.scala:891) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:80)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:182)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1702)
 at 

[jira] [Created] (FLINK-23638) Field names are not kept after apply ProjectToWindowRule#ProjectToLogicalProjectAndWindowRule

2021-08-05 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23638:
--

 Summary: Field names are not kept after apply 
ProjectToWindowRule#ProjectToLogicalProjectAndWindowRule
 Key: FLINK-23638
 URL: https://issues.apache.org/jira/browse/FLINK-23638
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: JING ZHANG


After applying ProjectToWindowRule#ProjectToLogicalProjectAndWindowRule, field 
names are not preserved.

 
{code:java}
SELECT *
FROM (
  SELECT *,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
  FROM MyTable
)
WHERE rowNum = 1
{code}
Before applying ProjectToWindowRule#ProjectToLogicalProjectAndWindowRule, the 
field names of the input node are (a,b,c,proctime,rowtime,rowNum). After applying 
the rule, the field names of the equivalent RelNode are (a,b,c,proctime,rowtime,$5).

 

Currently, Calcite does not guarantee that field names are preserved when planner 
rules are applied (please see CALCITE-2718). However, preserving field names is 
important for Flink, because the field names are visible to the end user of a 
Flink SQL job.





[jira] [Created] (FLINK-23582) Add document for session window

2021-08-02 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23582:
--

 Summary: Add document for session window
 Key: FLINK-23582
 URL: https://issues.apache.org/jira/browse/FLINK-23582
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: JING ZHANG








[jira] [Created] (FLINK-23565) Window TVF Supports session window in runtime

2021-07-30 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23565:
--

 Summary: Window TVF Supports session window in runtime
 Key: FLINK-23565
 URL: https://issues.apache.org/jira/browse/FLINK-23565
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Runtime
Reporter: JING ZHANG








[jira] [Created] (FLINK-23554) SqlCli throws an exception and hang

2021-07-30 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23554:
--

 Summary: SqlCli throws an exception and hang 
 Key: FLINK-23554
 URL: https://issues.apache.org/jira/browse/FLINK-23554
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Affects Versions: 1.12.4, 1.13.1, 1.11.3
Reporter: JING ZHANG
 Attachments: image-2021-07-30-14-12-07-817.png

SqlCli throws an exception like the following and then hangs forever, until the 
process is killed from outside.

You can reproduce the exception with the following steps:
 # Submit a SQL command in SqlCli.
 # Without waiting for its response, enter another SQL command in SqlCli.
 # An exception is thrown.

!image-2021-07-30-14-12-07-817.png|width=1706,height=527!





[jira] [Created] (FLINK-23544) Window TVF Supports session window

2021-07-29 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23544:
--

 Summary: Window TVF Supports session window
 Key: FLINK-23544
 URL: https://issues.apache.org/jira/browse/FLINK-23544
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: JING ZHANG


Window TVF supports session window





[jira] [Created] (FLINK-23543) Window TVF Supports session window

2021-07-29 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23543:
--

 Summary: Window TVF Supports session window
 Key: FLINK-23543
 URL: https://issues.apache.org/jira/browse/FLINK-23543
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.14.0
Reporter: JING ZHANG








[jira] [Created] (FLINK-23414) Split Match_ROWTIME return type converter into a separate class

2021-07-16 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23414:
--

 Summary: Split Match_ROWTIME return type converter into a 
separate class
 Key: FLINK-23414
 URL: https://issues.apache.org/jira/browse/FLINK-23414
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: JING ZHANG








[jira] [Created] (FLINK-23413) Port RelTimeIndicatorConverter from SCALA to JAVA

2021-07-16 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23413:
--

 Summary: Port RelTimeIndicatorConverter from SCALA to JAVA
 Key: FLINK-23413
 URL: https://issues.apache.org/jira/browse/FLINK-23413
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: JING ZHANG








[jira] [Created] (FLINK-23287) Create user document for Window Join in SQL

2021-07-06 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23287:
--

 Summary: Create user document for Window Join in SQL
 Key: FLINK-23287
 URL: https://issues.apache.org/jira/browse/FLINK-23287
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: JING ZHANG


Create user document for Window Join in SQL





[jira] [Created] (FLINK-23286) Create user document for Window Join in SQL

2021-07-06 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23286:
--

 Summary: Create user document for Window Join in SQL
 Key: FLINK-23286
 URL: https://issues.apache.org/jira/browse/FLINK-23286
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: JING ZHANG


Create user document for Window Join in SQL





[jira] [Created] (FLINK-23282) Compile error in `Streaming Java` module: MailboxProcessor.java:[217,13]

2021-07-06 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23282:
--

 Summary: Compile error in `Streaming Java` module: 
MailboxProcessor.java:[217,13]
 Key: FLINK-23282
 URL: https://issues.apache.org/jira/browse/FLINK-23282
 Project: Flink
  Issue Type: Bug
Reporter: JING ZHANG


A compile error occurs on the master branch at MailboxProcessor.java:[217,13].

The detailed information is as follows:
{code:java}
[INFO] BUILD FAILURE
[INFO] 
[INFO] Total time: 03:47 min
[INFO] Finished at: 2021-07-06T22:21:27+08:00
[INFO] 
[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile (default-compile) 
on project flink-streaming-java_2.11: Compilation failure
[ERROR] 
/Users/zhangjing/IdeaProjects/flink/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java:[217,13]
 cannot find symbol
[ERROR] symbol: method ensureControlFlowSignalCheck()
[ERROR] location: class org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
[ERROR]
[ERROR] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e 
switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please 
read the following articles:
[ERROR] [Help 1] 
http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR] mvn  -rf :flink-streaming-java_2.11
{code}





[jira] [Created] (FLINK-23247) Materialize rowtime attribute fields of regular join's inputs

2021-07-05 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23247:
--

 Summary: Materialize rowtime attribute fields of regular join's 
inputs
 Key: FLINK-23247
 URL: https://issues.apache.org/jira/browse/FLINK-23247
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: JING ZHANG


Materialize rowtime attribute fields of regular join's inputs in 
`RelTimeIndicatorConverter`





[jira] [Created] (FLINK-23246) Refactor the time indicator materialization

2021-07-05 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23246:
--

 Summary: Refactor the time indicator materialization
 Key: FLINK-23246
 URL: https://issues.apache.org/jira/browse/FLINK-23246
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: JING ZHANG


Refactor the time indicator materialization, including

1. Move the `time_indicator` materialization after the `logical_rewrite` phase, 
which is immediately before the physical optimization

2. Port `RelTimeIndicatorConverter` from Scala to Java

3. Refactor `RelTimeIndicatorConverter` to match `FlinkLogicalRel`





[jira] [Created] (FLINK-23245) Materialize rowtime attribute fields of regular join's inputs

2021-07-05 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23245:
--

 Summary: Materialize rowtime attribute fields of regular join's 
inputs
 Key: FLINK-23245
 URL: https://issues.apache.org/jira/browse/FLINK-23245
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: JING ZHANG


Materialize rowtime attribute fields of regular join's inputs in 
`RelTimeIndicatorConverter`





[jira] [Created] (FLINK-23244) Refactor the time indicator materialization

2021-07-05 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23244:
--

 Summary: Refactor the time indicator materialization
 Key: FLINK-23244
 URL: https://issues.apache.org/jira/browse/FLINK-23244
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: JING ZHANG


Refactor the time indicator materialization, including

1. Move the `time_indicator` materialization after the `logical_rewrite` phase, 
which is immediately before the physical optimization

2. Port `RelTimeIndicatorConverter` from Scala to Java

3. Refactor `RelTimeIndicatorConverter` to match `FlinkLogicalRel`





[jira] [Created] (FLINK-23079) HiveTableSinkITCase fails

2021-06-22 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23079:
--

 Summary: HiveTableSinkITCase fails
 Key: FLINK-23079
 URL: https://issues.apache.org/jira/browse/FLINK-23079
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Reporter: JING ZHANG


Four tests in HiveTableSinkITCase fail:

testBatchAppend,

testPartStreamingMrWrite,

testHiveTableSinkWithParallelismInStreaming,

testStreamingSinkWithTimestampLtzWatermark

 

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=19241&view=logs&j=e25d5e7e-2a9c-5589-4940-0b638d75a414&t=a6e0f756-5bb9-5ea8-a468-5f60db442a29





[jira] [Created] (FLINK-23055) Add document for Window TVF offset

2021-06-21 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23055:
--

 Summary: Add document for Window TVF offset
 Key: FLINK-23055
 URL: https://issues.apache.org/jira/browse/FLINK-23055
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Affects Versions: 1.14.0
Reporter: JING ZHANG
 Fix For: 1.14.0








[jira] [Created] (FLINK-23023) Support offset in window TVF

2021-06-17 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23023:
--

 Summary: Support offset in window TVF
 Key: FLINK-23023
 URL: https://issues.apache.org/jira/browse/FLINK-23023
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: JING ZHANG








[jira] [Created] (FLINK-23015) Implement streaming window Deduplicate operator

2021-06-16 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23015:
--

 Summary: Implement streaming window Deduplicate operator
 Key: FLINK-23015
 URL: https://issues.apache.org/jira/browse/FLINK-23015
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Runtime
Reporter: JING ZHANG








[jira] [Created] (FLINK-23014) Support streaming window Deduplicate in planner

2021-06-16 Thread JING ZHANG (Jira)
JING ZHANG created FLINK-23014:
--

 Summary: Support streaming window Deduplicate in planner
 Key: FLINK-23014
 URL: https://issues.apache.org/jira/browse/FLINK-23014
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: JING ZHANG








[jira] [Created] (FLINK-15077) Support Semi/Anti LookupJoin in Blink planner

2019-12-05 Thread Jing Zhang (Jira)
Jing Zhang created FLINK-15077:
--

 Summary: Support Semi/Anti LookupJoin in Blink planner
 Key: FLINK-15077
 URL: https://issues.apache.org/jira/browse/FLINK-15077
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Jing Zhang


Support the following sql in Blink planner:

{code:sql}
SELECT T.id, T.len, T.content FROM T WHERE T.id IN (
  SELECT id FROM csvDim FOR SYSTEM_TIME AS OF PROCTIME() AS D)
{code}

{code:sql}
SELECT T.id, T.len, T.content FROM T WHERE EXISTS (
  SELECT * FROM csvDim FOR SYSTEM_TIME AS OF PROCTIME() AS D WHERE T.id = D.id)
{code}

{code:sql}
SELECT T.id, T.len, T.content FROM T WHERE NOT EXISTS (
  SELECT * FROM csvDim FOR SYSTEM_TIME AS OF PROCTIME() AS D WHERE T.id = D.id)
{code}
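
The semantics the three queries ask for can be sketched outside the planner as follows. This is only an illustration under stated assumptions: the class `SemiAntiLookupSketch` and the `lookup` function are hypothetical stand-ins for a dimension-table lookup, not Blink planner or runtime code. A semi join (IN / EXISTS) keeps a left row when the lookup returns at least one match; an anti join (NOT EXISTS) keeps it when the lookup returns none. In both cases only left-side fields are emitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch of semi/anti join semantics over a lookup function. */
final class SemiAntiLookupSketch {

    /** Semi join: keep each left row that has at least one match in the lookup. */
    static <L, R> List<L> semiJoin(List<L> left, Function<L, List<R>> lookup) {
        List<L> result = new ArrayList<>();
        for (L row : left) {
            if (!lookup.apply(row).isEmpty()) {
                result.add(row);
            }
        }
        return result;
    }

    /** Anti join: keep each left row that has no match in the lookup. */
    static <L, R> List<L> antiJoin(List<L> left, Function<L, List<R>> lookup) {
        List<L> result = new ArrayList<>();
        for (L row : left) {
            if (lookup.apply(row).isEmpty()) {
                result.add(row);
            }
        }
        return result;
    }
}
```

Each left row is emitted at most once regardless of how many dimension rows match, which is the main difference from an inner LookupJoin.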





[jira] [Created] (FLINK-14946) Retraction infer would result in bad plan under corner case in blink planner

2019-11-25 Thread Jing Zhang (Jira)
Jing Zhang created FLINK-14946:
--

 Summary: Retraction infer would result in bad plan under corner 
case in blink planner
 Key: FLINK-14946
 URL: https://issues.apache.org/jira/browse/FLINK-14946
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.9.1, 1.9.0
Reporter: Jing Zhang
 Attachments: image-2019-11-26-14-54-34-797.png

The retraction rules produce a bad plan in some cases. I simplified the case to 
the following SQL; the complete test case can be found in the attachments.

{code:scala}
  val join_sql =
  """
|SELECT
|  ll.a AS a,
|  ll.b AS b,
|  cnt
|FROM (
| SELECT a, b, COUNT(c) AS cnt FROM l GROUP BY a, b
|) ll
|JOIN (
| SELECT a, b FROM r GROUP BY a, b
|) rr ON
|(ll.a = rr.a AND ll.b = rr.b)
  """.stripMargin

val sqlQuery =
  s"""
 |SELECT a, b_1, SUM(cnt) AS cnt
 |FROM (
 | SELECT *, b AS b_1 FROM (${join_sql})
 |   UNION ALL
 | SELECT *, 'SEA' AS b_1 FROM (${join_sql})
 |) AS total_result
 |GROUP BY a, b_1
  """.stripMargin
{code}

The plan is :
 !image-2019-11-26-14-54-34-797.png! 
After retraction inference, we expect both join nodes in the above plan to have 
`AccRetract` as their AccMode. However, the AccMode of Join1 is correct, while the 
AccMode of Join2 is not the expected one.





[jira] [Created] (FLINK-14875) Set Unmodifiable collection serializer of Kryo in Flink code

2019-11-20 Thread Jing Zhang (Jira)
Jing Zhang created FLINK-14875:
--

 Summary: Set Unmodifiable collection serializer of Kryo in Flink 
code
 Key: FLINK-14875
 URL: https://issues.apache.org/jira/browse/FLINK-14875
 Project: Flink
  Issue Type: Bug
  Components: API / Type Serialization System
Reporter: Jing Zhang
 Attachments: ImmutableCollectionKryoDeserializerITCase.java

If the source contains data that holds an immutable collection, the following 
exception is thrown:

{code:java}
Caused by: com.esotericsoftware.kryo.KryoException: 
java.lang.UnsupportedOperationException
Serialization trace:
logTags_ (com.aliyun.openservices.log.common.Logs$LogGroup)
mLogGroup (com.aliyun.openservices.log.common.LogGroupData)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
at 
org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:138)
at 
org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:47)
at 
org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:463)
at 
org.apache.flink.table.dataformat.BinaryRow.getGeneric(BinaryRow.java:440)
at BaseRowSerializerProjection$52.apply(Unknown Source)
at BaseRowSerializerProjection$52.apply(Unknown Source)
at 
org.apache.flink.table.typeutils.BaseRowSerializer.baseRowToBinary(BaseRowSerializer.java:250)
at 
org.apache.flink.table.typeutils.BaseRowSerializer.serializeToPages(BaseRowSerializer.java:285)
at 
org.apache.flink.table.typeutils.BaseRowSerializer.serializeToPages(BaseRowSerializer.java:55)
at 
org.apache.flink.table.runtime.sort.BinaryInMemorySortBuffer.write(BinaryInMemorySortBuffer.java:190)
at 
org.apache.flink.table.runtime.sort.BinaryExternalSorter.write(BinaryExternalSorter.java:540)
... 10 more
Caused by: java.lang.UnsupportedOperationException
at 
java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
... 27 more
{code}

The exception can also be reproduced with the simple ITCase in the attachments.

I found a similar problem in [How to set Unmodifiable collection serializer of 
Kryo in Spark 
code|https://stackoverflow.com/questions/46818293/how-to-set-unmodifiable-collection-serializer-of-kryo-in-spark-code].
 However, there is currently no way to set an unmodifiable collection serializer 
for Kryo in Flink. Maybe we could take it into consideration.
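
The failure mode in the stack trace above can be reproduced with the JDK alone. The sketch below is illustrative and uses hypothetical names (`UnmodifiableReadDemo`, `fillByAdd`, `readThenWrap`); it is not Kryo or Flink code. It mimics a deserializer that is handed a collection instance and fills it by calling `add()`, which is the call that ends in `Collections$UnmodifiableCollection.add` in the trace, and shows one possible workaround: fill a mutable list first and wrap it afterwards.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/** Illustrative only: mimics a "create the collection, then add elements" read step. */
final class UnmodifiableReadDemo {

    /**
     * Fills the target by calling add(), the way a generic collection
     * deserializer fills the instance it was given.
     * Returns true if the target accepted the elements, false if it rejected them.
     */
    static boolean fillByAdd(Collection<String> target, List<String> elements) {
        try {
            for (String element : elements) {
                target.add(element);
            }
            return true;
        } catch (UnsupportedOperationException e) {
            // Unmodifiable wrappers reject add(); this is the root cause in the trace.
            return false;
        }
    }

    /** Workaround sketch: read into a mutable list first, then wrap the result. */
    static List<String> readThenWrap(List<String> elements) {
        List<String> mutable = new ArrayList<>();
        fillByAdd(mutable, elements);
        return Collections.unmodifiableList(mutable);
    }
}
```

A dedicated serializer for unmodifiable collections would follow the second pattern: deserialize the wrapped content into a mutable collection and re-apply the unmodifiable wrapper at the end.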





[jira] [Created] (FLINK-13582) Improve the implementation of LISTAGG in Blink planner to remove delimiter from state

2019-08-05 Thread Jing Zhang (JIRA)
Jing Zhang created FLINK-13582:
--

 Summary: Improve the implementation of LISTAGG in Blink planner to 
remove delimiter from state 
 Key: FLINK-13582
 URL: https://issues.apache.org/jira/browse/FLINK-13582
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.10.0
Reporter: Jing Zhang


The implementation of LISTAGG saves the delimiter as part of the state, which is 
not necessary, because the delimiter is a constant.
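
The idea can be sketched as follows. This is a minimal illustration, not the Blink planner implementation: the class `ListAggSketch` and its `Accumulator` are hypothetical, and the choices to skip NULL inputs and to return NULL for an empty group are assumptions made for the sketch. The point is only that the accumulator (the part that would be checkpointed as state) holds the values, while the constant delimiter lives in the function instance and is applied when the result is emitted.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: the delimiter is a constant of the function, not part of the state. */
final class ListAggSketch {

    /** State per group: only the accumulated values, no delimiter. */
    static final class Accumulator {
        final List<String> values = new ArrayList<>();
    }

    /** Constant for the whole aggregate call, so it does not need to be stored in state. */
    private final String delimiter;

    ListAggSketch(String delimiter) {
        this.delimiter = delimiter;
    }

    /** Adds a value to the state; NULL inputs are skipped (an assumption of this sketch). */
    void accumulate(Accumulator acc, String value) {
        if (value != null) {
            acc.values.add(value);
        }
    }

    /** Joins the values with the constant delimiter only when the result is emitted. */
    String getValue(Accumulator acc) {
        return acc.values.isEmpty() ? null : String.join(delimiter, acc.values);
    }
}
```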



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13529) Verify and correct agg function's semantic for Blink planner

2019-08-01 Thread Jing Zhang (JIRA)
Jing Zhang created FLINK-13529:
--

 Summary: Verify and correct agg function's semantic for Blink 
planner
 Key: FLINK-13529
 URL: https://issues.apache.org/jira/browse/FLINK-13529
 Project: Flink
  Issue Type: Task
  Components: Table SQL / Planner
Affects Versions: 1.9.0, 1.10.0
Reporter: Jing Zhang
 Fix For: 1.9.0








[jira] [Created] (FLINK-13509) Support `is not distinct from ` and similar syntax in LookupJoin

2019-07-31 Thread Jing Zhang (JIRA)
Jing Zhang created FLINK-13509:
--

 Summary: Support `is not distinct from ` and similar syntax in 
LookupJoin
 Key: FLINK-13509
 URL: https://issues.apache.org/jira/browse/FLINK-13509
 Project: Flink
  Issue Type: Task
  Components: Table SQL / Planner
Affects Versions: 1.10.0
Reporter: Jing Zhang
 Fix For: 1.10.0


Example1:
{code:java}
`SELECT T.id, T.len, T.content, D.name FROM T JOIN userTable for system_time as 
of T.proctime AS D ON T.id = D.id OR (T.id is null and D.id is null)`
{code}

Example2:
{code:java}
"SELECT T.id, T.len, T.content, D.name FROM T JOIN userTable for system_time as 
of T.proctime AS D ON T.id IS NOT  DISTINCT FROM  D.id"
{code}

Running the above SQL in the Blink planner currently produces wrong results, 
because LookupJoin does not handle this case correctly.

Here is the plan:
In version 1.9, simply throw an exception in the compile phase for the above SQL.
Support the feature in version 1.10.
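
The difference between the two comparison forms can be sketched in plain Java. The class `NullSafeEquality` and its methods are hypothetical and only model SQL three-valued logic for illustration; they are not planner code. `sqlEquals` models `=`, which yields UNKNOWN (represented here as a null `Boolean`) when either side is NULL; `isNotDistinctFrom` models `IS NOT DISTINCT FROM`; `expandedForm` models the condition from Example1.

```java
import java.util.Objects;

/** Hypothetical sketch of SQL '=' versus 'IS NOT DISTINCT FROM'. */
final class NullSafeEquality {

    /** SQL '=': returns null (UNKNOWN) if either side is NULL. */
    static Boolean sqlEquals(Object left, Object right) {
        if (left == null || right == null) {
            return null;
        }
        return left.equals(right);
    }

    /** SQL 'IS NOT DISTINCT FROM': never UNKNOWN; two NULLs compare as equal. */
    static boolean isNotDistinctFrom(Object left, Object right) {
        return Objects.equals(left, right);
    }

    /** Example1 form: left = right OR (left IS NULL AND right IS NULL), true only if the condition is TRUE. */
    static boolean expandedForm(Object left, Object right) {
        return Boolean.TRUE.equals(sqlEquals(left, right)) || (left == null && right == null);
    }
}
```

A join that only evaluates `sqlEquals` never matches a NULL key, whereas both example conditions require a row with `T.id` NULL to match dimension rows whose `id` is NULL.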






[jira] [Created] (FLINK-13503) Correct the behavior of `JDBCLookupFunction`

2019-07-31 Thread Jing Zhang (JIRA)
Jing Zhang created FLINK-13503:
--

 Summary: Correct the behavior of `JDBCLookupFunction`   
 Key: FLINK-13503
 URL: https://issues.apache.org/jira/browse/FLINK-13503
 Project: Flink
  Issue Type: Task
  Components: Connectors / JDBC
Affects Versions: 1.9.0, 1.10
Reporter: Jing Zhang


The query template in `JDBCLookupFunction` looks like:
SELECT c, d, e, f from T where a = ? and b = ?

If (null, 1) is passed to the `eval` method, it generates the following query:
SELECT c, d, e, f from T where a = null and b = ?
This query always returns no records, because `a = null` is never true in SQL.
Is this behavior reasonable?
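
One possible alternative, if a null key were meant to match NULL columns (an assumption; the question above is left open), is to generate `IS NULL` for null keys instead of binding them. The sketch below is a hypothetical helper, `NullSafeLookupQuery`, written for illustration only; it is not the `JDBCLookupFunction` code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: build a lookup query that uses IS NULL for null key values. */
final class NullSafeLookupQuery {

    /** Builds "SELECT ... FROM t WHERE k1 IS NULL AND k2 = ?" depending on which keys are null. */
    static String buildQuery(String table, String[] selectFields, String[] keyFields, Object[] keyValues) {
        List<String> conditions = new ArrayList<>();
        for (int i = 0; i < keyFields.length; i++) {
            conditions.add(keyValues[i] == null ? keyFields[i] + " IS NULL" : keyFields[i] + " = ?");
        }
        return "SELECT " + String.join(", ", selectFields)
                + " FROM " + table
                + " WHERE " + String.join(" AND ", conditions);
    }

    /** Returns the non-null key values, in order, to bind to the remaining '?' placeholders. */
    static List<Object> bindValues(Object[] keyValues) {
        List<Object> values = new ArrayList<>();
        for (Object value : keyValues) {
            if (value != null) {
                values.add(value);
            }
        }
        return values;
    }
}
```

Note that under standard join semantics an equality condition never matches NULL keys, so returning no records may also be the intended result; in that case the lookup could simply be skipped.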






[jira] [Created] (FLINK-13433) Do not fetch data from LookupableTableSource if the JoinKey in left side of LookupJoin contains null value

2019-07-26 Thread Jing Zhang (JIRA)
Jing Zhang created FLINK-13433:
--

 Summary: Do not fetch data from LookupableTableSource if the 
JoinKey in left side of LookupJoin contains null value
 Key: FLINK-13433
 URL: https://issues.apache.org/jira/browse/FLINK-13433
 Project: Flink
  Issue Type: Task
  Components: Table SQL / Planner
Affects Versions: 1.9.0, 1.10.0
Reporter: Jing Zhang
 Fix For: 1.10.0


For LookupJoin, if the join key on the left side of a LeftOuterJoin/InnerJoin
contains null values, there is no need to fetch data from the
`LookupableTableSource`. However, at present we do not short-circuit the fetch
function in this case, so the correctness of the results depends on the
`TableFunction` implementation of each `LookupableTableSource`.
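
The short-circuit described above could look roughly like the following sketch.
It is an illustration under assumptions, not planner code: `NullKeyShortcut`,
`containsNull` and `lookup` are hypothetical names, and the `fetcher` function
stands in for the `TableFunction` of a `LookupableTableSource`.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch; the fetcher stands in for a source's lookup TableFunction.
class NullKeyShortcut {

    /** Returns true if any lookup key is null. */
    static boolean containsNull(Object[] keys) {
        for (Object key : keys) {
            if (key == null) {
                return true;
            }
        }
        return false;
    }

    /**
     * Skips the external fetch when a key is null, because an equality join
     * condition can never be satisfied by a null key.
     */
    static <R> List<R> lookup(Object[] keys, Function<Object[], List<R>> fetcher) {
        if (containsNull(keys)) {
            return Collections.emptyList();
        }
        return fetcher.apply(keys);
    }
}
```

For an inner join the empty result simply drops the left row; for a left outer
join the caller would still emit the left row padded with nulls.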





[jira] [Created] (FLINK-13321) Join a udf with constant arguments or without argument in TableAPI query of Blink Planner does not work now

2019-07-18 Thread Jing Zhang (JIRA)
Jing Zhang created FLINK-13321:
--

 Summary: Join a udf with constant arguments or without argument in 
TableAPI query of Blink Planner does not work now
 Key: FLINK-13321
 URL: https://issues.apache.org/jira/browse/FLINK-13321
 Project: Flink
  Issue Type: Task
  Components: Table SQL / API
Reporter: Jing Zhang


In the Blink planner, joining a UDF with constant arguments or without
arguments in a TableAPI query does not work at present. For example, an error
is thrown when either of the following two TableAPI queries is run:

{code:java}
leftT.select('c).joinLateral(func0("1", "2"))

// leftT.select('c).joinLateral(func0())
{code}

The following error will be thrown:

{code:java}
org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: 

FlinkLogicalSink(name=[5771dc74-8986-4ffa-828f-8ed40602593a], fields=[c, f0])
+- FlinkLogicalCorrelate(correlation=[$cor3], joinType=[inner], requiredColumns=[{}])
   :- FlinkLogicalCalc(select=[c])
   :  +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, 15cbb5bf-816b-4319-9be8-6c648c868843]])
   +- FlinkLogicalCorrelate(correlation=[$cor4], joinType=[inner], requiredColumns=[{}])
      :- FlinkLogicalValues(tuples=[[{  }]])
      +- FlinkLogicalTableFunctionScan(invocation=[org$apache$flink$table$util$VarArgsFunc0$2ad590150fcbadcd9e420797d27a5eb1(_UTF-16LE'1', _UTF-16LE'2')], rowType=[RecordType(VARCHAR(2147483647) f0)], elementType=[class [Ljava.lang.Object;])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.

at org.apache.flink.table.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:72)
at org.apache.flink.table.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:63)
at org.apache.flink.table.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
...

{code}

The root cause is that `FlinkLogicalTableFunctionScan`.CONVERTER translates a
`TableFunctionScan` to a `Correlate`. This turns the original `RelNode` tree
into a tree with two cascaded `Correlate` nodes (visible in the error message
above), which cannot be translated to a physical `RelNode`.





[jira] [Created] (FLINK-13314) Correct resultType of some PlannerExpression when operands contains DecimalTypeInfo or BigDecimalTypeInfo in Blink planner

2019-07-17 Thread Jing Zhang (JIRA)
Jing Zhang created FLINK-13314:
--

 Summary: Correct resultType of some PlannerExpression when 
operands contains DecimalTypeInfo or BigDecimalTypeInfo in Blink planner
 Key: FLINK-13314
 URL: https://issues.apache.org/jira/browse/FLINK-13314
 Project: Flink
  Issue Type: Task
  Components: Table SQL / Planner
Reporter: Jing Zhang


Correct the resultType of the following PlannerExpressions when their operands
contain DecimalTypeInfo or BigDecimalTypeInfo in the Blink planner:

Minus/Plus/Div/Mul/Ceil/Floor/Round

 





[jira] [Created] (FLINK-13301) Some PlannerExpression resultType is not consistent with Calcite Type inference

2019-07-17 Thread Jing Zhang (JIRA)
Jing Zhang created FLINK-13301:
--

 Summary: Some PlannerExpression resultType is not consistent with 
Calcite Type inference
 Key: FLINK-13301
 URL: https://issues.apache.org/jira/browse/FLINK-13301
 Project: Flink
  Issue Type: Task
  Components: Table SQL / API
Reporter: Jing Zhang


The resultType of some PlannerExpressions is not consistent with Calcite type
inference. The problem can be reproduced by running the following example:

{code:java}
// prepare source Data
val testData = new mutable.MutableList[(Int)]
testData.+=((3))
val t = env.fromCollection(testData).toTable(tEnv).as('a)

// register a TableSink
val fieldNames = Array("f0")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT())
//val fieldTypes: Array[TypeInformation[_]] = Array(Types.LONG())
val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
tEnv.registerTableSink("targetTable", sink.configure(fieldNames, 
fieldTypes))

t.select('a.floor()).insertInto("targetTable")

env.execute()
{code}

The cause is that the resultType of `floor` is LONG_TYPE_INFO, while Calcite's
`SqlFloorFunction` infers the return type to be the type of the first argument
(INT in the above case).
If I change `fieldTypes` to Array(Types.INT()), the following error is thrown
in the compile phase.

{code:java}
org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink [targetTable] do not match.
Query result schema: [_c0: Long]
TableSink schema:[f0: Integer]

at org.apache.flink.table.sinks.TableSinkUtils$.validateSink(TableSinkUtils.scala:59)
at org.apache.flink.table.planner.StreamPlanner$$anonfun$2.apply(StreamPlanner.scala:158)
at org.apache.flink.table.planner.StreamPlanner$$anonfun$2.apply(StreamPlanner.scala:157)
at scala.Option.map(Option.scala:146)
at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:157)
at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:129)
at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:129)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
{code}

If I change `fieldTypes` to Array(Types.LONG()) instead, a different error is
thrown at runtime.

{code:java}
org.apache.flink.table.api.TableException: Result field does not match requested type. Requested: Long; Actual: Integer

at org.apache.flink.table.planner.Conversions$$anonfun$generateRowConverterFunction$2.apply(Conversions.scala:103)
at org.apache.flink.table.planner.Conversions$$anonfun$generateRowConverterFunction$2.apply(Conversions.scala:98)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at org.apache.flink.table.planner.Conversions$.generateRowConverterFunction(Conversions.scala:98)
at org.apache.flink.table.planner.DataStreamConversions$.getConversionMapper(DataStreamConversions.scala:135)
at org.apache.flink.table.planner.DataStreamConversions$.convert(DataStreamConversions.scala:91)
{code}

The same inconsistency also exists in `Floor`, `Ceil`, `Mod` and so on.
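
The mismatch can be pictured with a toy model of the two inference rules. This
is only a sketch of the disagreement described in this issue: `FloorTypeMismatch`,
`SimpleType` and the method names are hypothetical and are not Flink or Calcite
APIs.

```java
// Toy model of the two type-inference rules; names are hypothetical.
class FloorTypeMismatch {

    enum SimpleType { INT, LONG, DOUBLE }

    /** Models the PlannerExpression side: `floor` always reports LONG. */
    static SimpleType plannerResultType(SimpleType operand) {
        return SimpleType.LONG;
    }

    /** Models the Calcite side: the return type is the type of the first argument. */
    static SimpleType calciteStyleReturnType(SimpleType operand) {
        return operand;
    }

    /** The two rules agree only when the operand already has the LONG type. */
    static boolean consistent(SimpleType operand) {
        return plannerResultType(operand) == calciteStyleReturnType(operand);
    }
}
```

For an INT operand the first rule reports LONG and the second reports INT,
which corresponds to the two conflicting error messages shown above.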





[jira] [Created] (FLINK-13218) '*.count not supported in TableApi query

2019-07-11 Thread Jing Zhang (JIRA)
Jing Zhang created FLINK-13218:
--

 Summary: '*.count not supported in TableApi query
 Key: FLINK-13218
 URL: https://issues.apache.org/jira/browse/FLINK-13218
 Project: Flink
  Issue Type: Task
Reporter: Jing Zhang
Assignee: Jing Zhang


The following query is not supported yet:

{code:java}
val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
  .groupBy('b)
  .select('b, 'a.sum, '*.count)
{code}

The following exception will be thrown.

{code:java}
org.apache.flink.table.api.ValidationException: Cannot resolve field [*], input field list:[a, b, c].

at org.apache.flink.table.expressions.resolver.rules.ReferenceResolverRule$ExpressionResolverVisitor.failForField(ReferenceResolverRule.java:80)
at org.apache.flink.table.expressions.resolver.rules.ReferenceResolverRule$ExpressionResolverVisitor.lambda$null$4(ReferenceResolverRule.java:75)
at java.util.Optional.orElseThrow(Optional.java:290)
at org.apache.flink.table.expressions.resolver.rules.ReferenceResolverRule$ExpressionResolverVisitor.lambda$null$5(ReferenceResolverRule.java:75)
at java.util.Optional.orElseGet(Optional.java:267)
at org.apache.flink.table.expressions.resolver.rules.ReferenceResolverRule$ExpressionResolverVisitor.lambda$visit$6(ReferenceResolverRule.java:74)
at java.util.Optional.orElseGet(Optional.java:267)
at org.apache.flink.table.expressions.resolver.rules.ReferenceResolverRule$ExpressionResolverVisitor.visit(ReferenceResolverRule.java:71)
at org.apache.flink.table.expressions.resolver.rules.ReferenceResolverRule$ExpressionResolverVisitor.visit(ReferenceResolverRule.java:51)
at ...
{code}






[jira] [Created] (FLINK-13213) MinIdleStateRetentionTime/MaxIdleStateRetentionTime in TableConfig will be removed after call toAppendStream/toRetractStream without QueryConfig parameters

2019-07-11 Thread Jing Zhang (JIRA)
Jing Zhang created FLINK-13213:
--

 Summary: MinIdleStateRetentionTime/MaxIdleStateRetentionTime in 
TableConfig will be removed after call toAppendStream/toRetractStream without 
QueryConfig parameters
 Key: FLINK-13213
 URL: https://issues.apache.org/jira/browse/FLINK-13213
 Project: Flink
  Issue Type: Task
  Components: Table SQL / API
Reporter: Jing Zhang
Assignee: Jing Zhang


There are two `toAppendStream` methods in `StreamTableEnvironment`:
1. def toAppendStream[T: TypeInformation](table: Table): DataStream[T]
2. def toAppendStream[T: TypeInformation](table: Table, queryConfig: StreamQueryConfig): DataStream[T]

After converting a `Table` to a `DataStream` by calling the first method (or
`toRetractStream` without a QueryConfig parameter), the
MinIdleStateRetentionTime/MaxIdleStateRetentionTime in TableConfig will be
removed.






[jira] [Created] (FLINK-13071) QueryOperationConverter in Blink planner support add kinds of QueryOperations.

2019-07-02 Thread Jing Zhang (JIRA)
Jing Zhang created FLINK-13071:
--

 Summary: QueryOperationConverter in Blink planner support add 
kinds of QueryOperations.
 Key: FLINK-13071
 URL: https://issues.apache.org/jira/browse/FLINK-13071
 Project: Flink
  Issue Type: Task
  Components: Table SQL / API
Reporter: Jing Zhang
Assignee: Jing Zhang






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12810) Support to run a TableAPI query like 'table.select('a, 'b, 'c)'

2019-06-11 Thread Jing Zhang (JIRA)
Jing Zhang created FLINK-12810:
--

 Summary: Support to run a TableAPI query like 'table.select('a, 
'b, 'c)'
 Key: FLINK-12810
 URL: https://issues.apache.org/jira/browse/FLINK-12810
 Project: Flink
  Issue Type: Task
Reporter: Jing Zhang


Support running a TableAPI query like 'table.select('a, 'b, 'c)', including:
1. Add RexNodeConverter to convert Expression to RexNode.
2. Introduce RexPlannerExpression to wrap a RexNode, which is similar to 
RexPlannerExpression in the flink-planner module.
3. Add QueryOperationConverter to convert a QueryOperation tree to RelNode.
4. Introduce PlannerQueryOperation to wrap a RelNode in a QueryOperation, which 
is similar to PlannerQueryOperation in the flink-planner module.
5. Add OperationTreeBuilder and related classes (ExpressionResolver, 
TableReferenceLookup, FieldReferenceLookup, ExpandColumnFunctionsRule, 
LookupCallByNameRule, ResolverRule, StarReferenceFlatteningRule, 
VerifyNoUnresolvedExpressionRule), which are almost copied from the 
flink-planner module to the blink-planner module, except that we remove the 
dependency on PlannerExpression. **Note: This is a temporary solution, since 
OperationTreeBuilder will eventually be moved to the api module.**
6. Add implicit conversions in expressionDsl.





[jira] [Created] (FLINK-12496) Support translation from StreamExecGroupWindowAggregate to StreamTransformation.

2019-05-13 Thread Jing Zhang (JIRA)
Jing Zhang created FLINK-12496:
--

 Summary: Support translation from StreamExecGroupWindowAggregate 
to StreamTransformation.
 Key: FLINK-12496
 URL: https://issues.apache.org/jira/browse/FLINK-12496
 Project: Flink
  Issue Type: Task
  Components: Table SQL / Runtime
Reporter: Jing Zhang
Assignee: Jing Zhang








[jira] [Created] (FLINK-12374) Support translation from StreamExecTableSourceScan/BatchExecTableSourceScan to StreamTransformation.

2019-04-30 Thread Jing Zhang (JIRA)
Jing Zhang created FLINK-12374:
--

 Summary: Support translation from 
StreamExecTableSourceScan/BatchExecTableSourceScan to StreamTransformation.
 Key: FLINK-12374
 URL: https://issues.apache.org/jira/browse/FLINK-12374
 Project: Flink
  Issue Type: Task
  Components: Table SQL / API
Reporter: Jing Zhang








[jira] [Created] (FLINK-12348) Use TableConfig in api module to replace TableConfig in blink-planner module.

2019-04-27 Thread Jing Zhang (JIRA)
Jing Zhang created FLINK-12348:
--

 Summary: Use TableConfig in api module to replace TableConfig in 
blink-planner module.
 Key: FLINK-12348
 URL: https://issues.apache.org/jira/browse/FLINK-12348
 Project: Flink
  Issue Type: Task
Reporter: Jing Zhang








[jira] [Created] (FLINK-12307) Support translation from StreamExecWindowJoin to StreamTransformation.

2019-04-23 Thread Jing Zhang (JIRA)
Jing Zhang created FLINK-12307:
--

 Summary: Support translation from StreamExecWindowJoin to 
StreamTransformation.
 Key: FLINK-12307
 URL: https://issues.apache.org/jira/browse/FLINK-12307
 Project: Flink
  Issue Type: Task
  Components: Table SQL / API
Reporter: Jing Zhang








[jira] [Created] (FLINK-12208) Introduce Sort / TemporalSort / SortLimit/ Limit operators for blink streaming runtime

2019-04-15 Thread Jing Zhang (JIRA)
Jing Zhang created FLINK-12208:
--

 Summary:  Introduce Sort / TemporalSort / SortLimit/ Limit 
operators for blink streaming runtime 
 Key: FLINK-12208
 URL: https://issues.apache.org/jira/browse/FLINK-12208
 Project: Flink
  Issue Type: Task
  Components: Table SQL / Runtime
Reporter: Jing Zhang


Introduce Sort / TemporalSort / SortLimit / Limit operators for the Blink
streaming runtime.





[jira] [Created] (FLINK-12017) Support to run a sql query : 'SELECT * FROM source'

2019-03-26 Thread Jing Zhang (JIRA)
Jing Zhang created FLINK-12017:
--

 Summary: Support to run a sql query : 'SELECT * FROM source'
 Key: FLINK-12017
 URL: https://issues.apache.org/jira/browse/FLINK-12017
 Project: Flink
  Issue Type: Task
  Components: Table SQL / Planner
Reporter: Jing Zhang








[jira] [Created] (FLINK-11993) Introduce partitionable filesystem sink

2019-03-21 Thread Jing Zhang (JIRA)
Jing Zhang created FLINK-11993:
--

 Summary: Introduce partitionable filesystem sink
 Key: FLINK-11993
 URL: https://issues.apache.org/jira/browse/FLINK-11993
 Project: Flink
  Issue Type: Task
  Components: API / Table SQL
Reporter: Jing Zhang


Introduce a partitionable filesystem sink:
1. Add a partition trait for the filesystem connector.
2. All the filesystem formats can be declared as partitioned through the new
DDL grammar.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11975) Support to run a sql query : 'SELECT * FROM (VALUES (1, 2, 3)) T(a, b, c)'

2019-03-20 Thread Jing Zhang (JIRA)
Jing Zhang created FLINK-11975:
--

 Summary: Support to run a sql query :  'SELECT * FROM (VALUES (1, 
2, 3)) T(a, b, c)'
 Key: FLINK-11975
 URL: https://issues.apache.org/jira/browse/FLINK-11975
 Project: Flink
  Issue Type: Task
  Components: SQL / Planner
Reporter: Jing Zhang








[jira] [Created] (FLINK-11946) Introduce Exec nodes

2019-03-17 Thread Jing Zhang (JIRA)
Jing Zhang created FLINK-11946:
--

 Summary: Introduce Exec nodes
 Key: FLINK-11946
 URL: https://issues.apache.org/jira/browse/FLINK-11946
 Project: Flink
  Issue Type: Task
Reporter: Jing Zhang






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)