Flink SQL support TFRecord format

2022-09-20 Thread
Hi, when I write a SQL statement like this:




String sqlCreate = "CREATE TABLE fs_table (\n" +
"  `examplestr` bytes\n" +
")  WITH (\n" +
"  'connector'='filesystem',\n" +
"  'format'='raw',\n" +
"  'path'='/tmp/zhangying480'\n" +
")";
I get an error like this, which suggests my TFRecord data is malformed, but I
have verified that it is correct:
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
... 4 more
Caused by: java.io.IOException: Length header crc32 checking failed: 
-1837507072 != -252953760, length = 323850
at 
com.jd.realtime.formal.operator.source.inner.TFRecordReader.read(TFRecordReader.java:44)
at 
com.jd.realtime.formal.operator.source.inner.TFRecordInputFormat.nextRecord(TFRecordInputFormat.java:128)
at 
com.jd.realtime.formal.operator.source.inner.TFRecordInputFormat.nextRecord(TFRecordInputFormat.java:17)
at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:202)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.lang.Thread.run(Thread.java:748)


I read the source code and found the cause:
// excerpt from Flink's FileSystemTableSink (SerializationSchemaAdapter)
public class SerializationSchemaAdapter implements Encoder<RowData> {

    private static final long serialVersionUID = 1L;

    static final byte LINE_DELIMITER = "\n".getBytes(StandardCharsets.UTF_8)[0];

    // ... field declarations and checkOpened() elided ...

    @Override
    public void encode(RowData element, OutputStream stream) throws IOException {
        checkOpened();
        stream.write(serializationSchema.serialize(element));
        stream.write(LINE_DELIMITER); // a newline is appended after every record
    }
}


Writing LINE_DELIMITER after every record breaks the TFRecord framing (the
reader then fails the length-header CRC check), so I implemented a
TFRecordSerializationSchemaAdapter and overrode FileSystemTableSink, which
fixed the problem.
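For context, here is a minimal standalone sketch of the framing a TFRecord reader expects (little-endian length plus masked CRC32C headers, per the TFRecord spec). It assumes Java 9+ for java.util.zip.CRC32C, and the class name is hypothetical — this is not the actual TFRecordSerializationSchemaAdapter:

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.zip.CRC32C;

public class TFRecordFraming {

    private static final long MASK_DELTA = 0xa282ead8L;

    // TFRecord's "masked" CRC32C, as defined by the TFRecord spec
    private static int maskedCrc32c(byte[] data) {
        CRC32C crc = new CRC32C();
        crc.update(data, 0, data.length);
        long c = crc.getValue();
        return (int) (((c >>> 15) | (c << 17)) + MASK_DELTA);
    }

    private static byte[] toLittleEndian(long v) {
        return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(v).array();
    }

    private static byte[] toLittleEndian(int v) {
        return ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(v).array();
    }

    // Frame one serialized record: length, CRC(length), payload, CRC(payload).
    // Note there is no line delimiter anywhere in the format.
    public static void writeRecord(OutputStream out, byte[] record) throws IOException {
        byte[] length = toLittleEndian((long) record.length);
        out.write(length);
        out.write(toLittleEndian(maskedCrc32c(length)));
        out.write(record);
        out.write(toLittleEndian(maskedCrc32c(record)));
    }
}

Because every record carries its own length and checksums, appending even a single extra byte such as LINE_DELIMITER shifts the next length header, which produces exactly the kind of CRC mismatch shown in the stack trace above.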


  
So far I only support the filesystem sink with the TFRecord format; do you
think it is necessary to support other systems?

















Table program cannot be compiled. This is a bug. Please file an issue

2021-09-13 Thread
I wrote a long SQL statement, but when I explain the plan, it fails:


org.apache.flink.util.FlinkRuntimeException: 
org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
compiled. This is a bug. Please file an issue.
at 
org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:76)
at 
org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:77)
at 
org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:95)
at 
org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:51)
at 
org.apache.flink.client.program.topology.FlinkStreamTopology.setOperatorParameter(FlinkStreamTopology.java:75)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at 
org.apache.flink.client.program.topology.FlinkStreamTopology.setOperatorParameters(FlinkStreamTopology.java:109)
at 
org.apache.flink.client.program.topology.FlinkStreamTopology.updateStreamGraph(FlinkStreamTopology.java:122)
at 
org.apache.flink.client.program.topology.FlinkStreamTopology.streamGraphTopoHandler(FlinkStreamTopology.java:115)
at 
org.apache.flink.client.program.topology.FlinkStreamTopology.getPipelinePlanJson(FlinkStreamTopology.java:167)
at 
org.apache.flink.client.program.AbstractFlinkTopology.getPlan(AbstractFlinkTopology.java:35)
at 
org.apache.flink.client.program.PackagedProgramUtils.getPlanBox(PackagedProgramUtils.java:351)
at 
org.apache.flink.runtime.webmonitor.handlers.JobPlanBoxHandler.lambda$handleRequest$4(JobPlanBoxHandler.java:138)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: 
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException:
 org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
compiled. This is a bug. Please file an issue.
at 
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
at 
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
at 
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
at 
org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:74)
... 20 common frames omitted
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
cannot be compiled. This is a bug. Please file an issue.
at 
org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:89)
at 
org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:74)
at 
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
at 
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
at 
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
at 
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
at 
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
... 23 common frames omitted
Caused by: org.codehaus.janino.InternalCompilerException: Compiling 
"BatchCalc$21123": Code of method "split$21122$(LBatchCalc$21123;)V" of class 
"BatchCalc$21123" grows beyond 64 KB
at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:382)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
at 
org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
at 
org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:86)
... 29 common frames omitted
Caused by: org.codehaus.janino.InternalCompilerException: Code of method 
"split$21122$(LBatchCalc$21123;)V" of class "BatchCalc$21123" grows beyond 64 KB
at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:1048)
at org.codehaus.janino.CodeContext.write(CodeContext.java:940)
at org.codehaus.janino.UnitCompiler.writeShort(UnitCompiler.java:12282)
...
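For context, the root cause above is the JVM's 64 KB bytecode limit per method. A minimal sketch of one common mitigation, assuming the Blink planner option table.generated-code.max-length (please verify the key and default against your Flink version), which makes the code generator split work into smaller methods:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CodegenSplitExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().build());
        // Lower the threshold at which generated code is split into sub-methods,
        // so each compiled method stays well under the JVM's 64 KB limit.
        tEnv.getConfig().getConfiguration()
                .setString("table.generated-code.max-length", "4000");
    }
}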

Re: [ANNOUNCE] RocksDB Version Upgrade and Performance

2021-08-06 Thread
I am a heavy user of RocksDB, and my application works well on Flink 1.10, but 
I am interested in the new features. I want to know whether I can increase the 
managed memory to mitigate the performance regression to some degree.
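For what it's worth, a minimal sketch of what increasing managed memory looks like, assuming the standard TaskManagerOptions API (the 0.6 fraction is only an illustrative value, not a recommendation):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;

public class ManagedMemoryTuning {
    public static void main(String[] args) {
        // RocksDB draws its block cache and write buffers from managed memory
        // (when the managed-memory mode is enabled), so raising the fraction
        // gives it more room; the default is 0.4.
        Configuration conf = new Configuration();
        conf.set(TaskManagerOptions.MANAGED_MEMORY_FRACTION, 0.6f);
        // equivalent flink-conf.yaml line:
        // taskmanager.memory.managed.fraction: 0.6
    }
}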


Can you show me the RocksDB parameters of your task?

when a ScalarFunction returns a TreeMap, the next operator is out of order

2021-07-29 Thread
I extended ScalarFunction like this, and I register it with
tableEnv.createTemporarySystemFunction("tree_map", new TreeMapFunction());
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;
import org.apache.flink.table.functions.ScalarFunction;

public class TreeMapFunction extends ScalarFunction {
    // logging import omitted in the original
    private static Logger profileLog = Logger.getLogger("zhangying480");

    @Override
    public boolean isDeterministic() {
        return false;
    }

    public Map<String, String> eval() {
        // a TreeMap with an explicit comparator, so iteration order is sorted by key
        Map<String, String> result = new TreeMap<>(new Comparator<String>() {
            @Override
            public int compare(String o1, String o2) {
                return o1.compareTo(o2);
            }
        });

        result.put("a", "zhangying480");
        result.put("as", "azhangying480");
        result.put("abs", "aabzhangying480");
        result.put("dv", "dzhangying480");
        result.put("dd", "dczhangying480");
        result.put("de", "eazhangying480");
        result.put("e", "efzhangying480");
        result.put("1", "eezhangying480");
        result.put("#", "12zhangying480");
        result.put("i", "#zhangying480");
        return result;
    }
}
I also extended ScalarFunction like this, and I register it with
tableEnv.createTemporarySystemFunction("map_unfold", new MapUnfoldFunction());
import java.util.Map;
import java.util.Set;
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MapUnfoldFunction extends ScalarFunction {
    public static Logger log = LoggerFactory.getLogger(MapUnfoldFunction.class);

    // concatenates the map's keys, in iteration order, separated by commas
    public String eval(Map<String, String> string2StringMap) {
        StringBuilder result = new StringBuilder();
        if (null != string2StringMap) {
            Set<Map.Entry<String, String>> entrySet = string2StringMap.entrySet();
            for (Map.Entry<String, String> entry : entrySet) {
                result.append(entry.getKey()).append(",");
            }
        }
        return result.toString();
    }
}


I run a SQL statement like this:
Table tableTreeMap = tableEnv.sqlQuery(
        "select pvid, tree_map() as group_name from db1.`search_realtime_table_dump_p13`");
TableResult tableResult = tableEnv.executeSql(
        "select map_unfold(group_name) as map_unfold from " + tableTreeMap);


I expected the result to be:
#,1,a,abs,as,dd,de,dv,e,i,


but I get this result instead, which is out of order:
dd,1,a,de,as,dv,#,abs,e,i,
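A plausible explanation: once the TreeMap crosses the operator boundary, Flink (re)serializes it as its generic MAP data type, so the comparator and the sorted iteration order are not preserved. Under that assumption, a minimal sketch of a workaround that sorts and unfolds inside a single function (the class name is hypothetical):

import java.util.Map;
import java.util.TreeMap;
import org.apache.flink.table.functions.ScalarFunction;

public class SortedUnfoldFunction extends ScalarFunction {
    public String eval(Map<String, String> input) {
        if (input == null) {
            return "";
        }
        // TreeMap sorts keys on insertion, and we iterate it before the value
        // ever leaves this function, so the order here is deterministic.
        TreeMap<String, String> sorted = new TreeMap<>(input);
        StringBuilder result = new StringBuilder();
        for (String key : sorted.keySet()) {
            result.append(key).append(",");
        }
        return result.toString();
    }
}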



Reading a Hive table in stream mode with distinct causes heap OOM

2021-04-25 Thread
Hi, I ran into the following problem:


This is my SQL:
SELECT distinct header,label,reqsig,dense_feat,item_id_feat,user_id_feat FROM 
app.app_ranking_feature_table_clk_ord_hp_new_all_tree_orc where dt='2021-04-01'




When I use the Blink planner in batch mode, it works well; but if I set it to 
stream mode, it causes a heap OOM.


Caused by: java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3236)
at 
org.apache.flink.core.memory.DataOutputSerializer.getCopyOfBuffer(DataOutputSerializer.java:85)
at 
org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.buildCompositeKeyNamespace(RocksDBSerializedCompositeKeyBuilder.java:113)
at 
org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeCurrentKeyWithGroupAndNamespace(AbstractRocksDBState.java:163)
at 
org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:83)
at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:129)
at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:399)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$221/285424866.runDefaultAction(UnknownSource)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:620)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:584)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:844)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:636)
at java.lang.Thread.run(Thread.java:748)



I use RocksDB, and I confirmed it is actually in use; then I ran jmap on the TaskManager:
 num     #instances         #bytes  class name
----------------------------------------------
   1:        214656     4420569368  [C
   2:            99     2376771576  [B
   3:        137904        7722624  org.apache.flink.core.memory.HybridMemorySegment
   4:        214539        5148936  java.lang.String
   5:         31796        2635104  [Ljava.lang.Object;
   6:        105133        2523192  [Lorg.apache.flink.core.memory.MemorySegment;
   7:        105115        2522760  org.apache.flink.table.data.binary.BinarySection
   8:        105115        2522760  org.apache.flink.table.data.binary.BinaryStringData
   9:         32812        2099968  java.nio.DirectByteBuffer
  10:         14838        1651560  java.lang.Class
  11:         50002        1600064  java.util.concurrent.ConcurrentHashMap$Node
  12:         43014        1376448  java.util.Hashtable$Entry
  13:         32805        1312200  sun.misc.Cleaner


It looks like the data lives on the heap rather than in RocksDB. Is there any 
way to put this data into RocksDB?
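For completeness, a minimal sketch of explicitly setting the RocksDB state backend on a 1.12-era job (the checkpoint URI is a placeholder, and the flink-statebackend-rocksdb dependency is assumed). Note that even with RocksDB, records and keys are deserialized onto the heap while they are being processed, so very wide rows like these feature columns can still dominate the heap:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBBackendSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // second argument enables incremental checkpoints
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
    }
}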



Does it support rate-limiting in flink 1.12?

2021-04-12 Thread
When I run a SQL job with the Blink planner on my cluster, the task takes up 
almost all of the cluster's resources, which badly affects the streaming tasks. 
Since speed is not critical here, is there any way to limit the rate of my 
batch task?

 

This is the machine's performance while running some operators:
https://issues.apache.org/jira/browse/FLINK-22204
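As far as I know there is no built-in rate limit for SQL batch jobs; a minimal DataStream-style sketch of one possible workaround, throttling records with Guava's RateLimiter (the class name and the chosen rate are illustrative assumptions, and the Guava dependency is required):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import com.google.common.util.concurrent.RateLimiter;

public class ThrottleMap<T> extends RichMapFunction<T, T> {
    private final double recordsPerSecond;
    private transient RateLimiter limiter;

    public ThrottleMap(double recordsPerSecond) {
        this.recordsPerSecond = recordsPerSecond;
    }

    @Override
    public void open(Configuration parameters) {
        // one limiter per parallel subtask, so the effective global rate is
        // roughly recordsPerSecond * parallelism
        limiter = RateLimiter.create(recordsPerSecond);
    }

    @Override
    public T map(T value) {
        limiter.acquire(); // blocks until a permit is available
        return value;
    }
}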



Does it support GPU computing in Flink?

2021-04-12 Thread
Hi, I am running a TF inference task on my cluster, but I find it takes a long 
time to get a response, because it is a BERT model and I run it on CPU 
machines. My company has a GPU k8s cluster, and I read this document:
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/advanced/external_resources/
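For reference, the GPU-plugin settings described on that page look roughly like this in flink-conf.yaml (reconstructed from my reading of the linked docs, so please verify the keys against your exact version):

external-resources: gpu
external-resource.gpu.driver-factory.class: org.apache.flink.externalresource.gpu.GPUDriverFactory
external-resource.gpu.amount: 1
# on Kubernetes, map the resource onto the device plugin's name:
external-resource.gpu.kubernetes.config-key: nvidia.com/gpu

If I read the page correctly, operators can then query the allocated GPUs at runtime via getRuntimeContext().getExternalResourceInfos("gpu").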

 

Could you give me a demo, covering both TF inference on GPU and training on GPU?

I use Alink in some of my tasks; is there a demo for Alink on GPU?


this is part of the answer:
https://issues.apache.org/jira/browse/FLINK-22205


union operation cannot support data type: map on Flink 1.12

2021-03-29 Thread
this is the error message:


Flink SQL> (SELECT number_feature FROM map_string_string1) UNION (SELECT 
number_feature FROM map_string_string2);


2021-03-29 16:57:15,479 INFO  
org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Trying to 
connect to metastore with URI thrift://localhost:9083
2021-03-29 16:57:15,483 INFO  
org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Opened a 
connection to metastore, current connections: 2
2021-03-29 16:57:15,530 INFO  
org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Connected to 
metastore.
2021-03-29 16:57:15,530 INFO  
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient [] - 
RetryingMetaStoreClient proxy=class 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient ugi=zhangying480 
(auth:SIMPLE) retries=1 delay=1 lifetime=0
2021-03-29 16:57:15,580 INFO  
org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Closed a 
connection to metastore, current connections: 1
2021-03-29 16:57:15,651 INFO  
org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Trying to 
connect to metastore with URI thrift://localhost:9083
2021-03-29 16:57:15,652 INFO  
org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Opened a 
connection to metastore, current connections: 2
2021-03-29 16:57:15,652 INFO  
org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Connected to 
metastore.
2021-03-29 16:57:15,652 INFO  
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient [] - 
RetryingMetaStoreClient proxy=class 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient ugi=zhangying480 
(auth:SIMPLE) retries=1 delay=1 lifetime=0
2021-03-29 16:57:15,698 INFO  
org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Closed a 
connection to metastore, current connections: 1




[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Unsupported type: MAP






I am hitting a problem like this: my Flink version is 1.12.0, and after I 
changed the version to 1.12.3 the problem still exists.


I created two tables that have a column of map type, and I want to union the 
two tables into one; but when I run it in sql-client, I get the error above 
(number_feature is a map column).


However, when I run this SQL, it returns the correct result: "(SELECT query FROM 
map_string_string1) UNION (SELECT query FROM map_string_string2)"
Is the fact that UNION cannot handle map columns a bug?
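My own reading is that UNION implies DISTINCT, and the deduplication step has to group and compare the map column, which is what raises "Unsupported type: MAP"; UNION ALL has no distinct step. A minimal sketch under that assumption (it presumes both tables are already registered, e.g. through the Hive catalog):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public class MapUnionAllExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().build());
        // UNION ALL keeps duplicates, so no grouping on the map column is needed
        TableResult result = tableEnv.executeSql(
                "(SELECT number_feature FROM map_string_string1) "
                        + "UNION ALL "
                        + "(SELECT number_feature FROM map_string_string2)");
        result.print();
    }
}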


These are my CREATE TABLE statements:
CREATE TABLE `map_string_string1`(
  `query` string,
  `wid` string,
  `index` int,
  `page` string,
  `hc_cid1` string,
  `hc_cid2` string,
  `hc_cid3` string,
  `cid1` string,
  `cid2` string,
  `cid3` string,
  `ts` bigint,
  `number_feature` map)
PARTITIONED BY (
  `dt` string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
  LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'


CREATE TABLE `map_string_string2`(
  `query` string,
  `wid` string,
  `index` int,
  `page` string,
  `hc_cid1` string,
  `hc_cid2` string,
  `hc_cid3` string,
  `cid1` string,
  `cid2` string,
  `cid3` string,
  `ts` bigint,
  `number_feature` map)
PARTITIONED BY (
  `dt` string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
  LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'



Re: Unsubscribe

2021-03-29 Thread
What do you mean? Do you want to unsubscribe directly?

















On 2021-03-29 17:44:22, "纪军伟" wrote:
>Unsubscribe

