[jira] [Commented] (FLINK-13738) NegativeArraySizeException in LongHybridHashTable

2019-08-16 Thread Jingsong Lee (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16908867#comment-16908867
 ] 

Jingsong Lee commented on FLINK-13738:
--

Ah... Thanks [~rmetzger] for find this bug.

The range is computed by (maxKey - minKey + 1), it may be negative. it mean the 
range is bigger than Long.MaxValue when range is negative... I'll fix it.

> NegativeArraySizeException in LongHybridHashTable
> -
>
> Key: FLINK-13738
> URL: https://issues.apache.org/jira/browse/FLINK-13738
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.9.0
>Reporter: Robert Metzger
>Priority: Major
> Fix For: 1.10.0
>
>
> Executing this (meaningless) query:
> {code:java}
> INSERT INTO sinkTable ( SELECT CONCAT( CAST( id AS VARCHAR), CAST( COUNT(*) 
> AS VARCHAR)) as something, 'const' FROM CsvTable, table1  WHERE sometxt LIKE 
> 'a%' AND id = key GROUP BY id ) {code}
> leads to the following exception:
> {code:java}
> Caused by: java.lang.NegativeArraySizeException
>  at 
> org.apache.flink.table.runtime.hashtable.LongHybridHashTable.tryDenseMode(LongHybridHashTable.java:216)
>  at 
> org.apache.flink.table.runtime.hashtable.LongHybridHashTable.endBuild(LongHybridHashTable.java:105)
>  at LongHashJoinOperator$36.endInput1$(Unknown Source)
>  at LongHashJoinOperator$36.endInput(Unknown Source)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:256)
>  at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputSelectableProcessor.checkFinished(StreamTwoInputSelectableProcessor.java:359)
>  at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputSelectableProcessor.processInput(StreamTwoInputSelectableProcessor.java:193)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:687)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:517)
>  at java.lang.Thread.run(Thread.java:748){code}
> This is the plan:
>  
> {code:java}
> == Abstract Syntax Tree ==
> LogicalSink(name=[sinkTable], fields=[f0, f1])
> +- LogicalProject(something=[CONCAT(CAST($0):VARCHAR(2147483647) CHARACTER 
> SET "UTF-16LE", CAST($1):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT 
> NULL)], EXPR$1=[_UTF-16LE'const'])
>+- LogicalAggregate(group=[{0}], agg#0=[COUNT()])
>   +- LogicalProject(id=[$1])
>  +- LogicalFilter(condition=[AND(LIKE($0, _UTF-16LE'a%'), =($1, 
> CAST($2):BIGINT))])
> +- LogicalJoin(condition=[true], joinType=[inner])
>:- LogicalTableScan(table=[[default_catalog, default_database, 
> CsvTable, source: [CsvTableSource(read fields: sometxt, id)]]])
>+- LogicalTableScan(table=[[default_catalog, default_database, 
> table1, source: [GeneratorTableSource(key, rowtime, payload)]]])
> == Optimized Logical Plan ==
> Sink(name=[sinkTable], fields=[f0, f1]): rowcount = 1498810.6659336376, 
> cumulative cost = {4.459964319978008E8 rows, 1.879799762133187E10 cpu, 4.8E9 
> io, 8.4E8 network, 1.799524266373455E8 memory}
> +- Calc(select=[CONCAT(CAST(id), CAST($f1)) AS something, _UTF-16LE'const' AS 
> EXPR$1]): rowcount = 1498810.6659336376, cumulative cost = 
> {4.444976213318672E8 rows, 1.8796498810665936E10 cpu, 4.8E9 io, 8.4E8 
> network, 1.799524266373455E8 memory}
>+- HashAggregate(isMerge=[false], groupBy=[id], select=[id, COUNT(*) AS 
> $f1]): rowcount = 1498810.6659336376, cumulative cost = {4.429988106659336E8 
> rows, 1.8795E10 cpu, 4.8E9 io, 8.4E8 network, 1.799524266373455E8 memory}
>   +- Calc(select=[id]): rowcount = 1.575E7, cumulative cost = {4.415E8 
> rows, 1.848E10 cpu, 4.8E9 io, 8.4E8 network, 1.2E8 memory}
>  +- HashJoin(joinType=[InnerJoin], where=[=(id, key0)], select=[id, 
> key0], build=[left]): rowcount = 1.575E7, cumulative cost = {4.2575E8 rows, 
> 1.848E10 cpu, 4.8E9 io, 8.4E8 network, 1.2E8 memory}
> :- Exchange(distribution=[hash[id]]): rowcount = 500.0, 
> cumulative cost = {1.1E8 rows, 8.4E8 cpu, 2.0E9 io, 4.0E7 network, 0.0 memory}
> :  +- Calc(select=[id], where=[LIKE(sometxt, _UTF-16LE'a%')]): 
> rowcount = 500.0, cumulative cost = {1.05E8 rows, 0.0 cpu, 2.0E9 io, 0.0 
> network, 0.0 memory}
> : +- TableSourceScan(table=[[default_catalog, 
> default_database, CsvTable, source: [CsvTableSource(read fields: sometxt, 
> id)]]], fields=[sometxt, id]): rowcount = 1.0E8, cumulative cost = {1.0E8 
> rows, 0.0 cpu, 2.0E9 io, 0.0 network, 0.0 memory}
> +- Exchange(distributi

[jira] [Commented] (FLINK-13738) NegativeArraySizeException in LongHybridHashTable

2019-08-15 Thread Jark Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16908675#comment-16908675
 ] 

Jark Wu commented on FLINK-13738:
-

cc [~lzljs3620320] [~TsReaper]

> NegativeArraySizeException in LongHybridHashTable
> -
>
> Key: FLINK-13738
> URL: https://issues.apache.org/jira/browse/FLINK-13738
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.9.0
>Reporter: Robert Metzger
>Priority: Major
>
> Executing this (meaningless) query:
> {code:java}
> INSERT INTO sinkTable ( SELECT CONCAT( CAST( id AS VARCHAR), CAST( COUNT(*) 
> AS VARCHAR)) as something, 'const' FROM CsvTable, table1  WHERE sometxt LIKE 
> 'a%' AND id = key GROUP BY id ) {code}
> leads to the following exception:
> {code:java}
> Caused by: java.lang.NegativeArraySizeException
>  at 
> org.apache.flink.table.runtime.hashtable.LongHybridHashTable.tryDenseMode(LongHybridHashTable.java:216)
>  at 
> org.apache.flink.table.runtime.hashtable.LongHybridHashTable.endBuild(LongHybridHashTable.java:105)
>  at LongHashJoinOperator$36.endInput1$(Unknown Source)
>  at LongHashJoinOperator$36.endInput(Unknown Source)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:256)
>  at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputSelectableProcessor.checkFinished(StreamTwoInputSelectableProcessor.java:359)
>  at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputSelectableProcessor.processInput(StreamTwoInputSelectableProcessor.java:193)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:687)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:517)
>  at java.lang.Thread.run(Thread.java:748){code}
> This is the plan:
>  
> {code:java}
> == Abstract Syntax Tree ==
> LogicalSink(name=[sinkTable], fields=[f0, f1])
> +- LogicalProject(something=[CONCAT(CAST($0):VARCHAR(2147483647) CHARACTER 
> SET "UTF-16LE", CAST($1):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT 
> NULL)], EXPR$1=[_UTF-16LE'const'])
>+- LogicalAggregate(group=[{0}], agg#0=[COUNT()])
>   +- LogicalProject(id=[$1])
>  +- LogicalFilter(condition=[AND(LIKE($0, _UTF-16LE'a%'), =($1, 
> CAST($2):BIGINT))])
> +- LogicalJoin(condition=[true], joinType=[inner])
>:- LogicalTableScan(table=[[default_catalog, default_database, 
> CsvTable, source: [CsvTableSource(read fields: sometxt, id)]]])
>+- LogicalTableScan(table=[[default_catalog, default_database, 
> table1, source: [GeneratorTableSource(key, rowtime, payload)]]])
> == Optimized Logical Plan ==
> Sink(name=[sinkTable], fields=[f0, f1]): rowcount = 1498810.6659336376, 
> cumulative cost = {4.459964319978008E8 rows, 1.879799762133187E10 cpu, 4.8E9 
> io, 8.4E8 network, 1.799524266373455E8 memory}
> +- Calc(select=[CONCAT(CAST(id), CAST($f1)) AS something, _UTF-16LE'const' AS 
> EXPR$1]): rowcount = 1498810.6659336376, cumulative cost = 
> {4.444976213318672E8 rows, 1.8796498810665936E10 cpu, 4.8E9 io, 8.4E8 
> network, 1.799524266373455E8 memory}
>+- HashAggregate(isMerge=[false], groupBy=[id], select=[id, COUNT(*) AS 
> $f1]): rowcount = 1498810.6659336376, cumulative cost = {4.429988106659336E8 
> rows, 1.8795E10 cpu, 4.8E9 io, 8.4E8 network, 1.799524266373455E8 memory}
>   +- Calc(select=[id]): rowcount = 1.575E7, cumulative cost = {4.415E8 
> rows, 1.848E10 cpu, 4.8E9 io, 8.4E8 network, 1.2E8 memory}
>  +- HashJoin(joinType=[InnerJoin], where=[=(id, key0)], select=[id, 
> key0], build=[left]): rowcount = 1.575E7, cumulative cost = {4.2575E8 rows, 
> 1.848E10 cpu, 4.8E9 io, 8.4E8 network, 1.2E8 memory}
> :- Exchange(distribution=[hash[id]]): rowcount = 500.0, 
> cumulative cost = {1.1E8 rows, 8.4E8 cpu, 2.0E9 io, 4.0E7 network, 0.0 memory}
> :  +- Calc(select=[id], where=[LIKE(sometxt, _UTF-16LE'a%')]): 
> rowcount = 500.0, cumulative cost = {1.05E8 rows, 0.0 cpu, 2.0E9 io, 0.0 
> network, 0.0 memory}
> : +- TableSourceScan(table=[[default_catalog, 
> default_database, CsvTable, source: [CsvTableSource(read fields: sometxt, 
> id)]]], fields=[sometxt, id]): rowcount = 1.0E8, cumulative cost = {1.0E8 
> rows, 0.0 cpu, 2.0E9 io, 0.0 network, 0.0 memory}
> +- Exchange(distribution=[hash[key0]]): rowcount = 1.0E8, 
> cumulative cost = {3.0E8 rows, 1.68E10 cpu, 2.8E9 io, 8.0E8 network, 0.0 
> memory}
>+- Calc(select=[CAST(key) AS key0]): rowcount = 1.0E8, 
> cumulative co