Flink SQL support for the TFRecord format
Hi, when I write SQL like this:

String sqlCreate = "CREATE TABLE fs_table (\n" +
        "  `examplestr` bytes\n" +
        ") WITH (\n" +
        "  'connector'='filesystem',\n" +
        "  'format'='raw',\n" +
        "  'path'='/tmp/zhangying480'\n" +
        ")";

I get the error below, which says my TFRecord data is malformed, even though the data itself is correct:

    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    ... 4 more
Caused by: java.io.IOException: Length header crc32 checking failed: -1837507072 != -252953760, length = 323850
    at com.jd.realtime.formal.operator.source.inner.TFRecordReader.read(TFRecordReader.java:44)
    at com.jd.realtime.formal.operator.source.inner.TFRecordInputFormat.nextRecord(TFRecordInputFormat.java:128)
    at com.jd.realtime.formal.operator.source.inner.TFRecordInputFormat.nextRecord(TFRecordInputFormat.java:17)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:202)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.lang.Thread.run(Thread.java:748)

I read the source code and found the cause in SerializationSchemaAdapter:

public class SerializationSchemaAdapter implements Encoder<RowData> {
    private static final long serialVersionUID = 1L;
    static final byte LINE_DELIMITER = "\n".getBytes(StandardCharsets.UTF_8)[0];

    @Override
    public void encode(RowData element, OutputStream stream) throws IOException {
        checkOpened();
        stream.write(serializationSchema.serialize(element));
        // This extra byte after every record is what breaks TFRecord output.
        stream.write(LINE_DELIMITER);
    }
}

Writing LINE_DELIMITER after each record produces an incorrect result, so I implemented a TFRecordSerializationSchemaAdapter and overrode FileSystemTableSink, which fixed the problem.

So far, I only support the filesystem sink with the TFRecord format. Do you think it is necessary to support other systems?
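To illustrate why a trailing newline breaks reading, here is a minimal sketch of the TFRecord framing as documented by TensorFlow: each record is an 8-byte little-endian length, a 4-byte masked CRC32C of that length, the payload, and a 4-byte masked CRC32C of the payload. The class TFRecordFraming and its methods are hypothetical illustrations, not the TFRecordSerializationSchemaAdapter described above, and the sketch assumes Java 9+ for java.util.zip.CRC32C. With appendLineDelimiter set, the extra '\n' shifts the next record's header by one byte, so the length CRC check fails, which is consistent with the "Length header crc32 checking failed" error.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.CRC32C;

/** Hypothetical helper illustrating TFRecord framing (not the poster's adapter). */
public class TFRecordFraming {
    // Constant used by TFRecord to "mask" CRC values.
    private static final int MASK_DELTA = 0xa282ead8;

    /** Masked CRC32C: rotate the CRC right by 15 bits, then add MASK_DELTA. */
    static int maskedCrc32c(byte[] data, int off, int len) {
        CRC32C crc = new CRC32C();
        crc.update(data, off, len);
        int c = (int) crc.getValue();
        return ((c >>> 15) | (c << 17)) + MASK_DELTA;
    }

    /** Frames each record as: uint64 length | uint32 masked CRC(length) | data | uint32 masked CRC(data). */
    static byte[] encode(boolean appendLineDelimiter, byte[]... records) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] data : records) {
            ByteBuffer header = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN);
            header.putLong(data.length);
            header.putInt(maskedCrc32c(header.array(), 0, 8)); // CRC over the 8 length bytes
            out.write(header.array(), 0, 12);
            out.write(data, 0, data.length);
            ByteBuffer footer = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
            footer.putInt(maskedCrc32c(data, 0, data.length));
            out.write(footer.array(), 0, 4);
            if (appendLineDelimiter) {
                out.write('\n'); // mimics the LINE_DELIMITER written by the default adapter
            }
        }
        return out.toByteArray();
    }

    /** Reads records back; throws on a CRC mismatch, like the reader in the stack trace. */
    static List<byte[]> decode(byte[] file) {
        List<byte[]> records = new ArrayList<>();
        int pos = 0;
        while (pos < file.length) {
            ByteBuffer bb = ByteBuffer.wrap(file, pos, file.length - pos).order(ByteOrder.LITTLE_ENDIAN);
            long length = bb.getLong();
            int storedLengthCrc = bb.getInt();
            if (storedLengthCrc != maskedCrc32c(file, pos, 8)) {
                throw new IllegalStateException("Length header crc32 checking failed at offset " + pos);
            }
            byte[] data = new byte[(int) length];
            bb.get(data);
            int storedDataCrc = bb.getInt();
            if (storedDataCrc != maskedCrc32c(data, 0, data.length)) {
                throw new IllegalStateException("Data crc32 checking failed at offset " + pos);
            }
            records.add(data);
            pos += 12 + data.length + 4;
        }
        return records;
    }
}
```

Encoding the records "a" and "bc" without the delimiter yields 35 bytes that decode cleanly; with the delimiter the output is 37 bytes and decoding fails at the second header. A real sink would still need to wrap this framing in Flink's Encoder interface, which the sketch does not attempt.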
Table program cannot be compiled. This is a bug. Please file an issue
I wrote a long SQL query, but when I try to explain its plan, it fails with the following error:

org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:76)
    at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:77)
    at org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:95)
    at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:51)
    at org.apache.flink.client.program.topology.FlinkStreamTopology.setOperatorParameter(FlinkStreamTopology.java:75)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.apache.flink.client.program.topology.FlinkStreamTopology.setOperatorParameters(FlinkStreamTopology.java:109)
    at org.apache.flink.client.program.topology.FlinkStreamTopology.updateStreamGraph(FlinkStreamTopology.java:122)
    at org.apache.flink.client.program.topology.FlinkStreamTopology.streamGraphTopoHandler(FlinkStreamTopology.java:115)
    at org.apache.flink.client.program.topology.FlinkStreamTopology.getPipelinePlanJson(FlinkStreamTopology.java:167)
    at org.apache.flink.client.program.AbstractFlinkTopology.getPlan(AbstractFlinkTopology.java:35)
    at org.apache.flink.client.program.PackagedProgramUtils.getPlanBox(PackagedProgramUtils.java:351)
    at org.apache.flink.runtime.webmonitor.handlers.JobPlanBoxHandler.lambda$handleRequest$4(JobPlanBoxHandler.java:138)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
    at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
    at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
    at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:74)
    ... 20 common frames omitted
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:89)
    at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:74)
    at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
    at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
    at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
    at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
    at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
    ... 23 common frames omitted
Caused by: org.codehaus.janino.InternalCompilerException: Compiling "BatchCalc$21123": Code of method "split$21122$(LBatchCalc$21123;)V" of class "BatchCalc$21123" grows beyond 64 KB
    at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:382)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
    at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
    at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:86)
    ... 29 common frames omitted
Caused by: org.codehaus.janino.InternalCompilerException: Code of method "split$21122$(LBatchCalc$21123;)V" of class "BatchCalc$21123" grows beyond 64 KB
    at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:1048)
    at org.codehaus.janino.CodeContext.write(CodeContext.java:940)
    at org.codehaus.janino.UnitCompiler.writeShort(UnitCompiler.java:12282)
Re: [ANNOUNCE] RocksDB Version Upgrade and Performance
I am a heavy user of RocksDB, and my application works well on Flink 1.10, but I am interested in the new features. I would like to know whether I can increase the managed memory to offset the performance regression to some degree. Could you share the RocksDB parameters of your job?
When a ScalarFunction returns a TreeMap, the next operator receives the entries out of order
I extend ScalarFunction like this and register it with tableEnv.createTemporarySystemFunction("tree_map", new TreeMapFunction()):

public class TreeMapFunction extends ScalarFunction {
    private static Logger profileLog = Logger.getLogger("zhangying480");

    @Override
    public boolean isDeterministic() {
        return false;
    }

    public Map<String, String> eval() {
        // Keys are kept in natural String order by the comparator.
        Map<String, String> result = new TreeMap<>(new Comparator<String>() {
            @Override
            public int compare(String o1, String o2) {
                return o1.compareTo(o2);
            }
        });
        result.put("a", "zhangying480");
        result.put("as", "azhangying480");
        result.put("abs", "aabzhangying480");
        result.put("dv", "dzhangying480");
        result.put("dd", "dczhangying480");
        result.put("de", "eazhangying480");
        result.put("e", "efzhangying480");
        result.put("1", "eezhangying480");
        result.put("#", "12zhangying480");
        result.put("i", "#zhangying480");
        return result;
    }
}

I also extend ScalarFunction like this and register it with tableEnv.createTemporarySystemFunction("map_unfold", new MapUnfoldFunction()):

public class MapUnfoldFunction extends ScalarFunction {
    public static Logger log = LoggerFactory.getLogger(MapUnfoldFunction.class);

    public String eval(Map<String, String> string2StringMap) {
        StringBuilder result = new StringBuilder();
        if (null != string2StringMap) {
            // Appends each key followed by a comma, in the map's iteration order.
            Set<Map.Entry<String, String>> entrySet = string2StringMap.entrySet();
            for (Map.Entry<String, String> entry : entrySet) {
                result.append(entry.getKey()).append(",");
            }
        }
        return result.toString();
    }
}

I run SQL like this:

Table tableTreeMap = tableEnv.sqlQuery("select pvid, tree_map() as group_name from db1.`search_realtime_table_dump_p13`");
TableResult tableResult = tableEnv.executeSql("select map_unfold(group_name) as map_unfold from " + tableTreeMap);

I expect the result to be:

#,1,a,abs,as,dd,de,dv,e,i,

but I get the following, which is out of order:

| dd,1,a,de,as,dv,#,abs,e,i, |
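The observed output suggests the TreeMap's iteration order is not preserved when the map is passed from tree_map to map_unfold; one plausible explanation, which is an assumption and not something the post confirms, is that Flink converts the value to its own internal map representation in between. A workaround sketch that does not depend on how the map arrives is to sort the keys inside map_unfold itself. The class SortedMapUnfold and the method unfoldSorted are hypothetical helpers, written as plain Java so they can be tested without Flink.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: joins map keys in natural String order, each followed by a comma. */
public class SortedMapUnfold {
    public static String unfoldSorted(Map<String, String> map) {
        StringBuilder result = new StringBuilder();
        if (map != null) {
            // Copy into a TreeMap so the output order no longer depends
            // on the Map implementation that was passed in.
            for (String key : new TreeMap<>(map).keySet()) {
                result.append(key).append(",");
            }
        }
        return result.toString();
    }
}
```

Inside MapUnfoldFunction.eval one could return SortedMapUnfold.unfoldSorted(string2StringMap). For the keys in the post, this yields #,1,a,abs,as,dd,de,dv,e,i, regardless of the incoming map's iteration order. Sorting costs O(n log n) per row, which should be negligible for maps of this size.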
Reading a Hive table in streaming mode with DISTINCT causes a heap OOM
Hi, I ran into the following behavior. This is my SQL:

SELECT DISTINCT header, label, reqsig, dense_feat, item_id_feat, user_id_feat
FROM app.app_ranking_feature_table_clk_ord_hp_new_all_tree_orc
WHERE dt='2021-04-01'

When I use the Blink planner in batch mode, it works well; but if I switch to streaming mode, it causes a heap OOM:

Caused by: java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at org.apache.flink.core.memory.DataOutputSerializer.getCopyOfBuffer(DataOutputSerializer.java:85)
    at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.buildCompositeKeyNamespace(RocksDBSerializedCompositeKeyBuilder.java:113)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeCurrentKeyWithGroupAndNamespace(AbstractRocksDBState.java:163)
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:83)
    at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:129)
    at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:399)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$221/285424866.runDefaultAction(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:620)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:584)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:844)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:636)
    at java.lang.Thread.run(Thread.java:748)

I am using RocksDB and have confirmed that it is active. Then I ran jmap on the TaskManager:

 num     #instances         #bytes  class name
----------------------------------------------
   1:        214656     4420569368  [C
   2:            99     2376771576  [B
   3:        137904        7722624  org.apache.flink.core.memory.HybridMemorySegment
   4:        214539        5148936  java.lang.String
   5:         31796        2635104  [Ljava.lang.Object;
   6:        105133        2523192  [Lorg.apache.flink.core.memory.MemorySegment;
   7:        105115        2522760  org.apache.flink.table.data.binary.BinarySection
   8:        105115        2522760  org.apache.flink.table.data.binary.BinaryStringData
   9:         32812        2099968  java.nio.DirectByteBuffer
  10:         14838        1651560  java.lang.Class
  11:         50002        1600064  java.util.concurrent.ConcurrentHashMap$Node
  12:         43014        1376448  java.util.Hashtable$Entry
  13:         32805        1312200  sun.misc.Cleaner

It looks like the data is kept on the heap rather than in RocksDB. Is there any way to keep this data in RocksDB?
Does Flink 1.12 support rate limiting?
When I run a SQL job with the Blink planner in my cluster, the task preempts almost all of the cluster's resources, which badly affects the streaming tasks. Since speed is not important for this job, is there any way to limit the rate of my batch task? This is the machine performance while running some operators: https://issues.apache.org/jira/browse/FLINK-22204
Does Flink support GPU computing?
Hi, I am running a TF inference task on my cluster, but I find that it takes a long time to get a response, because it is a BERT model and I run it on CPU machines. My company has a GPU Kubernetes cluster, and I have read the document https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/advanced/external_resources/. Could you give me a demo, including TF inference on GPU and training on GPU? I use Alink in some of my tasks; is there a demo for Alink on GPU? This is part of the answer: https://issues.apache.org/jira/browse/FLINK-22205
UNION does not support the MAP data type in Flink 1.12
This is the error message:

Flink SQL> (SELECT number_feature FROM map_string_string1) UNION (SELECT number_feature FROM map_string_string2);
2021-03-29 16:57:15,479 INFO org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Trying to connect to metastore with URI thrift://localhost:9083
2021-03-29 16:57:15,483 INFO org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Opened a connection to metastore, current connections: 2
2021-03-29 16:57:15,530 INFO org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Connected to metastore.
2021-03-29 16:57:15,530 INFO org.apache.hadoop.hive.metastore.RetryingMetaStoreClient [] - RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.metastore.HiveMetaStoreClient ugi=zhangying480 (auth:SIMPLE) retries=1 delay=1 lifetime=0
2021-03-29 16:57:15,580 INFO org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Closed a connection to metastore, current connections: 1
2021-03-29 16:57:15,651 INFO org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Trying to connect to metastore with URI thrift://localhost:9083
2021-03-29 16:57:15,652 INFO org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Opened a connection to metastore, current connections: 2
2021-03-29 16:57:15,652 INFO org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Connected to metastore.
2021-03-29 16:57:15,652 INFO org.apache.hadoop.hive.metastore.RetryingMetaStoreClient [] - RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.metastore.HiveMetaStoreClient ugi=zhangying480 (auth:SIMPLE) retries=1 delay=1 lifetime=0
2021-03-29 16:57:15,698 INFO org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Closed a connection to metastore, current connections: 1
[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Unsupported type: MAP

I first hit this problem on Flink 1.12.0; after changing the version to 1.12.3, the problem still exists. I created two tables that have a MAP column (number_feature) and want to union them into one, but when I run the query above in the SQL client, I get this error. However, this query returns the correct result:

(SELECT query FROM map_string_string1) UNION (SELECT query FROM map_string_string2)

Is the lack of UNION support for MAP columns a bug? These are my CREATE TABLE statements:

CREATE TABLE `map_string_string1`(
  `query` string,
  `wid` string,
  `index` int,
  `page` string,
  `hc_cid1` string,
  `hc_cid2` string,
  `hc_cid3` string,
  `cid1` string,
  `cid2` string,
  `cid3` string,
  `ts` bigint,
  `number_feature` map)
PARTITIONED BY (
  `dt` string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
  LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'

CREATE TABLE `map_string_string2`(
  `query` string,
  `wid` string,
  `index` int,
  `page` string,
  `hc_cid1` string,
  `hc_cid2` string,
  `hc_cid3` string,
  `cid1` string,
  `cid2` string,
  `cid3` string,
  `ts` bigint,
  `number_feature` map)
PARTITIONED BY (
  `dt` string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
  LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
Re: Unsubscribe
What do you mean? Do you just want to unsubscribe? On 2021-03-29 17:44:22, "纪军伟" wrote:
> Unsubscribe