[jira] [Updated] (FLINK-17803) Flink batch sql job read hive table which contains map type, raise exception: " scala.MatchError: MAP"
[ https://issues.apache.org/jira/browse/FLINK-17803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zouyunhe updated FLINK-17803:
-----------------------------
Description:

We use Flink 1.10.0 with the Blink planner to submit a batch SQL job. The job reads from a Hive table that contains MAP-type fields and then aggregates. The SQL is as follows:

```
create view aaa as
select * from table1
where event_id = '0103002' and `day` = '2020-05-13' and `hour` = '13';

create view view_1 as
select
  `day`,
  a.rtime as itime,
  a.uid as uid,
  trim(BOTH a.event.log_1['scene']) as refer_list,
  T.s as abflag,
  a.hdid as hdid,
  a.country as country
from aaa as a
left join LATERAL TABLE(splitByChar(trim(BOTH a.event.log_1['abflag']), ',')) as T(s) on true;

CREATE VIEW view_6 as
SELECT
  `uid`,
  `refer_list`,
  `abflag`,
  last_value(country)
FROM view_1
where `refer_list` in ('WELOG_NEARBY', 'WELOG_FOLLOW', 'WELOG_POPULAR')
GROUP BY `uid`, `refer_list`, abflag;

insert into
```

When we submit the job, the following exception occurs:

```
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: scala.MatchError: MAP (of class org.apache.flink.table.types.logical.LogicalTypeRoot)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.lang.RuntimeException: scala.MatchError: MAP (of class org.apache.flink.table.types.logical.LogicalTypeRoot)
    at sg.bigo.streaming.sql.StreamingSqlRunner.main(StreamingSqlRunner.java:143)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
    ... 11 more
Caused by: scala.MatchError: MAP (of class org.apache.flink.table.types.logical.LogicalTypeRoot)
    at org.apache.flink.table.planner.codegen.CodeGenUtils$.hashCodeForType(CodeGenUtils.scala:212)
    at org.apache.flink.table.planner.codegen.HashCodeGenerator$.$anonfun$generateCodeBody$1(HashCodeGenerator.scala:97)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
```

We then found that the `hashCodeForType` method in the `CodeGenUtils` class has no case for the `MAP` type root. As a result, its pattern match fails with `scala.MatchError`.
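The failure mode can be reproduced outside Flink with a minimal Scala sketch.

Everything in it is hypothetical and is not Flink code:

- `TypeRoot` stands in for Flink's `LogicalTypeRoot`, which is a Java enum.
- `hashBefore` and `hashAfter` compute hash values directly. They do not emit generated code the way `hashCodeForType` does.
- `mapHash` assumes the `java.util.Map.hashCode` contract: the sum of per-entry `key.hashCode ^ value.hashCode`. This is an assumption for illustration, not a description of what `BaseMap.getHashCode` actually does.

The sketch shows two things. A match with no case for `MAP` compiles but throws `scala.MatchError` at runtime. Adding a `MAP` case removes the error.

```scala
// Hypothetical sketch; none of these names are Flink APIs.
object HashDispatchSketch {
  // Stand-in for LogicalTypeRoot. As with Java enums, matches on
  // Enumeration values are not checked for exhaustiveness, so a missing
  // case only shows up at runtime as scala.MatchError.
  object TypeRoot extends Enumeration {
    val BOOLEAN, TINYINT, MAP = Value
  }
  import TypeRoot.{BOOLEAN, TINYINT, MAP}

  // Before: no MAP case, so hashBefore(MAP, ...) throws scala.MatchError.
  def hashBefore(t: TypeRoot.Value, v: Any): Int = t match {
    case BOOLEAN => java.lang.Boolean.hashCode(v.asInstanceOf[Boolean])
    case TINYINT => java.lang.Byte.hashCode(v.asInstanceOf[Byte])
  }

  // After: MAP is dispatched to an order-insensitive map hash.
  def hashAfter(t: TypeRoot.Value, v: Any): Int = t match {
    case BOOLEAN => java.lang.Boolean.hashCode(v.asInstanceOf[Boolean])
    case TINYINT => java.lang.Byte.hashCode(v.asInstanceOf[Byte])
    case MAP     => mapHash(v.asInstanceOf[Map[_, _]])
  }

  // Sum of per-entry hashes (key hash XOR value hash, null -> 0), matching
  // the java.util.Map.hashCode contract. Addition is commutative, so the
  // result does not depend on iteration order.
  def mapHash(m: Map[_, _]): Int =
    m.iterator.map { case (k, v) =>
      java.util.Objects.hashCode(k) ^ java.util.Objects.hashCode(v)
    }.sum

  def main(args: Array[String]): Unit = {
    val m1 = Map("scene" -> "WELOG_NEARBY", "abflag" -> "a,b")
    val m2 = Map("abflag" -> "a,b", "scene" -> "WELOG_NEARBY")
    println(hashAfter(MAP, m1) == hashAfter(MAP, m2)) // true
    try {
      hashBefore(MAP, m1)
      println("no error")
    } catch {
      case e: MatchError => println(s"scala.MatchError: ${e.getMessage}")
    }
  }
}
```

Any hash used for grouping or partitioning must agree with equality. Two maps with the same entries must therefore get the same hash regardless of the order in which they are iterated. The commutative sum in `mapHash` is one simple way to guarantee that.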
We fixed it by adding a `MAP` case to the match:

```
def hashCodeForType(
    ctx: CodeGeneratorContext, t: LogicalType, term: String): String = t.getTypeRoot match {
  case BOOLEAN => s"${className[JBoolean]}.hashCode($term)"
  case MAP => s"${className[BaseMap]}.getHashCode($term)" // the case we added
  case TINYINT => s"${className[JByte]}.hashCode($term)"
  // ... remaining cases unchanged
}
```

With this change the job can be submitted. However, after it runs for a while, another exception occurs:

```
java.lang.RuntimeException: Could not instantiate generated class 'HashAggregateWithKeys$1543'
    at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:67)
    at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:46)
    at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:48)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:156)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:433)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    ...
```
[ https://issues.apache.org/jira/browse/FLINK-17803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zouyunhe updated FLINK-17803:
-----------------------------
Summary: Flink batch sql job read hive table which contains map type, raise exception: " scala.MatchError: MAP" (was: Flink batch sql job read hive table with map type, raise exception: " scala.MatchError: MAP")

> Flink batch sql job read hive table which contains map type, raise exception: " scala.MatchError: MAP"
> --
>
>                 Key: FLINK-17803
>                 URL: https://issues.apache.org/jira/browse/FLINK-17803
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.10.0
>            Reporter: zouyunhe
>            Priority: Major