[jira] [Updated] (FLINK-17803) Flink batch sql job read hive table which contains map type, raise exception: " scala.MatchError: MAP"

2020-05-18 Thread zouyunhe (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zouyunhe updated FLINK-17803:
-
Description: 
We use Flink 1.10.0 with the Blink planner to submit a batch SQL job that reads from 
a Hive table containing map-type fields and then aggregates. The SQL is as 
follows:

```
 create view aaa
 as select * from table1 where event_id = '0103002' and `day`='2020-05-13'
 and `hour`='13';
 create view view_1
 as
 select
 `day`,
 a.rtime as itime,
 a.uid as uid,
 trim(BOTH a.event.log_1['scene']) as refer_list,
 T.s as abflags,
 a.hdid as hdid,
 a.country as country
 from aaa as a
 left join LATERAL TABLE(splitByChar(trim(BOTH a.event.log_1['abflag']),
 ',')) as T(s) on true;

 CREATE VIEW view_6 as
 SELECT
 `uid`,
 `refer_list`,
 `abflag`,
 last_value(country)
 FROM view_1
 where `refer_list` in ('WELOG_NEARBY', 'WELOG_FOLLOW', 'WELOG_POPULAR')
 GROUP BY `uid`, `refer_list`, abflag;
 insert into 
 ``` 

When the job is submitted, the following exception occurs:
 org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: scala.MatchError: MAP (of class org.apache.flink.table.types.logical.LogicalTypeRoot)
         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
         at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
         at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
         at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
         at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
         at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
         at java.security.AccessController.doPrivileged(Native Method)
         at javax.security.auth.Subject.doAs(Subject.java:422)
         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
         at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
         at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
 Caused by: java.lang.RuntimeException: scala.MatchError: MAP (of class org.apache.flink.table.types.logical.LogicalTypeRoot)
         at sg.bigo.streaming.sql.StreamingSqlRunner.main(StreamingSqlRunner.java:143)
         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
         at java.lang.reflect.Method.invoke(Method.java:498)
         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
         ... 11 more
 Caused by: scala.MatchError: MAP (of class org.apache.flink.table.types.logical.LogicalTypeRoot)
         at org.apache.flink.table.planner.codegen.CodeGenUtils$.hashCodeForType(CodeGenUtils.scala:212)
         at org.apache.flink.table.planner.codegen.HashCodeGenerator$.$anonfun$generateCodeBody$1(HashCodeGenerator.scala:97)
         at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
         at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  
 We then found that the hashCodeForType method in the CodeGenUtils class does not 
handle the MAP type, so we patched it as follows:
```
 def hashCodeForType(
     ctx: CodeGeneratorContext, t: LogicalType, term: String): String = t.getTypeRoot match {
   case BOOLEAN => s"${className[JBoolean]}.hashCode($term)"
   case MAP => s"${className[BaseMap]}.getHashCode($term)"  // the code we add
   case TINYINT => s"${className[JByte]}.hashCode($term)"
 ```
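
For readers less familiar with this error, here is a minimal, self-contained sketch of how a Scala `match` without a case for one value throws `scala.MatchError` at runtime, and how an explicit fallback case turns that into a reportable condition. `TypeRoot`, `MatchErrorSketch`, `hashCodeTerm`, and `hashCodeTermChecked` are hypothetical names made up for illustration; they are not Flink's `LogicalTypeRoot` or `CodeGenUtils`, and the generated strings only imitate the shape of the snippet above.

```scala
// Hypothetical stand-in for a type-root enumeration (NOT Flink's LogicalTypeRoot).
sealed trait TypeRoot
case object BOOLEAN extends TypeRoot
case object TINYINT extends TypeRoot
case object MAP extends TypeRoot

object MatchErrorSketch {
  // Deliberately has no case for MAP; @unchecked silences the
  // compile-time exhaustivity warning so the gap only shows at runtime.
  def hashCodeTerm(t: TypeRoot, term: String): String = (t: @unchecked) match {
    case BOOLEAN => s"Boolean.hashCode($term)"
    case TINYINT => s"Byte.hashCode($term)"
  }

  // Same match with a fallback case, so an unsupported type is reported
  // as a Left value instead of escaping as scala.MatchError.
  def hashCodeTermChecked(t: TypeRoot, term: String): Either[String, String] = t match {
    case BOOLEAN => Right(s"Boolean.hashCode($term)")
    case TINYINT => Right(s"Byte.hashCode($term)")
    case other   => Left(s"Unsupported type for hashing: $other")
  }
}
```

Calling `MatchErrorSketch.hashCodeTerm(MAP, "f0")` throws `scala.MatchError`, while `hashCodeTermChecked(MAP, "f0")` returns a `Left`. Whether a fallback or a dedicated `MAP` case is appropriate in Flink itself is a question for the planner maintainers; this sketch only illustrates the failure mode.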


 With this change the job can be submitted, but after running for a while another exception occurs:
 java.lang.RuntimeException: Could not instantiate generated class 'HashAggregateWithKeys$1543'
 at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:67)
 at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:46)
 at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:48)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:156)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:433)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
 at 

[jira] [Updated] (FLINK-17803) Flink batch sql job read hive table which contains map type, raise exception: " scala.MatchError: MAP"

2020-05-18 Thread zouyunhe (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zouyunhe updated FLINK-17803:
-
Description: 
We use Flink 1.10.0 with the Blink planner to submit a batch SQL job that reads from 
a Hive table containing map-type fields and then aggregates. The SQL is as 
follows:

```
 create view aaa
 as select * from table1 where event_id = '0103002' and `day`='2020-05-13'
 and `hour`='13';
 create view view_1
 as
 select
 `day`,
 a.rtime as itime,
 a.uid as uid,
 trim(BOTH a.event.log_1['scene']) as refer_list,
 T.s as abflags,
 a.hdid as hdid,
 a.country as country
 from aaa as a
 left join LATERAL TABLE(splitByChar(trim(BOTH a.event.log_1['abflag']),
 ',')) as T(s) on true;

 CREATE VIEW view_6 as
 SELECT
 `uid`,
 `refer_list`,
 `abflag`,
 last_value(country)
 FROM view_1
 where `refer_list` in ('WELOG_NEARBY', 'WELOG_FOLLOW', 'WELOG_POPULAR')
 GROUP BY `uid`, `refer_list`, abflag;
 insert into 
``` 


 When the job is submitted, the following exception occurs:
 org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: scala.MatchError: MAP (of class org.apache.flink.table.types.logical.LogicalTypeRoot)
         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
         at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
         at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
         at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
         at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
         at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
         at java.security.AccessController.doPrivileged(Native Method)
         at javax.security.auth.Subject.doAs(Subject.java:422)
         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
         at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
         at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
 Caused by: java.lang.RuntimeException: scala.MatchError: MAP (of class org.apache.flink.table.types.logical.LogicalTypeRoot)
         at sg.bigo.streaming.sql.StreamingSqlRunner.main(StreamingSqlRunner.java:143)
         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
         at java.lang.reflect.Method.invoke(Method.java:498)
         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
         ... 11 more
 Caused by: scala.MatchError: MAP (of class org.apache.flink.table.types.logical.LogicalTypeRoot)
         at org.apache.flink.table.planner.codegen.CodeGenUtils$.hashCodeForType(CodeGenUtils.scala:212)
         at org.apache.flink.table.planner.codegen.HashCodeGenerator$.$anonfun$generateCodeBody$1(HashCodeGenerator.scala:97)
         at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
         at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  
 We then found that the hashCodeForType method in CodeGenUtils does not handle 
the MAP type, so we patched it as follows:
  
 def hashCodeForType(
     ctx: CodeGeneratorContext, t: LogicalType, term: String): String = t.getTypeRoot match {
   case BOOLEAN => s"${className[JBoolean]}.hashCode($term)"
   case MAP => s"${className[BaseMap]}.getHashCode($term)"
   case TINYINT => s"${className[JByte]}.hashCode($term)"
  
 With this change the job can be submitted, but after running for a while another exception occurs:
 java.lang.RuntimeException: Could not instantiate generated class 'HashAggregateWithKeys$1543'
 at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:67)
 at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:46)
 at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:48)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:156)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:433)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
 at 

[jira] [Updated] (FLINK-17803) Flink batch sql job read hive table which contains map type, raise exception: " scala.MatchError: MAP"

2020-05-18 Thread zouyunhe (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zouyunhe updated FLINK-17803:
-
Summary: Flink batch sql job read hive table which contains map type, raise 
exception: " scala.MatchError: MAP"  (was: Flink batch sql job read hive table 
with map type, raise exception: " scala.MatchError: MAP")

> Flink batch sql job read hive table which contains map type, raise exception: 
> " scala.MatchError: MAP"
> --
>
> Key: FLINK-17803
> URL: https://issues.apache.org/jira/browse/FLINK-17803
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: zouyunhe
>Priority: Major
>
> We use Flink 1.10.0 with the Blink planner to submit a batch SQL job that reads 
> from a Hive table containing map-type fields and then aggregates. The 
> SQL is as follows:
>  
> create view aaa
> as select * from table1 where event_id = '0103002' and `day`='2020-05-13'
> and `hour`='13';
> create view view_1
> as
> select
> `day`,
> a.rtime as itime,
> a.uid as uid,
> trim(BOTH a.event.log_1['scene']) as refer_list,
> T.s as abflags,
> a.hdid as hdid,
> a.country as country
> from aaa as a
> left join LATERAL TABLE(splitByChar(trim(BOTH a.event.log_1['abflag']),
> ',')) as T(s) on true;
> CREATE VIEW view_6 as
> SELECT
> `uid`,
> `refer_list`,
> `abflag`,
> last_value(country)
> FROM view_1
> where `refer_list` in ('WELOG_NEARBY', 'WELOG_FOLLOW', 'WELOG_POPULAR')
> GROUP BY `uid`, `refer_list`, abflag;
>  
> insert into 
>  
> When the job is submitted, the following exception occurs:
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: scala.MatchError: MAP (of class org.apache.flink.table.types.logical.LogicalTypeRoot)
>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>         at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>         at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>         at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>         at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>         at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:422)
>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>         at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>         at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.lang.RuntimeException: scala.MatchError: MAP (of class org.apache.flink.table.types.logical.LogicalTypeRoot)
>         at sg.bigo.streaming.sql.StreamingSqlRunner.main(StreamingSqlRunner.java:143)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>         ... 11 more
> Caused by: scala.MatchError: MAP (of class org.apache.flink.table.types.logical.LogicalTypeRoot)
>         at org.apache.flink.table.planner.codegen.CodeGenUtils$.hashCodeForType(CodeGenUtils.scala:212)
>         at org.apache.flink.table.planner.codegen.HashCodeGenerator$.$anonfun$generateCodeBody$1(HashCodeGenerator.scala:97)
>         at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
>         at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
>  
> We then found that the hashCodeForType method in CodeGenUtils does not 
> handle the MAP type, so we patched it as follows:
>  
> def hashCodeForType(
>  ctx: CodeGeneratorContext, t: LogicalType, term: String): String = 
> t.getTypeRoot match {
>  case BOOLEAN => s"${className[JBoolean]}.hashCode($term)"
>  case MAP => s"${className[BaseMap]}.getHashCode($term)"
>  case TINYINT => s"${className[JByte]}.hashCode($term)"
>  
> then the job can be submitted,