flink k8s operator checkpoint config interval bug (setting not taking effect)
kcz 573693...@qq.com
May I ask when 1.18 will be released? I'd like to try out JDK 17.
Re: flink-1.15.2 ConfigOption package-local method
ConfigOptions.key("pipeline.global-job-parameters").mapType().defaultValue(parameterTool.toMap()) -- Original -- From: "user-zh" <573693...@qq.com.INVALID> Date: Sep 6, 2022 8:15 To: "user-zh"
flink-1.15.2 ConfigOption package-local method
How do I set pipeline.global-job-parameters? Its ConfigOption is package-local, so do I have to construct a new one myself?
Re: flink-1.14.4 built-in function question
-- Original -- From: "kcz" <573693...@qq.com> Date: Sep 5, 2022 11:50 To: "user-zh"
flink-1.14.4 built-in function question
select concat('1','2'), CURRENT_DATE(); reports: No match found for function signature CURRENT_DATE(). concat works fine, so why does CURRENT_DATE() fail?
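A likely explanation, worth checking against the docs for your version: in Flink SQL, CURRENT_DATE is a niladic function that must be written without parentheses, so the parenthesized call fails signature lookup while concat resolves normally:

```sql
-- fails: No match found for function signature CURRENT_DATE()
-- select concat('1','2'), CURRENT_DATE();

-- works: niladic form, no parentheses
select concat('1','2'), CURRENT_DATE;
```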
flink sink kafka exactly-once question
flink-1.14.4, kafka-2.4.0. For KafkaSink's setTransactionalIdPrefix, should the prefix be made unique per job, e.g. derived from the job ID, the checkpoint ID, or System.currentTimeMillis()?
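On the prefix question: for exactly-once delivery the transactional-id prefix has to be unique among jobs writing to the same Kafka cluster, but it should stay stable across restarts of the same job, so that lingering transactions from the previous attempt can be recovered or aborted; deriving it from System.currentTimeMillis() defeats that. A minimal sketch of a stable scheme (the naming convention here is an assumption, not a Flink API):

```java
public class TxnIdPrefix {
    // Stable per logical job, unique across jobs sharing the broker:
    // derive the prefix from a fixed job name, not from wall-clock time.
    static String prefixFor(String jobName) {
        return "flink-" + jobName + "-txn";
    }

    public static void main(String[] args) {
        // The result would be passed to KafkaSink.builder().setTransactionalIdPrefix(...)
        System.out.println(prefixFor("window-count"));
    }
}
```

Restarting the same job then reuses the same prefix, while a second job with a different name cannot collide with it.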
Re: flink creates hive table with empty owner
tks. -- Original -- From: "user-zh" https://github.com/apache/flink/pull/16745 Best regards, Yuxia -- Original -- From: "kcz" <573693...@qq.com.INVALID> To: "user-zh"
flink creates hive table with empty owner
flink-1.14.4, hive-3.1.0. Tables created in hive through flink have an empty owner; with kerberos the owner should be the authenticated user. How can the owner be set?
flink-1.14.0 checkpoint and kafka offset question
flink-1.14.0 with checkpointing (500ms interval). Job: source (kafka) -> 1-minute window count -> sink (mysql). When are the kafka offsets committed relative to the checkpoint and to the sink writing out?
flink-1.14.0 sql array type question
In one select I compute several sums; each sum has a type, and I want to write them to MySQL as (id, type, value) rows. SQL: CREATE TABLE kafka_table ( vin STRING, speed DOUBLE, brake DOUBLE, hard_to DOUBLE, distance DOUBLE, times TIMESTAMP(3), WATERMARK FOR times AS times - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); select window_start, window_end,vin,array[row('brakes',sum(if(brake > 3.0451,1,0))),row('hard_tos',sum(if(hard_to > 3.0451,1,0)))] from TABLE( TUMBLE(TABLE kafka_table, DESCRIPTOR(times), INTERVAL '10' MINUTES)) group by window_start, window_end,vin; Error: Exception in thread "main" java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes: validated type: RecordType(TIMESTAMP(3) NOT NULL window_start, TIMESTAMP(3) NOT NULL window_end, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" vin, RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER EXPR$1) NOT NULL ARRAY NOT NULL EXPR$3) NOT NULL converted type: RecordType(TIMESTAMP(3) NOT NULL window_start, TIMESTAMP(3) NOT NULL window_end, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" vin, RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER NOT NULL EXPR$1) NOT NULL ARRAY NOT NULL EXPR$3) NOT NULL rel: LogicalProject(window_start=[$0], window_end=[$1], vin=[$2], EXPR$3=[ARRAY(CAST(ROW(_UTF-16LE'brake', $3)):RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER NOT NULL EXPR$1) NOT NULL, CAST(ROW(_UTF-16LE'hard_to', $4)):RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER NOT NULL EXPR$1) NOT NULL)]) LogicalAggregate(group=[{0, 1, 2}], agg#0=[SUM($3)], agg#1=[SUM($4)]) LogicalProject(window_start=[$6], window_end=[$7], vin=[$0], $f3=[IF(>($2, 3.0451:DECIMAL(5, 4)), 1, 0)], $f4=[IF(>($3, 3.0451:DECIMAL(5, 4)), 1, 0)]) LogicalTableFunctionScan(invocation=[TUMBLE($5, DESCRIPTOR($5), 60:INTERVAL MINUTE)], 
rowType=[RecordType(VARCHAR(2147483647) vin, DOUBLE speed, DOUBLE brake, DOUBLE hard_to, DOUBLE distance, TIMESTAMP(3) *ROWTIME* times, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)]) LogicalProject(vin=[$0], speed=[$1], brake=[$2], hard_to=[$3], distance=[$4], times=[$5]) LogicalWatermarkAssigner(rowtime=[times], watermark=[-($5, 5000:INTERVAL SECOND)]) LogicalTableScan(table=[[default_catalog, default_database, kafka_table]]) at org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:467) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:582) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:177) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:169) at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1057) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1026) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:301) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736) at com.hycan.bigdata.utils.SqlUtil.callCommand(SqlUtil.java:48) at com.hycan.bigdata.job.SchemaJob.main(SchemaJob.java:87) Disconnected from the target VM, address: '127.0.0.1:61710', transport: 'socket' Process finished with exit code 1
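The failure is a nullability mismatch: the validated ROW element carries a nullable INTEGER while the converted one is INTEGER NOT NULL. One workaround that often resolves this class of Calcite type-preservation errors (a sketch; whether it fixes this exact case on 1.14.0 is not verified) is to pin the aggregate's type with an explicit CAST inside the ROW:

```sql
select window_start, window_end, vin,
  array[row('brakes',   cast(sum(if(brake   > 3.0451, 1, 0)) as int)),
        row('hard_tos', cast(sum(if(hard_to > 3.0451, 1, 0)) as int))]
from TABLE(TUMBLE(TABLE kafka_table, DESCRIPTOR(times), INTERVAL '10' MINUTES))
group by window_start, window_end, vin;
```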
Re: flink-1.14 kafkasource watermark question
I use a globalWindow whose trigger fires on times. public class PathMonitorJob { private static final String PATH = "path"; private static double THRESHOLD; public static void main(String[] args) throws Exception { ParameterTool parameterTool = ParameterTool.fromArgs(args); THRESHOLD = parameterTool.getDouble("threshold",1000d); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); KafkaSource
flink-1.14 kafkasource watermark question
The times field is offset by +20. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); KafkaSource
flink-1.12.5: creating a table with HIVE DDL, the column comment is lost; desc formatted in hive shows no comment
hive 3.1.0. DDL: create table test_hive( id int comment 'test comment' ) PARTITIONED BY (dt STRING) STORED AS orc TBLPROPERTIES ( 'partition.time-extractor.kind'='custom', 'partition.time-extractor.timestamp-pattern'='$dt', 'partition.time-extractor.class'='com.hycan.bigdata.utils.MyPartTimeExtractor', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.delay'='1 d', 'sink.partition-commit.policy.kind'='metastore,success-file' ); In hive, desc formatted test_hive shows no comment.
Re: flink-1.12.0 (no longer reproducible on 1.13.2): select datas[1].filed_1, datas[1].filed_2 returns wrong results
sorry. -- Original -- From: "kcz" <573693...@qq.com> Date: Sep 26, 2021 10:53 To: "user-zh"
flink-1.12.0 (no longer reproducible on 1.13.2): select datas[1].filed_1, datas[1].filed_2 returns wrong results
It looks like the INDEX keeps incrementing when taking values from the array. CREATE TABLE KafkaTable ( datas array
flink-1.12.0 DDL watermark error (works fine on 1.13.2)
SQL that raises a watermark error on 1.12.0: CREATE TABLE KafkaTable ( test array
Re: flink-1.12.0 lag function question
tks. -- Original -- From: "user-zh" https://issues.apache.org/jira/browse/FLINK-19449 kcz <573693...@qq.com.invalid> on Sep 22, 2021 11:41: behavior, next_bv: { "user_id": 1, "item_id": 1, "behavior": "pv1" } { "user_id": 1, "item_id": 1, "behavior": "pv2" } CREATE TABLE KafkaTable ( `user_id` BIGINT, `item_id` BIGINT, `behavior` STRING, proctime as PROCTIME() ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = '', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' ); SELECT user_id, item_id, behavior, next_bv FROM ( SELECT *, lag( behavior, 1 ) over ( PARTITION BY user_id ORDER BY proctime ) AS next_bv FROM KafkaTable ) t; -- Best, Benchao Li
flink-1.12.0 lag function question
Expected columns: behavior, next_bv. Data: { "user_id":1, "item_id":1, "behavior":"pv1" } { "user_id":1, "item_id":1, "behavior":"pv2" } CREATE TABLE KafkaTable ( `user_id` BIGINT, `item_id` BIGINT, `behavior` STRING, proctime as PROCTIME() ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = '', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' ); SELECT user_id, item_id, behavior, next_bv FROM ( SELECT *, lag( behavior, 1 ) over ( PARTITION BY user_id ORDER BY proctime ) AS next_bv FROM KafkaTable ) t;
Re: flink-1.13.1 DDL kafka consuming JSON data, (ObjectNode) jsonNode error
Folks, please take a look: why would a class cast exception occur there? -- Original -- From: kcz <573693...@qq.com> Date: Jul 1, 2021 22:49 To: user-zh
flink-1.13.1 DDL kafka consuming JSON data, (ObjectNode) jsonNode error
Version: 1.13.1. Error: Caused by: java.lang.ClassCastException: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode cannot be cast to org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode at org.apache.flink.formats.json.JsonToRowDataConverters.lambda$createRowConverter$ef66fe9a$1(JsonToRowDataConverters.java:344) at org.apache.flink.formats.json.JsonToRowDataConverters.lambda$wrapIntoNullableConverter$de0b9253$1(JsonToRowDataConverters.java:376) at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.convertToRowData(JsonRowDataDeserializationSchema.java:121) at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:106) DDL: CREATE TABLE user_behavior ( user_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); select * from user_behavior; Code: StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.createLocalEnvironment(new Configuration()); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv); tableEnv.executeSql(sql).print(); pom:
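The cast fails because the json format parses each Kafka record and expects the top level to be a JSON object (ObjectNode); a record whose payload is a bare JSON string such as "abc" parses to a TextNode instead. Either produce records shaped like {"user_id": "1"}, or, if the topic really carries plain strings, declare the table with the raw format (available since 1.12; a sketch):

```sql
CREATE TABLE user_behavior (
  user_id STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'raw'  -- each record becomes one STRING value
);
```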
Re: flink-1.13.1 sql error
Folks, please help me see why it reports that error. -- Sent from: http://apache-flink.147419.n8.nabble.com/
flink-1.13.1 sql error
SQL: CREATE TABLE user_behavior ( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = 'localhost:9092', 'format' = 'json' ); select * from user_behavior; pom.xml: flink.version=1.13.1
Re: apache flink
My understanding is that Flink is a job execution engine; what you need sounds like a job scheduler, such as Airflow. -- Original -- From: Waldeinsamkeit. <1214316...@qq.com> Date: Tue, Jan 5, 2021 11:13 AM To: user-zh
Re: flink-1.12 UDF registration question
It was a mismatched argument count at the call site; solved now. -- Original -- From: kcz <573693...@qq.com> Date: Dec 26, 2020 15:24 To: user-zh
flink-1.12 UDF registration question
I registered a UDF with createTemporarySystemFunction; when calling it I get no.match.found.for.function. I'm on my phone right now, so it's inconvenient to paste more details.
Re: How to parse out the unresolved execution plan in Flink 1.11
I wanted to do this too at the time but failed, and ended up using Calcite. How exactly did you approach it? -- Original -- From: 马阳阳
flink sql: filtering on the partitioned column in a window function
The column can hold either a default value or a real one, and I want to pick the real one. How can I implement this? I thought of window functions, but they can't filter; another hack is to take max/min and then branch with an if.
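The max-then-if idea can be collapsed into one grouped aggregate, since aggregates skip NULLs. A sketch, assuming a table t(id, col) where the default value is '0' (table, column, and default value are hypothetical):

```sql
-- CASE yields NULL for default rows, and MAX ignores NULLs,
-- so only real values survive
SELECT id,
       MAX(CASE WHEN col <> '0' THEN col END) AS real_col
FROM t
GROUP BY id;
```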
flink??????DDL????????????????????????????
??if ??
Re: flink 1.11 sql question
This feature is really useful, because third-party data keeps changing: fields get added and renamed all the time. -- Original -- From: Benchao Li https://issues.apache.org/jira/browse/FLINK-18002 酷酷的浑蛋
Re: flink 1.10.1 OutOfMemoryError: Metaspace
The ES5 write-up looks pretty good. -- Original -- From: "user-zh" https://www.yuque.com/codeleven/flink/dgygq2 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink-1.10.1 want to sink into ES 5.6 via DDL
tks, received. -- Original -- From: Yangze Guo https://github.com/apache/flink/pull/12184/files#diff-c71784e07a5a5c214a2f4f9843e6ef3f Start from there: for example, once you have implemented your own Elasticsearch5DynamicSink along the lines of https://github.com/apache/flink/pull/12184/files#diff-c71784e07a5a5c214a2f4f9843e6ef3f, build an es5 sql jar. Best, Leonard [1] https://github.com/apache/flink/pull/12184 On Aug 14, 2020, 10:14, kcz <573693...@qq.com> wrote: After checking the url in [1], I found nothing in it related to an es sql jar. -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink-1.10.1 want to sink into ES 5.6 via DDL
So for ES5 there is no sql jar; should I use the ES5 sink with the connect API instead? -- Original -- From: "kcz" <573693...@qq.com> Date: Aug 17, 2020 8:34 To: "user-zh" https://github.com/apache/flink/pull/12184/files#diff-c71784e07a5a5c214a2f4f9843e6ef3f Elasticsearch5DynamicSink, then build an es5 sql jar. Leonard [1] https://github.com/apache/flink/pull/12184 On Aug 14, 2020 10:14, kcz <573693...@qq.com> wrote: After checking the url in [1], I found nothing related to an es sql jar. -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink-1.10.1 want to sink into ES 5.6 via DDL
Thanks, I'll study it first. -- Original -- From: Leonard Xu https://github.com/apache/flink/pull/12184/files#diff-c71784e07a5a5c214a2f4f9843e6ef3f Start from there: for example, once you have implemented your own Elasticsearch5DynamicSink, build an es5 sql jar. Best, Leonard [1] https://github.com/apache/flink/pull/12184 On Aug 14, 2020, 10:14, kcz <573693...@qq.com> wrote: After checking the url in [1], I found nothing in it related to an es sql jar. -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink-1.10.1 want to sink into ES 5.6 via DDL
After checking the url in [1] you mentioned, I found nothing in it related to an es sql jar. -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Question: propagating the time attribute field, is there a way to solve it?
Could you explain a bit more? I still don't quite see where it goes wrong, or why that would solve the problem. -- Original -- From: Tianwang Li
Re: flink-1.11 simulating backpressure
-- Original -- From: "user-zh"
Re: flink-1.11 simulating backpressure
Yeah. The UI shows no data coming in; records should at least enter the source operator, and I only added a sleep in the map. Still no backpressure shows, even after producing over 1M records. -- Original -- From: shizk233
flink-1.11 simulating backpressure
I want to look at the backpressure metrics. I sent 1M records to kafka, but I don't see the source consuming any data. Did I set up the simulation wrong? public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(2000L, CheckpointingMode.EXACTLY_ONCE); env.setStateBackend(new MemoryStateBackend()); env.setParallelism(4); Properties properties = getLocal(); properties.setProperty("group.id","test"); FlinkKafkaConsumer
Re: flink-1.11 hive-1.2.1 DDL writes no data
sorry, IDEA's log4j config hid the log; 'process-time' had been written as 'process time'. -- Original -- From: "user-zh" "\t'connector.properties.group.id' = 'domain_testGroup',\n" + "\t'connector.properties.zookeeper.connect' = '127.0.0.1:2181',\n" + "\t'connector.properties.bootstrap.servers' = '127.0.0.1:9092',\n" + "\t'update-mode' = 'append',\n" + "\t'format.type' = 'json',\n" + "\t'format.derive-schema' = 'true'\n" + ")"); Best Leonard [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/kafka.html#connector-options
flink-1.11 hive-1.2.1 DDL writes no data
hive package com.hive; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.SqlDialect; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.catalog.hive.HiveCatalog; import java.time.Duration; public class HiveTest { private static final String path = "hdfs_path"; public static void main(String []args) { System.setProperty("HADOOP_USER_NAME", "work"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); env.setStateBackend(new FsStateBackend(path)); EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,tableEnvSettings); tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE); tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20)); String name= "myhive"; String defaultDatabase = "situation"; String hiveConfDir = "/load/data/hive/hive-conf"; // a local path String version = "1.2.1"; HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version); tableEnv.registerCatalog("myhive", hive); // set the HiveCatalog as the current catalog of the session tableEnv.useCatalog("myhive"); tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS situation"); tableEnv.executeSql("DROP TABLE IF EXISTS 
situation.source_table"); tableEnv.executeSql("CREATE TABLE situation.source_table (\n" + "\thost STRING,\n" + "\turl STRING,\n" + "\tpublic_date STRING\n" + ") WITH (\n" + "\t'connector.type' = 'kafka',\n" + "\t'connector.version' = 'universal',\n" + "\t'connector.startup-mode' = 'latest-offset',\n" + "\t'connector.topic' = 'sendMessage',\n" + "\t'connector.properties.group.id' = 'domain_testGroup',\n" + "\t'connector.properties.zookeeper.connect' = '127.0.0.1:2181',\n" + "\t'connector.properties.bootstrap.servers' = '127.0.0.1:9092',\n" + "\t'update-mode' = 'append',\n" + "\t'format.type' = 'json',\n" + "\t'format.derive-schema' = 'true'\n" + ")"); tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); tableEnv.executeSql("DROP TABLE IF EXISTS situation.fs_table"); String hiveSql = "\n" + " CREATE TABLE situation.fs_table (\n" + " \n" + "host STRING,\n" + "url STRING,\n" + "public_date STRING\n" + " \n" + " ) PARTITIONED BY (\n" + "ts_date STRING,\n" + "ts_hour STRING,\n" + "ts_minute STRING\n" + " ) STORED AS PARQUET\n" + " TBLPROPERTIES (\n" + "'sink.partition-commit.trigger' = 'process time',\n" + "'sink.partition-commit.delay' = '1 min',\n" + "'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" + "'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'\n" + " )\n" + " "; tableEnv.executeSql(hiveSql); tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); tableEnv.executeSql("INSERT INTO situation.fs_table SELECT host, url,public_date," + " DATE_FORMAT(public_date,'-MM-dd') ,DATE_FORMAT(public_date,'HH') ,DATE_FORMAT(public_date,'mm') FROM situation.source_table"); } }
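Note the trigger value in the TBLPROPERTIES above: the documented values for sink.partition-commit.trigger are 'process-time' and 'partition-time', so 'process time' (with a space) will not enable the intended trigger. Corrected fragment:

```sql
TBLPROPERTIES (
  'sink.partition-commit.trigger' = 'process-time',
  'sink.partition-commit.delay' = '1 min',
  'sink.partition-commit.policy.kind' = 'metastore,success-file',
  'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'
)
```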
Re: flink row type
Wow, that's a clever trick, very smart. I'd been fetching values by index all along; good to learn. -- Original -- From: Jark Wu
Re: flink-1.11 DDL kafka-to-hive question
Thanks folks, there's a demo on the official WeChat account now; I'll compare against it. -- Original -- From: Jingsong Li https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/hive_dialect.html#use-hive-dialect On Jul 21, 2020, 22:57, kcz <573693...@qq.com> wrote: There has never been any data and I don't know what's off; the hive table already exists, and my test writing via DDL to hdfs works. -- Original -- From: JasonLee <17610775...@163.com> Date: Jul 21, 2020 20:39 To: user-zh <user-zh@flink.apache.org> Subject: Re: flink-1.11 DDL kafka-to-hive question hi, has the hive table never had data, or does data appear after a while? | JasonLee | 17610775...@163.com | Signature is customized by Netease Mail Master On Jul 21, 2020 19:09, kcz wrote: hive-1.2.1. Checkpoints succeed (the chk directory does contain checkpoint data, and kafka has data), but the hive table has no data. What am I missing? String hiveSql = "CREATE TABLE stream_tmp.fs_table (\n" + " host STRING,\n" + " url STRING," + " public_date STRING" + ") partitioned by (public_date string) " + "stored as PARQUET " + "TBLPROPERTIES (\n" + " 'sink.partition-commit.delay'='0 s',\n" + " 'sink.partition-commit.trigger'='partition-time',\n" + " 'sink.partition-commit.policy.kind'='metastore,success-file'" + ")"; tableEnv.executeSql(hiveSql); tableEnv.executeSql("INSERT INTO stream_tmp.fs_table SELECT host, url, DATE_FORMAT(public_date, 'yyyy-MM-dd') FROM stream_tmp.source_table"); -- Best, Jingsong Lee
Re: flink-1.11 DDL kafka-to-hive question
There has never been any data and I don't know what's off; the hive table already exists, and my test writing via DDL to hdfs works. -- Original -- From: JasonLee <17610775...@163.com> Date: Jul 21, 2020 20:39 To: user-zh
flink-1.11 DDL kafka-to-hive question
hive-1.2.1. Checkpoints succeed (the chk directory has checkpoint data, and kafka has data), but the hive table has no data. Am I missing something? String hiveSql = "CREATE TABLE stream_tmp.fs_table (\n" + " host STRING,\n" + " url STRING," + " public_date STRING" + ") partitioned by (public_date string) " + "stored as PARQUET " + "TBLPROPERTIES (\n" + " 'sink.partition-commit.delay'='0 s',\n" + " 'sink.partition-commit.trigger'='partition-time',\n" + " 'sink.partition-commit.policy.kind'='metastore,success-file'" + ")"; tableEnv.executeSql(hiveSql); tableEnv.executeSql("INSERT INTO stream_tmp.fs_table SELECT host, url, DATE_FORMAT(public_date, 'yyyy-MM-dd') FROM stream_tmp.source_table");
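One thing to check: with 'sink.partition-commit.trigger'='partition-time', a partition is committed only once the watermark passes the partition time plus the delay, so the pipeline needs an event-time attribute and watermark on the source; without one, no partition ever commits and the hive table stays empty. A sketch of a source table with a watermark (column types and connector options are assumptions):

```sql
CREATE TABLE stream_tmp.source_table (
  host STRING,
  url STRING,
  public_date TIMESTAMP(3),
  WATERMARK FOR public_date AS public_date - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);
```

Alternatively, switching the sink to 'sink.partition-commit.trigger'='process-time' avoids the watermark requirement.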
flink-1.11 connecting hive-1.2.1, DDL error
Running from IDEA, connecting to hive; the pom includes hive-exec and flink-connector-hive_2.11. Code: StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.enableCheckpointing(60*1000, CheckpointingMode.EXACTLY_ONCE); // env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); env.setStateBackend(new FsStateBackend(path)); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String name= "myhive"; String defaultDatabase = "situation"; String hiveConfDir = "/load/data/hive/hive-conf"; // a local path String version = "1.2.1"; HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version); tableEnv.registerCatalog("myhive", hive); // set the HiveCatalog as the current catalog of the session tableEnv.useCatalog("myhive"); tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS stream_tmp"); tableEnv.executeSql("DROP TABLE IF EXISTS stream_tmp.source_table"); Error: Exception in thread "main" java.lang.IncompatibleClassChangeError: Implementing class at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:763) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) at java.net.URLClassLoader.access$100(URLClassLoader.java:73) at java.net.URLClassLoader$1.run(URLClassLoader.java:368) at java.net.URLClassLoader$1.run(URLClassLoader.java:362) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:361) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.flink.table.planner.delegation.PlannerBase.
Re: flink-1.11 DDL writing json to hdfs question
tks. About the 30m default, is the m minutes? -- Original -- From: "user-zh" https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html#rolling-policy Best, Jingsong On Fri, Jul 17, 2020 at 4:25 PM kcz <573693...@qq.com> wrote: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#full-example With parquet instead of json, after a checkpoint the file stays in-progress and no success file appears. -- Best, Jingsong Lee
flink-1.11 DDL writing json to hdfs question
Following https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#full-example: with parquet instead of json, after a checkpoint the files stay in-progress and no success file appears.
Re: flink-1.11 DDL writing to hdfs error: Cannot instantiate user function
After switching to parquet, the error is: java.lang.NoClassDefFoundError: org/apache/parquet/hadoop/ParquetWriter$Builder at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:760) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) at java.net.URLClassLoader.access$100(URLClassLoader.java:73) at java.net.URLClassLoader$1.run(URLClassLoader.java:368) at java.net.URLClassLoader$1.run(URLClassLoader.java:362) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:361) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.flink.formats.parquet.ParquetFileSystemFormatFactory.createBulkWriterFactory(ParquetFileSystemFormatFactory.java:110) at org.apache.flink.table.filesystem.FileSystemTableSink.createWriter(FileSystemTableSink.java:274) at org.apache.flink.table.filesystem.FileSystemTableSink.consumeDataStream(FileSystemTableSink.java:154) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:114) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) at com.HdfsDDL.main(HdfsDDL.java:71) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) at org.apache.flink.client.cli.CliFrontend$$Lambda$67/388104475.call(Unknown Source) at org.apache.flink.runtime.security.contexts.HadoopSecurityContext$$Lambda$68/1470966439.run(Unknown Source) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1659) at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at
Re: flink-1.11 DDL writing to hdfs error: Cannot instantiate user function
Not a flink bug: after setting classloader.resolve-order: parent-first, the parquet error went away. -- Original -- From: "kcz" <573693...@qq.com> Date: Jul 17, 2020 1:32 To: "user-zh"
flink-1.11 DDL writing to hdfs error: Cannot instantiate user function
Standalone cluster; jars under lib/: flink-connector-hive_2.11-1.11.0.jar flink-json-1.11.0.jar flink-sql-connector-kafka_2.12-1.11.0.jar log4j-api-2.12.1.jar flink-csv-1.11.0.jar flink-parquet_2.11-1.11.0.jar flink-table_2.11-1.11.0.jar log4j-core-2.12.1.jar flink-dist_2.11-1.11.0.jar flink-shaded-hadoop-2-uber-2.7.2.11-9.0.jar flink-table-blink_2.11-1.11.0.jar log4j-slf4j-impl-2.12.1.jar flink-hadoop-compatibility_2.11-1.11.0.jar flink-shaded-zookeeper-3.4.14.jar log4j-1.2-api-2.12.1.jar Code (run from IDEA): StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); env.setParallelism(1); env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE); // env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); env.setStateBackend(new FsStateBackend(path)); tableEnv.executeSql("CREATE TABLE source_table (\n" + "\thost STRING,\n" + "\turl STRING,\n" + "\tpublic_date STRING\n" + ") WITH (\n" + "\t'connector.type' = 'kafka',\n" + "\t'connector.version' = 'universal',\n" + "\t'connector.startup-mode' = 'latest-offset',\n" + "\t'connector.topic' = 'test_flink_1.11',\n" + "\t'connector.properties.group.id' = 'domain_testGroup',\n" + "\t'connector.properties.zookeeper.connect' = '127.0.0.1:2181',\n" + "\t'connector.properties.bootstrap.servers' = '127.0.0.1:9092',\n" + "\t'update-mode' = 'append',\n" + "\t'format.type' = 'json',\n" + "\t'format.derive-schema' = 'true'\n" + ")"); tableEnv.executeSql("CREATE TABLE fs_table (\n" + " host STRING,\n" + " url STRING,\n" + " public_date STRING\n" + ") PARTITIONED BY (public_date) WITH (\n" + " 'connector'='filesystem',\n" + " 'path'='path',\n" + " 'format'='json',\n" + " 'sink.partition-commit.delay'='0s',\n" + " 'sink.partition-commit.policy.kind'='success-file'\n" + ")"); tableEnv.executeSql("INSERT INTO fs_table SELECT host, url, DATE_FORMAT(public_date, 'yyyy-MM-dd') FROM source_table"); TableResult result = tableEnv.executeSql("SELECT * FROM fs_table "); 
result.print(); org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function. at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:291) at org.apache.flink.streaming.runtime.tasks.OperatorChain.
Re: flink-1.11 DDL checkpoint directory configuration question
Thanks. I've always configured it via streamEnv; today I saw the table config also works. If I configure it via the stream env, that works too, right? -- Original -- From: Leonard Xu
flink-1.11 DDL checkpoint directory configuration question
Previously: streamEnv.setStateBackend(new FsStateBackend(checkpointPath)); With DDL: tableEnv.getConfig().getConfiguration().set( ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE); tableEnv.getConfig().getConfiguration().set( ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10));
?????? ????????????????????????????
windowflink?? ---- ??:"Congxian Qiu"https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/state/state.html#%E7%8A%B6%E6%80%81%E6%9C%89%E6%95%88%E6%9C%9F-ttl [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/operators/process_function.html#the-keyedprocessfunction Best, Congxian JasonLee <17610775...@163.com ??2020??7??4?? 8:29?? | | JasonLee | | ??17610775...@163.com | Signature is customized by Netease Mail Master ??2020??07??03?? 14:02??18579099...@163.com ?? ??ProcessWindowFunctionvalueState 1.??ProcessWindowFunction??process??1??trigger process??valueState 18579099...@163.com
Re: Flink SQL, what are the options for deliberately delaying data for a while
Set a window interval; if you need the latest value, do some extra processing on top. -- Original -- From: admin <17626017...@163.com> Date: Jul 3, 2020 18:01 To: user-zh
Re: flink sql IF function error
tks -- Original -- From: "Benchao Li"
flink sql IF function error
flink-1.10.1, blink planner. Using IF reports: Cannot apply 'IF' to arguments of type 'IF(
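In Flink SQL, IF requires its branches to have compatible types; mixing, say, an INT literal with a DOUBLE (or CHARs of different lengths) produces exactly this "Cannot apply 'IF' to arguments of type" error. A portable form is CASE WHEN with both branches cast to one type (table and column names here are hypothetical):

```sql
-- IF(speed > 100, 1, 2.0) can fail; align the branch types instead
SELECT CASE WHEN speed > 100 THEN CAST(1 AS DOUBLE) ELSE 2.0 END AS grade
FROM kafka_table;
```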
Re: does a field initialized in flink open() need to be transient?
state -- Original -- From: <13162790...@163.com> Date: Jun 24, 2020 1:36 To: "user-zh"
does a field initialized in flink open() need to be transient?
If I create a mysql client in open(), does it need to be declared transient, the way state-related fields are?
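The usual rule: a rich function instance is serialized on the client and shipped to the task managers, so any non-serializable member (such as a database client) should be transient and created in open(). A self-contained sketch of why, using plain Java serialization (the Holder class is hypothetical, standing in for a RichFunction):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientDemo {
    static class Holder implements Serializable {
        String name = "cfg";                     // plain config survives serialization
        transient Object client = new Object();  // stands in for a MySQL client
    }

    // Serialize then deserialize, as Flink does when shipping a function to tasks.
    static Holder roundTrip(Holder h) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(h);
        return (Holder) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();
    }

    public static void main(String[] args) throws Exception {
        Holder copy = roundTrip(new Holder());
        // prints: cfg client=null
        System.out.println(copy.name + " client=" + copy.client);
    }
}
```

The transient member does not survive the trip, which is exactly why the client has to be rebuilt in open() on each task.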
Re: flink 1.11 table creation (via DDL vs via the Table API)
-- Original -- https://developer.aliyun.com/live/2894?accounttraceid=07deb589f50c4c1abbcbed103e534316qnxq at 04:17:00. Kurt Young https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table Best, Yichao Yang -- Original -- From: "Kurt Young"
Re: flink 1.11 table creation (via DDL vs via the Table API)
tks -- Original -- From: "Kurt Young"
flink 1.11 table creation (via DDL vs via the Table API)
Table
Re: kafka-related question
From your description, for a real-time kafka stream you want the latest record, right? What is your criterion for "latest"? Which attribute does it come from? Please state it more clearly. -- Original -- From: 小学生 <201782...@qq.com> Date: Jun 10, 2020 18:15 To: user-zh
Re: can operator IDs be set for a FlinkSQL job so that savepoints restore correctly?
tks -- Original -- From: "Yichao Yang" <1048262...@qq.com> Date: Jun 10, 2020 11:32 To: "user-zh" https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html
Re: can operator IDs be set for a FlinkSQL job so that savepoints restore correctly?
For SQL jobs, can the operator ID be specified? -- Original -- https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html
??????????????????????????????????
sorry. -- Original -- From: "1048262223" <1048262...@qq.com> Date: Jun 9, 2020 5:07 To: "user-zh"
????????????????????????????
join??open ??
Re: Flink SQL UDF question
Using map works, tks. -- Original -- From: "1048262223" <1048262...@qq.com> Date: Jun 9, 2020 4:51 To: "user-zh"
Re: Flink SQL UDF question
-- Original -- From: "Benchao Li"
Re: flink sql upsert mode writing to mysql/es, must the key be all the fields after group by?
I roughly get the requirement you mean: the selected columns are actually detail data, but they weren't separated from the aggregated data, and that's why this happens, right? -- Original -- From: Leonard Xu https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html#idle-state-retention-time
Re: flink-1.10 reading all files under an hdfs directory, no output
Thanks, I'll take a look. -- Original -- From: Sun.Zhu <17626017...@163.com> Date: Jun 2, 2020 23:57 To: user-zh@flink.apache.org [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html#part-file-lifecycle [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html#rolling-policy Best Sun.Zhu | Sun.Zhu | 17626017...@163.com | Signature customized by Netease Mail Master On Jun 2, 2020 19:20, kcz <573693...@qq.com> wrote: Code: String path = "hdfs://HACluster/user/flink/test-1/2020-05-29--15/"; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); FileInputFormat fileInputFormat = new TextInputFormat(new Path(path)); fileInputFormat.setNestedFileEnumeration(true); env.readFile(fileInputFormat, path).print(); env.execute(); HDFS directory contents: /user/flink/test-1/2020-05-29--15/.part-0-0.inprogress.6c12fe72-5602-4458-b29f-c8c8b4a7b73b (has data) /user/flink/test-1/2020-05-29--15/.part-1-0.inprogress.34b1d5ff-cf0d-4209-b409-21920b12327d (has data) Problem: flink cannot read the data and produces no output.
flink-1.10 reading all files under an hdfs directory, no output
Code: String path = "hdfs://HACluster/user/flink/test-1/2020-05-29--15/"; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); FileInputFormat fileInputFormat = new TextInputFormat(new Path(path)); fileInputFormat.setNestedFileEnumeration(true); env.readFile(fileInputFormat, path).print(); env.execute(); HDFS directory contents: /user/flink/test-1/2020-05-29--15/.part-0-0.inprogress.6c12fe72-5602-4458-b29f-c8c8b4a7b73b (has data) /user/flink/test-1/2020-05-29--15/.part-1-0.inprogress.34b1d5ff-cf0d-4209-b409-21920b12327d (has data) Problem: flink gets no data to output.
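A plausible cause (worth verifying in the FileInputFormat source for 1.10): Flink's file enumeration skips hidden files, i.e. names starting with '.' or '_', by default. The .part-*.inprogress.* files are therefore filtered out before being read; once the writing job checkpoints and finalizes them to part-*, they become visible. A small self-contained sketch of that filtering rule:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class HiddenFileFilterDemo {
    // Mirrors the default hidden-file rule assumed above: names starting
    // with "." or "_" are skipped by the enumeration.
    static boolean accepted(String fileName) {
        return !fileName.startsWith(".") && !fileName.startsWith("_");
    }

    public static void main(String[] args) {
        List<String> visible = Stream.of(
                ".part-0-0.inprogress.6c12fe72", "part-0-0", "_SUCCESS")
            .filter(HiddenFileFilterDemo::accepted)
            .collect(Collectors.toList());
        System.out.println(visible); // prints [part-0-0]
    }
}
```

So the fix is either to let the sink finalize the files (enable checkpointing so in-progress files roll to finished part files) or to install a custom file filter that also accepts hidden files.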
Re: flink-1.10.0 hive-1.2.1 No operators defined in streaming topology
-- Original -- From: "Benchao Li"