flink k8s operator chk config interval bug.inoperative

2024-03-14 Post by kcz
kcz
573693...@qq.com





May I ask when 1.18 will be released? I'd like to try the JDK 17 support.

2023-10-14 Post by kcz


Re: flink-1.15.2 ConfigOption package-local method

2022-09-06 Post by kcz

ConfigOptions.key("pipeline.global-job-parameters").mapType().defaultValue(parameterTool.toMap())
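
A minimal sketch of the usual alternative, assuming the goal is simply to expose the ParameterTool values to operators: instead of constructing a ConfigOption for pipeline.global-job-parameters (its constructor is package-private), register the ParameterTool as global job parameters on the ExecutionConfig.

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class GlobalParamsSketch {
    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // ParameterTool is a GlobalJobParameters implementation, so it can be registered
        // directly; operators read it back in open() via
        // getRuntimeContext().getExecutionConfig().getGlobalJobParameters()
        env.getConfig().setGlobalJobParameters(parameterTool);
    }
}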




------------------ Original ------------------
From: "user-zh" <573693...@qq.com.INVALID>
Date: 2022-09-06 8:15
To: "user-zh"

flink-1.15.2 ConfigOption package-local method

2022-09-06 Post by kcz
I want to set pipeline.global-job-parameters, but the ConfigOption constructor is package-local, so it cannot be created with new.

Re: flink-1.14.4 CURRENT_DATE() function not found

2022-09-05 Post by kcz
??




------------------ Original ------------------
From: "kcz" <573693...@qq.com>
Date: 2022-09-05 11:50
To: "user-zh"

flink-1.14.4 CURRENT_DATE() function not found

2022-09-04 Post by kcz
select concat('1','2'), CURRENT_DATE();
reports: No match found for function signature CURRENT_DATE().

concat on its own works fine.
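
For reference, a minimal standalone sketch (not the original job): CURRENT_DATE is a niladic built-in in Flink SQL, so it is written without parentheses; the parenthesized call is what triggers the "No match found for function signature CURRENT_DATE()" error.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CurrentDateSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // CURRENT_DATE without parentheses resolves fine; CURRENT_DATE() does not
        tableEnv.executeSql("SELECT concat('1','2'), CURRENT_DATE").print();
    }
}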

flink sink kafka exactly-once question

2022-08-17 Post by kcz
flink-1.14.4 with kafka-2.4.0: for KafkaSink's setTransactionalIdPrefix, should the prefix be derived from the job ID, the checkpoint ID, or System.currentTimeMillis() so that it stays unique?
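
A minimal 1.14 sketch (broker, topic and the prefix string below are placeholders, not values from this mail): the transactional id prefix is set once on the KafkaSink builder and should stay stable for the job and unique across jobs on the same Kafka cluster, rather than being derived from the checkpoint id or System.currentTimeMillis().

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class ExactlyOnceKafkaSinkSketch {
    public static KafkaSink<String> build() {
        return KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")                // placeholder broker
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("output_topic")                     // placeholder topic
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)  // 1.14 builder method
                // stable, job-unique prefix, kept identical across restarts of the same job
                .setTransactionalIdPrefix("my-job-tx")
                .build();
    }
}

Exactly-once delivery also typically requires the Kafka producer transaction timeout to exceed the checkpoint interval.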

Re: flink hive table owner issue

2022-07-18 Post by kcz
tks.




------------------ Original ------------------
From: "user-zh"

https://github.com/apache/flink/pull/16745

Best regards,
Yuxia

----- Original Message -----
From: "kcz" <573693...@qq.com.INVALID>
To: "user-zh"

flink hive table owner issue

2022-07-18 Post by kcz
flink-1.14.4
hive-3.1.0
When Flink creates a Hive table, the owner recorded in Hive is not the Kerberos user that submitted the job.
How can the correct owner be set?

flink-1.14.0 checkpoint and kafka offset question

2021-12-29 Post by kcz
flink-1.14.0 with checkpointing enabled (500 ms interval).
Pipeline: source (kafka) -> (1-minute window count) -> sink (mysql).
The question is how the kafka offsets recorded in the checkpoint relate to the data the sink has already written.

flink-1.14.0 sql: error when building an array of rows

2021-10-13 Post by kcz
In one select I want to compute several sums of different types and write them to MySQL as (id, type, value) rows.
The SQL:
CREATE TABLE kafka_table (
   vin STRING,
   speed DOUBLE,
   brake DOUBLE,
   hard_to DOUBLE,
   distance DOUBLE,
   times TIMESTAMP(3),
   WATERMARK FOR times AS times - INTERVAL '5' SECOND
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'scan.startup.mode' = 'latest-offset',
 'format' = 'json'
);




select window_start, window_end, vin,
  array[row('brakes', sum(if(brake > 3.0451, 1, 0))), row('hard_tos', sum(if(hard_to > 3.0451, 1, 0)))]
from TABLE(
  TUMBLE(TABLE kafka_table, DESCRIPTOR(times), INTERVAL '10' MINUTES))
group by window_start, window_end, vin;


The error:
Exception in thread "main" java.lang.AssertionError: Conversion to relational 
algebra failed to preserve datatypes:
validated type:
RecordType(TIMESTAMP(3) NOT NULL window_start, TIMESTAMP(3) NOT NULL 
window_end, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" vin, 
RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER EXPR$1) 
NOT NULL ARRAY NOT NULL EXPR$3) NOT NULL
converted type:
RecordType(TIMESTAMP(3) NOT NULL window_start, TIMESTAMP(3) NOT NULL 
window_end, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" vin, 
RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER NOT 
NULL EXPR$1) NOT NULL ARRAY NOT NULL EXPR$3) NOT NULL
rel:
LogicalProject(window_start=[$0], window_end=[$1], vin=[$2], 
EXPR$3=[ARRAY(CAST(ROW(_UTF-16LE'brake', $3)):RecordType(VARCHAR(7) CHARACTER 
SET "UTF-16LE" NOT NULL EXPR$0, INTEGER NOT NULL EXPR$1) NOT NULL, 
CAST(ROW(_UTF-16LE'hard_to', $4)):RecordType(VARCHAR(7) CHARACTER SET 
"UTF-16LE" NOT NULL EXPR$0, INTEGER NOT NULL EXPR$1) NOT NULL)])
 LogicalAggregate(group=[{0, 1, 2}], agg#0=[SUM($3)], agg#1=[SUM($4)])
  LogicalProject(window_start=[$6], window_end=[$7], vin=[$0], 
$f3=[IF(($2, 3.0451:DECIMAL(5, 4)), 1, 0)], $f4=[IF(($3, 
3.0451:DECIMAL(5, 4)), 1, 0)])
   LogicalTableFunctionScan(invocation=[TUMBLE($5, 
DESCRIPTOR($5), 60:INTERVAL MINUTE)], 
rowType=[RecordType(VARCHAR(2147483647) vin, DOUBLE speed, DOUBLE brake, DOUBLE 
hard_to, DOUBLE distance, TIMESTAMP(3) *ROWTIME* times, TIMESTAMP(3) 
window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
LogicalProject(vin=[$0], speed=[$1], brake=[$2], 
hard_to=[$3], distance=[$4], times=[$5])
 LogicalWatermarkAssigner(rowtime=[times], 
watermark=[-($5, 5000:INTERVAL SECOND)])
  
LogicalTableScan(table=[[default_catalog, default_database, kafka_table]])


at 
org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:467)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:582)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:177)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:169)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1057)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1026)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:301)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)
at com.hycan.bigdata.utils.SqlUtil.callCommand(SqlUtil.java:48)
at com.hycan.bigdata.job.SchemaJob.main(SchemaJob.java:87)
Disconnected from the target VM, address: '127.0.0.1:61710', transport: 'socket'


Process finished with exit code 1

Re: flink-1.14 setting a watermark on the kafkasource

2021-10-12 Post by kcz
I switched to a global window with a trigger based on the times field:
public class PathMonitorJob {
private static final String PATH = "path";
private static double THRESHOLD;
public static void main(String[] args) throws Exception {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
THRESHOLD = parameterTool.getDouble("threshold",1000d);
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource

flink-1.14 setting a watermark on the kafkasource

2021-10-11 Post by kcz
I want to generate the watermark from the times field, allowing 20 seconds of out-of-orderness.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource
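
A minimal sketch (broker/topic/group mirror other posts in this list, and the 20 s bound is an assumption based on the "+20" above): the new KafkaSource is wired to a watermark strategy in env.fromSource; without withTimestampAssigner the strategy falls back to the Kafka record timestamps.

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceWatermarkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("user_behavior")
                .setGroupId("testGroup")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        env.fromSource(source,
                // 20 s of allowed out-of-orderness; add withTimestampAssigner(...) to take
                // the event time from the times field in the payload instead
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(20)),
                "kafka-source")
           .print();
        env.execute();
    }
}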

flink-1.12.5 HIVE DDL with a column comment: the comment does not show up in Hive

2021-09-26 Post by kcz
hive3.1.0
ddl:
create table test_hive(
id int comment 'test comment'
) PARTITIONED BY (dt STRING) STORED AS orc TBLPROPERTIES (
 'partition.time-extractor.kind'='custom',
 'partition.time-extractor.timestamp-pattern'='$dt',
 
'partition.time-extractor.class'='com.hycan.bigdata.utils.MyPartTimeExtractor',
 'sink.partition-commit.trigger'='partition-time',
 'sink.partition-commit.delay'='1 d',
 'sink.partition-commit.policy.kind'='metastore,success-file'
);

In Hive, desc formatted test_hive shows no comment on the column.

Re: flink-1.12.0 (also on 1.13.2): select datas[1].filed_1, datas[1].filed_2 returns wrong values

2021-09-25 Post by kcz
sorry.




------------------ Original ------------------
From: "kcz" <573693...@qq.com>
Date: 2021-09-26 10:53
To: "user-zh"

flink-1.12.0 (also on 1.13.2): select datas[1].filed_1, datas[1].filed_2 returns wrong values

2021-09-25 Post by kcz
??INDEX??INDEX++ 
??valuearray
CREATE TABLE KafkaTable (
 datas array

flink-1.12.0 ddl watermark error (works on 1.13.2)

2021-09-25 Post by kcz
The SQL below fails on 1.12.0 when the watermark is defined:
CREATE TABLE KafkaTable (
 test array

Re: flink-1.12.0 lag function question

2021-09-21 Post by kcz
tks.




------------------ Original ------------------
From: "user-zh"

https://issues.apache.org/jira/browse/FLINK-19449

kcz <573693...@qq.com.invalid> wrote on 2021-09-22 at 11:41:

 
 The lag over behavior does not produce the expected next_bv value.


 Sample data:
 {
   "user_id": 1,
   "item_id": 1,
   "behavior": "pv1"
 }
 {
   "user_id": 1,
   "item_id": 1,
   "behavior": "pv2"
 }



 CREATE TABLE KafkaTable (
   `user_id` BIGINT,
   `item_id` BIGINT,
   `behavior` STRING,
   proctime as PROCTIME()
 ) WITH (
   'connector' = 'kafka',
   'topic' = 'user_behavior',
   'properties.bootstrap.servers' = '',
   'properties.group.id' = 'testGroup',
   'scan.startup.mode' = 'earliest-offset',
   'format' = 'json'
 );

 SELECT
   user_id,
   item_id,
   behavior,
   next_bv
 FROM
   ( SELECT *, lag( behavior, 1 ) over ( PARTITION BY user_id ORDER BY proctime ) AS next_bv FROM KafkaTable ) t;



-- 

Best,
Benchao Li

flink-1.12.0 lag function question

2021-09-21 Post by kcz
The lag over behavior does not produce the expected next_bv value.


Sample data:
{
"user_id":1,
"item_id":1,
"behavior":"pv1"
}
{
"user_id":1,
"item_id":1,
"behavior":"pv2"
}






CREATE TABLE KafkaTable (
 `user_id` BIGINT,
 `item_id` BIGINT,
 `behavior` STRING,
 proctime as PROCTIME()
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior',
 'properties.bootstrap.servers' = '',
 'properties.group.id' = 'testGroup',
 'scan.startup.mode' = 'earliest-offset',
 'format' = 'json'
);



SELECT
user_id,
item_id,
behavior,
next_bv
FROM
( SELECT *, lag( behavior, 1 ) over ( PARTITION BY user_id ORDER BY 
proctime ) AS next_bv FROM KafkaTable ) t;

Re: flink-1.13.1 ddl kafka consuming JSON data: (ObjectNode) jsonNode error

2021-07-01 Post by kcz
Could someone take a look at why a class-cast exception shows up there?





-- Original --
From: kcz <573693...@qq.com>
Date: 2021-07-01 22:49
To: user-zh

flink-1.13.1 ddl kafka consuming JSON data: (ObjectNode) jsonNode error

2021-07-01 Post by kcz
Version: 1.13.1. The error:
Caused by: java.lang.ClassCastException: 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode 
cannot be cast to 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
at 
org.apache.flink.formats.json.JsonToRowDataConverters.lambda$createRowConverter$ef66fe9a$1(JsonToRowDataConverters.java:344)
at 
org.apache.flink.formats.json.JsonToRowDataConverters.lambda$wrapIntoNullableConverter$de0b9253$1(JsonToRowDataConverters.java:376)
at 
org.apache.flink.formats.json.JsonRowDataDeserializationSchema.convertToRowData(JsonRowDataDeserializationSchema.java:121)
at 
org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:106)



DDL:
CREATE TABLE user_behavior (
  user_id string
) WITH (
'connector' = 'kafka',
 'topic' = 'user_behavior',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'scan.startup.mode' = 'latest-offset',
 'format' = 'json'
);
select * from user_behavior;

The program:
StreamExecutionEnvironment streamEnv = 
StreamExecutionEnvironment.createLocalEnvironment(new Configuration());
StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(streamEnv);
tableEnv.executeSql(sql).print();
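
A hedged guess at the cause, with a small producer sketch to reproduce it (standard Kafka client API; broker and payloads are made up): the 'json' format casts the root of every Kafka record value to an ObjectNode, so a record whose value is not a JSON object — for example a bare JSON string — arrives as a TextNode and fails with exactly the ClassCastException quoted above.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProduceJsonObjectSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // a JSON object matching the DDL above (one STRING column user_id) -- deserializes fine
            producer.send(new ProducerRecord<>("user_behavior", "{\"user_id\":\"1\"}"));
            // a bare JSON string -- becomes a TextNode and fails the ObjectNode cast
            producer.send(new ProducerRecord<>("user_behavior", "\"just a string\""));
        }
    }
}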

 pom:


  

Re: flink-1.13.1 sql error

2021-06-20 Post by kcz
Could someone help me figure out why this error is reported?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink-1.13.1 sql error

2021-06-19 Post by kcz
The SQL:
CREATE TABLE user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'scan.startup.mode' = 'latest-offset',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);


select * from user_behavior;



pom.xml:
flink.version=1.13.1


Re: apache flink

2021-01-04 Post by kcz
My understanding is that Flink is a job execution engine; what you need sounds like a job scheduler, such as Airflow.





-- Original --
From: Waldeinsamkeit. <1214316...@qq.com
Date: Tue,Jan 5,2021 11:13 AM
To: user-zh 

Re: flink-1.12 UDF registration question

2020-12-25 Post by kcz
It was an argument-count mismatch when calling the function; resolved.





-- Original --
From: kcz <573693...@qq.com>
Date: 2020-12-26 15:24
To: user-zh

flink-1.12 UDF registration question

2020-12-25 Post by kcz
I used createTemporarySystemFunction to register the UDF, but calling it gives "No match found for function". I'm on my phone right now, so it's not convenient to paste more details.
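
A minimal 1.12 sketch (the function name and class are hypothetical): "No match found for function" usually means the SQL call's argument count or types do not match any eval() overload, which matches the resolution in the follow-up above.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

public class UdfRegistrationSketch {

    // hypothetical UDF with a single two-argument eval() overload
    public static class MyConcat extends ScalarFunction {
        public String eval(String a, String b) {
            return a + b;
        }
    }

    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        tableEnv.createTemporarySystemFunction("my_concat", MyConcat.class);
        // must be called with exactly two STRING arguments to match eval(String, String)
        tableEnv.executeSql("SELECT my_concat('a', 'b')").print();
    }
}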

Re: How to parse out the unresolved execution plan in Flink 1.11

2020-10-21 Post by kcz
I also wanted to do this at the time but failed, and ended up using Calcite directly. How exactly did you do it?





-- Original --
From: 马阳阳

flink sql: filtering on the partitioning column in a window function

2020-10-16 Post by kcz
The column sometimes holds a default value and sometimes the real one, and I want to pick out the real value. How can this be implemented? I thought of window functions, but found they cannot be filtered; another hack is to compute max/min and then use IF on the result.

flink??????DDL????????????????????????????

2020-09-25 Post by kcz
??if ??

Re: flink1.11 sql question

2020-08-25 Post by kcz
This feature is really useful, because third-party data is always pulling tricks: fields get added or renamed all the time.





-- Original --
From: Benchao Li
https://issues.apache.org/jira/browse/FLINK-18002

酷酷的浑蛋

Re: flink 1.10.1 OutOfMemoryError: Metaspace

2020-08-25 Post by kcz
??ES5??pretty
 good??




------------------ Original ------------------
From: "user-zh"

https://www.yuque.com/codeleven/flink/dgygq2



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink-1.10.1 want to use DDL to write into ES 5.6

2020-08-17 Post by kcz
tks, received.





-- Original --
From: Yangze Guo
Start from https://github.com/apache/flink/pull/12184/files#diff-c71784e07a5a5c214a2f4f9843e6ef3f
 For example, once you have implemented your own Elasticsearch5DynamicSink
<https://github.com/apache/flink/pull/12184/files#diff-c71784e07a5a5c214a2f4f9843e6ef3f>
 and built an es5 sql jar from it, that is all you need.

 Best,
 Leonard
 [1] https://github.com/apache/flink/pull/12184
<https://github.com/apache/flink/pull/12184>


 > On 2020-08-14 at 10:14, kcz <573693...@qq.com> wrote:
 >
 > After checking the url in [1] you mentioned, I did not find anything related to an es sql jar.
 >
 >
 >
 > --
 > Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink-1.10.1 want to use DDL to write into ES 5.6

2020-08-17 Post by kcz
So there is no ready-made ES5 sql jar, and I need to implement the ES5 sink/connector myself?
------------------ Original ------------------
From: "kcz" <573693...@qq.com>
Date: 2020-08-17 8:34
To: "user-zh"
https://github.com/apache/flink/pull/12184/files#diff-c71784e07a5a5c214a2f4f9843e6ef3f
 
For example, once you have implemented your own Elasticsearch5DynamicSink
<https://github.com/apache/flink/pull/12184/files#diff-c71784e07a5a5c214a2f4f9843e6ef3f>
and built an es5 sql jar from it, that is all you need.

Best,
Leonard
[1] https://github.com/apache/flink/pull/12184
<https://github.com/apache/flink/pull/12184>


 On 2020-08-14 at 10:14, kcz <573693...@qq.com> wrote:
 
 After checking the url in [1] you mentioned, I did not find anything related to an es sql jar.
 
 
 
 --
 Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink-1.10.1 want to use DDL to write into ES 5.6

2020-08-16 Post by kcz
Thanks! Let me study it first.





-- Original --
From: Leonard Xu
Start from https://github.com/apache/flink/pull/12184/files#diff-c71784e07a5a5c214a2f4f9843e6ef3f
For example, once you have implemented your own Elasticsearch5DynamicSink
<https://github.com/apache/flink/pull/12184/files#diff-c71784e07a5a5c214a2f4f9843e6ef3f>
and built an es5 sql jar from it, that is all you need.

Best,
Leonard
[1] https://github.com/apache/flink/pull/12184
<https://github.com/apache/flink/pull/12184>


 On 2020-08-14 at 10:14, kcz <573693...@qq.com> wrote:
 
 After checking the url in [1] you mentioned, I did not find anything related to an es sql jar.
 
 
 
 --
 Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink-1.10.1 want to use DDL to write into ES 5.6

2020-08-13 Post by kcz
After checking the url in [1] you mentioned, I did not find anything related to an es sql jar.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Question: passing the time-attribute field along — is there a way to solve this?

2020-08-06 Post by kcz
Could you explain that a bit more? I still don't quite see where it goes wrong, or why that change solves the problem.





-- Original --
From: Tianwang Li

Re: flink-1.11 simulating backpressure

2020-08-05 Post by kcz
 




------------------ Original ------------------
From: "user-zh"



Re: flink-1.11 simulating backpressure

2020-08-03 Post by kcz
Yeah. I don't see any data coming in on the UI — it should at least reach the source operator, since I only added a sleep in the map. But I don't see any backpressure either, even though I've kept producing data, well over 1,000,000 records by now.





-- Original --
From: shizk233

flink-1.11 simulating backpressure

2020-08-03 Post by kcz
I want to look at the backpressure metrics. I sent 1,000,000 records to Kafka, but I don't even see the source consuming any data — did I set up the simulation wrong?
public static void main(String[] args) throws Exception{

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(2000L, CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new MemoryStateBackend());
env.setParallelism(4);
Properties properties = getLocal();
properties.setProperty("group.id","test");
FlinkKafkaConsumer

Re: flink-1.11 hive-1.2.1 ddl: no data written to Hive

2020-07-29 Post by kcz
sorry — IDEA had no log4j configured, so I missed the log. The problem was that 'process-time' had been written as 'process time'.




------------------ Original ------------------
From: "user-zh"

 "\t'connector.properties.group.id' = 'domain_testGroup',\n" +

 "\t'connector.properties.zookeeper.connect' = '127.0.0.1:2181',\n" +

 "\t'connector.properties.bootstrap.servers' = '127.0.0.1:9092',\n" +

 "\t'update-mode' = 'append',\n" +

 "\t'format.type' = 'json',\n" +

 "\t'format.derive-schema' = 'true'\n" +

 ")");


Best
Leonard
[1]https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/kafka.html#connector-options
 


flink-1.11 hive-1.2.1 ddl: no data written to Hive

2020-07-29 Post by kcz
hive

package com.hive;

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

import java.time.Duration;

public class HiveTest {
private static final String path = "hdfs_path";
public static void main(String []args)  {
System.setProperty("HADOOP_USER_NAME", "work");
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// 
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

env.setStateBackend(new FsStateBackend(path));
EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(env,tableEnvSettings);

tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
 CheckpointingMode.EXACTLY_ONCE);

tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
 Duration.ofSeconds(20));

String name= "myhive";
String defaultDatabase = "situation";
String hiveConfDir = "/load/data/hive/hive-conf"; // a local path
String version = "1.2.1";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, 
version);
tableEnv.registerCatalog("myhive", hive);

// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive");
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS situation");
tableEnv.executeSql("DROP TABLE IF EXISTS situation.source_table");


tableEnv.executeSql("CREATE TABLE situation.source_table (\n" +
"\thost STRING,\n" +
"\turl STRING,\n" +
"\tpublic_date STRING\n" +
") WITH (\n" +
"\t'connector.type' = 'kafka',\n" +
"\t'connector.version' = 'universal',\n" +
"\t'connector.startup-mode' = 'latest-offset',\n" +
"\t'connector.topic' = 'sendMessage',\n" +
"\t'connector.properties.group.id' = 'domain_testGroup',\n" +
"\t'connector.properties.zookeeper.connect' = 
'127.0.0.1:2181',\n" +
"\t'connector.properties.bootstrap.servers' = 
'127.0.0.1:9092',\n" +
"\t'update-mode' = 'append',\n" +
"\t'format.type' = 'json',\n" +
"\t'format.derive-schema' = 'true'\n" +
")");

tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql("DROP TABLE IF EXISTS situation.fs_table");

String hiveSql = "\n" +
"  CREATE TABLE situation.fs_table (\n" +
" \n" +
"host STRING,\n" +
"url STRING,\n" +
"public_date STRING\n" +
"  \n" +
"  ) PARTITIONED BY (\n" +
"ts_date STRING,\n" +
"ts_hour STRING,\n" +
"ts_minute STRING\n" +
"  ) STORED AS PARQUET\n" +
"  TBLPROPERTIES (\n" +
"'sink.partition-commit.trigger' = 'process time',\n" +
"'sink.partition-commit.delay' = '1 min',\n" +
"'sink.partition-commit.policy.kind' = 
'metastore,success-file',\n" +
"'partition.time-extractor.timestamp-pattern' = '$ts_date 
$ts_hour:$ts_minute:00'\n" +
"  )\n" +
"  ";
tableEnv.executeSql(hiveSql);

tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

tableEnv.executeSql("INSERT INTO  situation.fs_table SELECT host, 
url,public_date," +
" DATE_FORMAT(public_date,'-MM-dd') 
,DATE_FORMAT(public_date,'HH') ,DATE_FORMAT(public_date,'mm')  FROM 
situation.source_table");



}
}

Re: flink row type

2020-07-27 Post by kcz
Wow, that approach is really clever. I had always been fetching the values by index; good to learn this.





-- Original --
From: Jark Wu

Re: flink-1.11 ddl kafka-to-hive question

2020-07-22 Post by kcz
Thanks everyone — there is a demo on the public WeChat account now; I'll compare against it.





-- Original --
From: Jingsong Li
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/hive_dialect.html#use-hive-dialect
<
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/hive_dialect.html#use-hive-dialect
>

  On 2020-07-21 at 22:57, kcz <573693...@qq.com> wrote:
 
  There has never been any data. I don't know what's off — the table already exists in Hive, and my test writing to HDFS via DDL is OK.
 
 
 
 
 
  -- Original --
  From: JasonLee <17610775...@163.com>
  Date: 2020-07-21 20:39
  To: user-zh <user-zh@flink.apache.org>
  Subject: Re: flink-1.11 ddl kafka-to-hive question
 
 
 
  hi
  Does the Hive table never get data, or does data show up after a while?
 
 
  JasonLee
  Email: 17610775...@163.com

  Signature is customized by Netease Mail Master
 
  On 2020-07-21 at 19:09, kcz wrote:
  hive-1.2.1
  The checkpoints already succeed (I checked the chk directory and there is checkpoint data, and Kafka has data too), but the Hive table has no data. What am I missing?
  String hiveSql = "CREATE  TABLE  stream_tmp.fs_table (\n" +
          "  host STRING,\n" +
          "  url STRING," +
          "  public_date STRING" +
          ") partitioned by (public_date string) " +
          "stored as PARQUET " +
          "TBLPROPERTIES (\n" +
          "  'sink.partition-commit.delay'='0 s',\n" +
          "  'sink.partition-commit.trigger'='partition-time',\n" +
          "  'sink.partition-commit.policy.kind'='metastore,success-file'" +
          ")";
  tableEnv.executeSql(hiveSql);


  tableEnv.executeSql("INSERT INTO  stream_tmp.fs_table SELECT host, url, DATE_FORMAT(public_date, 'yyyy-MM-dd') FROM stream_tmp.source_table");



-- 
Best, Jingsong Lee

Re: flink-1.11 ddl kafka-to-hive question

2020-07-21 Post by kcz
There has never been any data. I don't know what's off — the table already exists in Hive, and my test writing to HDFS via DDL is OK.





-- Original --
From: JasonLee <17610775...@163.com>
Date: 2020-07-21 20:39
To: user-zh

flink-1.11 ddl kafka-to-hive question

2020-07-21 Post by kcz
hive-1.2.1
The checkpoints already succeed (the chk directory has checkpoint data, and Kafka has data too), but the Hive table has no data. What am I missing?
String hiveSql = "CREATE  TABLE  stream_tmp.fs_table (\n" +
"  host STRING,\n" +
"  url STRING," +
"  public_date STRING" +
") partitioned by (public_date string) " +
"stored as PARQUET " +
"TBLPROPERTIES (\n" +
"  'sink.partition-commit.delay'='0 s',\n" +
"  'sink.partition-commit.trigger'='partition-time',\n" +
"  'sink.partition-commit.policy.kind'='metastore,success-file'" +
")";
tableEnv.executeSql(hiveSql);


tableEnv.executeSql("INSERT INTO  stream_tmp.fs_table SELECT host, url, 
DATE_FORMAT(public_date, '-MM-dd') FROM stream_tmp.source_table");

flink-1.11 with hive-1.2.1 DDL error

2020-07-17 Post by kcz
Running from IDEA; the hive-related pom dependencies are
hive-exec and flink-connector-hive_2.11.
The code:
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(60*1000, CheckpointingMode.EXACTLY_ONCE);
// 
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

env.setStateBackend(new FsStateBackend(path));

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

String name= "myhive";
String defaultDatabase = "situation";
String hiveConfDir = "/load/data/hive/hive-conf"; // a local path
String version = "1.2.1";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, 
version);
tableEnv.registerCatalog("myhive", hive);

// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive");
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS stream_tmp");
tableEnv.executeSql("DROP TABLE IF EXISTS stream_tmp.source_table");


The error:

Exception in thread "main" java.lang.IncompatibleClassChangeError: Implementing 
class
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at 
org.apache.flink.table.planner.delegation.PlannerBase.

Re: flink-1.11 ddl writing json to hdfs question

2020-07-17 Post by kcz
tks — so the default is 30m; I'll try a smaller value.






------------------ Original ------------------
From: "user-zh"

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html#rolling-policy

Best,
Jingsong

On Fri, Jul 17, 2020 at 4:25 PM kcz <573693...@qq.com> wrote:

 I followed the full example in
 https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#full-example
 After switching the format from parquet to json, the part files stay in-progress after the checkpoint,
 whereas with parquet the success file shows up.



-- 
Best, Jingsong Lee

flink-1.11 ddl writing json to hdfs question

2020-07-17 Post by kcz
I followed the full example in
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#full-example
After switching the format from parquet to json, the part files stay in-progress after the checkpoint,
whereas with parquet the success file shows up.
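
A minimal sketch of the rolling-policy options the linked doc page covers (columns mirror the fs_table used elsewhere in this thread; the path is a placeholder): for a row format such as json, part files are only rolled by file size or by the rollover interval, which defaults to 30 min, so shortening it makes finished files and the success file appear sooner.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JsonFsSinkRollingSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        tableEnv.executeSql(
                "CREATE TABLE fs_table (\n" +
                "  host STRING,\n" +
                "  url STRING,\n" +
                "  public_date STRING\n" +
                ") PARTITIONED BY (public_date) WITH (\n" +
                "  'connector' = 'filesystem',\n" +
                "  'path' = 'hdfs:///tmp/fs_table',\n" +                   // placeholder path
                "  'format' = 'json',\n" +
                "  'sink.rolling-policy.rollover-interval' = '1 min',\n" + // default is 30 min
                "  'sink.rolling-policy.check-interval' = '1 min',\n" +
                "  'sink.partition-commit.delay' = '0s',\n" +
                "  'sink.partition-commit.policy.kind' = 'success-file'\n" +
                ")");
    }
}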

Re: flink-1.11 DDL writing to hdfs: Cannot instantiate user function

2020-07-17 Post by kcz
After switching to parquet, the error is:
java.lang.NoClassDefFoundError: org/apache/parquet/hadoop/ParquetWriter$Builder
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at 
org.apache.flink.formats.parquet.ParquetFileSystemFormatFactory.createBulkWriterFactory(ParquetFileSystemFormatFactory.java:110)
at 
org.apache.flink.table.filesystem.FileSystemTableSink.createWriter(FileSystemTableSink.java:274)
at 
org.apache.flink.table.filesystem.FileSystemTableSink.consumeDataStream(FileSystemTableSink.java:154)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:114)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
at com.HdfsDDL.main(HdfsDDL.java:71)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at 
org.apache.flink.client.cli.CliFrontend$$Lambda$67/388104475.call(Unknown 
Source)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext$$Lambda$68/1470966439.run(Unknown
 Source)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1659)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 

Re: flink-1.11 DDL writing to hdfs: Cannot instantiate user function

2020-07-17 Post by kcz
Is this a classloading bug? After setting
classloader.resolve-order: parent-first
it works; without it the parquet format fails to load.


------------------ Original ------------------
From: "kcz" <573693...@qq.com>
Date: 2020-07-17 1:32
To: "user-zh"

flink-1.11 DDL writing to hdfs: Cannot instantiate user function

2020-07-16 Post by kcz
Standalone cluster.
The jars in lib:
flink-connector-hive_2.11-1.11.0.jar
flink-json-1.11.0.jar
flink-sql-connector-kafka_2.12-1.11.0.jar
log4j-api-2.12.1.jar
flink-csv-1.11.0.jar
flink-parquet_2.11-1.11.0.jar
flink-table_2.11-1.11.0.jar
log4j-core-2.12.1.jar
flink-dist_2.11-1.11.0.jar
flink-shaded-hadoop-2-uber-2.7.2.11-9.0.jar
flink-table-blink_2.11-1.11.0.jar
log4j-slf4j-impl-2.12.1.jar
flink-hadoop-compatibility_2.11-1.11.0.jar
flink-shaded-zookeeper-3.4.14.jar
log4j-1.2-api-2.12.1.jar





The code (run from IDEA):
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
// 
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);;
env.setStateBackend(new FsStateBackend(path));

tableEnv.executeSql("CREATE TABLE source_table (\n" +
"\thost STRING,\n" +
"\turl STRING,\n" +
"\tpublic_date STRING\n" +
") WITH (\n" +
"\t'connector.type' = 'kafka',\n" +
"\t'connector.version' = 'universal',\n" +
"\t'connector.startup-mode' = 'latest-offset',\n" +
"\t'connector.topic' = 'test_flink_1.11',\n" +
"\t'connector.properties.group.id' = 'domain_testGroup',\n" +
"\t'connector.properties.zookeeper.connect' = '127.0.0.1:2181',\n" +
"\t'connector.properties.bootstrap.servers' = '127.0.0.1:9092',\n" +
"\t'update-mode' = 'append',\n" +
"\t'format.type' = 'json',\n" +
"\t'format.derive-schema' = 'true'\n" +
")");

tableEnv.executeSql("CREATE TABLE fs_table (\n" +
"  host STRING,\n" +
"  url STRING,\n" +
"  public_date STRING\n" +
") PARTITIONED BY (public_date) WITH (\n" +
"  'connector'='filesystem',\n" +
"  'path'='path',\n" +
"  'format'='json',\n" +
"  'sink.partition-commit.delay'='0s',\n" +
"  'sink.partition-commit.policy.kind'='success-file'\n" +
")");

tableEnv.executeSql("INSERT INTO  fs_table SELECT host, url, 
DATE_FORMAT(public_date, '-MM-dd') FROM source_table");
TableResult result = tableEnv.executeSql("SELECT * FROM fs_table ");
result.print();

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot 
instantiate user function.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:291)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.

Re: flink-1.11 DDL checkpoint directory configuration question

2020-07-14 Post by kcz
Thanks. I've always set the config on the streamEnv; today I saw it can also be done on the table config. If I keep setting it via the stream env, that also works, right?





-- Original --
From: Leonard Xu

flink-1.11 DDL checkpoint directory configuration question

2020-07-14 Post by kcz
Previously I used streamEnv.setStateBackend(new FsStateBackend(checkpointPath));
with DDL I now configure it like this:
tableEnv.getConfig().getConfiguration().set(
ExecutionCheckpointingOptions.CHECKPOINTING_MODE, 
CheckpointingMode.EXACTLY_ONCE);
tableEnv.getConfig().getConfiguration().set(
ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, 
Duration.ofSeconds(10));
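
For comparison, a minimal sketch of the equivalent settings on the StreamExecutionEnvironment (the HDFS path is a placeholder): the checkpoint directory itself still comes from the state backend, while the table-config options above only cover the mode and the interval.

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 10 s interval, exactly-once -- same effect as the table-config options above
        env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE);
        // checkpoint directory, as in the earlier streamEnv-based setup
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
    }
}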

?????? ????????????????????????????

2020-07-06 Post by kcz
windowflink??




------------------ Original ------------------
From: "Congxian Qiu"
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/state/state.html#%E7%8A%B6%E6%80%81%E6%9C%89%E6%95%88%E6%9C%9F-ttl
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/operators/process_function.html#the-keyedprocessfunction
Best,
Congxian


JasonLee <17610775...@163.com> wrote on 2020-07-04 at 8:29:

 


 JasonLee
 Email: 17610775...@163.com

 Signature is customized by Netease Mail Master

 On 2020-07-03 at 14:02, 18579099...@163.com wrote:

 
??ProcessWindowFunctionvalueState

 
1.??ProcessWindowFunction??process??1??trigger

 
process??valueState



 18579099...@163.com


Re: Flink sql: is there a way to deliberately delay the data for a while?

2020-07-03 Post by kcz
Set a window over that period; if you then need the latest record, do a bit of extra processing on top.





-- Original --
From: admin <17626017...@163.com>
Date: 2020-07-03 18:01
To: user-zh

Re: flink sql IF function error

2020-06-30 Post by kcz
tks




------------------ Original ------------------
From: "Benchao Li"

flink sql IF function error

2020-06-30 Post by kcz
flink-1.10.1, blink planner.
Using IF reports:
Cannot apply 'IF' to arguments of type 'IF(

Re: flink open() and transient fields question

2020-06-23 Post by kcz
??state




------------------ Original ------------------
From: <13162790...@163.com>
Date: 2020-06-24 1:36
To: "user-zh"

flink open() and transient fields question

2020-06-23 Post by kcz
I initialize the MySQL client in open(); does state also need to be declared transient?
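
A minimal sketch of the pattern the question is about (the JDBC URL and credentials are placeholders): the function object is serialized and shipped to the TaskManagers before open() runs, so a non-serializable member such as a DB client is declared transient and created in open(); state handles created from a state descriptor are usually handled the same way — a transient field initialized in open() via the runtime context.

import java.sql.Connection;
import java.sql.DriverManager;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MysqlSinkSketch extends RichSinkFunction<String> {

    private transient Connection connection;   // not serialized with the function

    @Override
    public void open(Configuration parameters) throws Exception {
        // placeholder JDBC URL and credentials
        connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/db", "user", "pwd");
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        // use the connection here
    }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }
}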

Re: flink1.11 creating tables (via DDL vs. via the Table API)

2020-06-16 Post by kcz
??




------------------ Original ------------------
From:
https://developer.aliyun.com/live/2894?accounttraceid=07deb589f50c4c1abbcbed103e534316qnxq
(around the 04:17:00 mark)

Kurt Young https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
 
 
  Best,
  Yichao Yang
 
 
 
 
  -- Original --
  From: "Kurt Young"

Re: flink1.11 creating tables (via DDL vs. via the Table API)

2020-06-15 Post by kcz
tks




------------------ Original ------------------
From: "Kurt Young"

flink1.11 creating tables (via DDL vs. via the Table API)

2020-06-15 Post by kcz
 Table 

Re: kafka-related question

2020-06-10 Post by kcz
From your description: for each real-time Kafka record you want the latest one, right? What is your criterion for "latest"? Which attribute decides it? Please describe it a bit more clearly.





-- Original --
From: 小学生 <201782...@qq.com>
Date: 2020-06-10 18:15
To: user-zh

Re: How do the operators of a FlinkSQL job get IDs for savepoint restore?

2020-06-09 Post by kcz
tks




------------------ Original ------------------
From: "Yichao Yang" <1048262...@qq.com>
Date: 2020-06-10 11:32
To: "user-zh"
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html
>
>
>
> ?S

How do the operators of a FlinkSQL job get IDs for savepoint restore?

2020-06-09 Post by kcz
For a SQL job, how are the operator IDs assigned, so that a savepoint can still be restored?




------------------ Original ------------------
From:
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html

 

 ?S 

??????????????????????????????????

2020-06-09 Post by kcz
sorry.




------------------ Original ------------------
From: "1048262223" <1048262...@qq.com>
Date: 2020-06-09 5:07
To: "user-zh"

????????????????????????????

2020-06-09 Post by kcz
join??open
??

Re: Flink SQL UDF question

2020-06-09 Post by kcz
I'll go with map then, tks.




------------------ Original ------------------
From: "1048262223" <1048262...@qq.com>
Date: 2020-06-09 4:51
To: "user-zh"

Re: Flink SQL UDF question

2020-06-09 Post by kcz
udfudf




------------------ Original ------------------
From: "Benchao Li"

Re: In flink sql upsert mode writing to mysql/es, must the key be all the group-by fields?

2020-06-04 Post by kcz
I roughly get the requirement you're describing: the selected columns are actually detail data, but they aren't split out from the aggregated data, and that's why this happens, right?





-- Original --
From: Leonard Xu
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html#idle-state-retention-time
 


Re: flink-1.10 reading all files under an hdfs directory: no output

2020-06-02 Post by kcz
Thanks, I'll take a look.





-- Original --
From: Sun.Zhu <17626017...@163.com>
Date: 2020-06-02 23:57
To: user-zh@flink.apache.org
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html#part-file-lifecycle

[2]https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html#rolling-policy


Best
Sun.Zhu
Sun.Zhu
17626017...@163.com
Signature is customized by Netease Mail Master


On 2020-06-02 at 19:20, kcz <573693...@qq.com> wrote:
The code:
String path = "hdfs://HACluster/user/flink/test-1/2020-05-29--15/";
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
FileInputFormat fileInputFormat = new TextInputFormat(new Path(path));
fileInputFormat.setNestedFileEnumeration(true);
env.readFile(fileInputFormat, path).print();
env.execute();
The HDFS data directory:
/user/flink/test-1/2020-05-29--15/.part-0-0.inprogress.6c12fe72-5602-4458-b29f-c8c8b4a7b73b (has data)
/user/flink/test-1/2020-05-29--15/.part-1-0.inprogress.34b1d5ff-cf0d-4209-b409-21920b12327d (has data)
The problem: Flink never picks up any of this data for output.

flink-1.10 reading all files under an hdfs directory: no output

2020-06-02 Post by kcz
The code:
String path = "hdfs://HACluster/user/flink/test-1/2020-05-29--15/";
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
FileInputFormat fileInputFormat = new TextInputFormat(new Path(path));
fileInputFormat.setNestedFileEnumeration(true);
env.readFile(fileInputFormat, path).print();
env.execute();
The HDFS data directory:
/user/flink/test-1/2020-05-29--15/.part-0-0.inprogress.6c12fe72-5602-4458-b29f-c8c8b4a7b73b (has data)
/user/flink/test-1/2020-05-29--15/.part-1-0.inprogress.34b1d5ff-cf0d-4209-b409-21920b12327d (has data)
Flink never picks up any of this data for output.
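
A hedged guess at the cause, plus a quick way to test it (assuming FileInputFormat#acceptFile keeps its 1.10 signature): file names starting with "." or "_" are treated as hidden and skipped during enumeration, and the in-progress part files above all start with ".", so the job finds nothing to read; overriding acceptFile makes them visible for a test.

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadInProgressFilesSketch {
    public static void main(String[] args) throws Exception {
        String path = "hdfs://HACluster/user/flink/test-1/2020-05-29--15/";
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        TextInputFormat format = new TextInputFormat(new Path(path)) {
            @Override
            public boolean acceptFile(FileStatus fileStatus) {
                // accept hidden files too, instead of skipping names starting with "." or "_"
                return true;
            }
        };
        format.setNestedFileEnumeration(true);
        env.readFile(format, path).print();
        env.execute();
    }
}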

Re: flink-1.10.0 hive-1.2.1 No operators defined in streaming topology

2020-05-29 Post by kcz
?? ?? 




------------------ Original ------------------
From: "Benchao Li"