Re:Re: Re:Re:Re:Re:Flink SQL No Watermark

2020-08-13 文章 Zhou Zach
Hi,试了,将并行度设置为2和kafka分区数9,都试了,都只有一个consumer有watermark,可能是因为我开了一个producer吧 在 2020-08-13 16:57:25,"Shengkai Fang" 写道: >hi, watermark本来就是通过watermark assigner生成的。这是正常现象。 >我想问问 你有没有试过调大并行度来解决这个问题?因为不同partition的数据可能存在时间上的差异。 > >Zhou Zach 于2020年8月13日周四 下午4:

Re:Re:Re:Re:Re:Flink SQL No Watermark

2020-08-13 文章 Zhou Zach
d, sex, age, created_time]) 但是,只有最后面两个算子有watermark,所以开启OperatorChaining后,因为前面3个没有watermark,整个chain的算子都没有watermark了,那么是不是就不能通过flink ui来监控watermark了,就依赖第三方监控工具来看watermark?因为上生产,肯定要开OperatorChaining的 在 2020-08-13 15:39:44,"forideal" 写道: >Hi Zhou Zach: >你可以试试 env.disableOper

Re:Re:Re:Flink SQL No Watermark

2020-08-13 文章 Zhou Zach
Hi forideal, 我也遇到了No Watermark问题,我也设置了table.exec.source.idle-timeout 参数,如下: val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) streamExecutionEnv.setStateBackend(new

flink run-application 怎样设置配置文件的环境变量

2020-08-03 文章 Zhou Zach
Hi all, 通过如下方式设置HBASE_CONF_PATH变量,提交到yarn时,发现HBASE_CONF_PATH没有生效, /opt/flink-1.11.1/bin/flink run-application -t yarn-application \ -DHBASE_CONF_PATH='/etc/hbase/conf' \ 请问flink提交job时,怎样设置环境变量?

回复:flink sql 1.11 kafka source with子句使用新参数,下游消费不到数据

2020-07-23 文章 Zhou Zach
Hi, 感谢详细答疑! | | Zhou Zach | | 邮箱:wander...@163.com | 签名由 网易邮箱大师 定制 在2020年07月24日 11:48,Leonard Xu 写道: Hi "2020-07-23T19:53:15.509Z” 是 RFC-3339 格式,这个格式是带zone的时间格式,对应的数据类型是 timestamp with local zone,这个应该在1.12里支持了[1] 1.10版本虽然是支持 RFC-3339 格式,但默认解析时区是有问题的,所以在1.11和1.12逐步中纠正了。 在1.11版本中,如果js

Re:Re: flink sql 1.11 kafka source with子句使用新参数,下游消费不到数据

2020-07-23 文章 Zhou Zach
connectors/formats/json.html#json-timestamp-format-standard > ><https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#json-timestamp-format-standard> > >> 在 2020年7月23日,20:54,Zhou Zach 写道: >> >> 当前作业有个sink >> connect

Re:Re: flink sql 1.11 kafka source with子句使用新参数,下游消费不到数据

2020-07-23 文章 Zhou Zach
作业消费不到数据吗? > >正常应该不会的,可以提供个可复现代码吗? > >祝好 >Leonard Xu > > >> 在 2020年7月23日,18:13,Zhou Zach 写道: >> >> Hi all, >> >> 根据文档https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#

flink sql 1.11 kafka source with子句使用新参数,下游消费不到数据

2020-07-23 文章 Zhou Zach
Hi all, 根据文档https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#start-reading-position, 使用新参数创建kafka_table,下游消费不到数据,使用老参数下游可以消费到数据,是不是新参数的方式有坑啊 老参数: streamTableEnv.executeSql( """ | |CREATE TABLE kafka_table ( |uid

Re:Re:回复:flink1.11 set yarn slots failed

2020-07-16 文章 Zhou Zach
nice, 可以不用看Command-Line Interface的文档了 在 2020-07-16 16:16:00,"xiao cai" 写道: >可以看这里https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html > > 原始邮件 >发件人: Zhou Zach >收件人: user-zh >发送时间: 2020年7月16日(周四) 15:28 >主题: Re:回复:flink1.11 set ya

Re:回复:flink1.11 set yarn slots failed

2020-07-16 文章 Zhou Zach
-D前缀好使,要设置yarn name用什么参数啊,1.11官网的文档有些都不好使了 在 2020-07-16 15:03:14,"flinkcx" 写道: >是不是应该用-D作为前缀来设置,比如-Dtaskmanager.numberOfTaskSlots=4 > > > 原始邮件 >发件人: Zhou Zach >收件人: Flink user-zh mailing list >发送时间: 2020年7月16日(周四) 14:51 >主题: flink1.11 set yarn s

flink1.11 set yarn slots failed

2020-07-16 文章 Zhou Zach
Hi all, 使用如下命令,设置Number of slots per TaskManager /opt/flink-1.11.0/bin/flink run-application -t yarn-application \ -Djobmanager.memory.process.size=1024m \ -Dtaskmanager.memory.process.size=2048m \ -ys 4 \

flink sql 1.11 create hive table error

2020-07-15 文章 Zhou Zach
Hi all, flink sql 1.11 create table 是不是 不支持 IF NOT EXISTS Query: val hiveConfDir = "/etc/hive/conf" val hiveVersion = "2.1.1" val odsCatalog = "odsCatalog" val odsHiveCatalog = new HiveCatalog(odsCatalog, "ods", hiveConfDir, hiveVersion)

Re:Re: flink1.11 sink hive error

2020-07-14 文章 Zhou Zach
g/projects/flink/flink-docs-release-1.11/dev/table/hive/#connecting-to-hive > ><https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/#connecting-to-hive> > >> 在 2020年7月14日,20:29,Zhou Zach 写道: >> >> Hi, >> >> >> 是在flink的con

Re:Re: flink1.11 sink hive error

2020-07-14 文章 Zhou Zach
Hi, 是在flink的conf文件配置hive.metastore.uris吗 在 2020-07-14 20:03:11,"Leonard Xu" 写道: >Hello > > >> 在 2020年7月14日,19:52,Zhou Zach 写道: >> >> : Embedded metastore is not allowed. > >Flink 集成 Hive 时,不支持 embedded metastore 的, 你需要起一个hive meta

flink1.11 sink hive error

2020-07-14 文章 Zhou Zach
hi all, flink1.11 sql sink hive table 报错: java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application. at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)

Re:回复:Re: flink 同时sink hbase和hive,hbase少记录

2020-07-13 文章 Zhou Zach
Hi, 感谢社区热心答疑! 在 2020-07-14 11:00:18,"夏帅" 写道: >你好, >本质还是StreamingFileSink,所以目前只能append > > >------ >发件人:Zhou Zach >发送时间:2020年7月14日(星期二) 10:56 >收件人:user-zh >主 题:Re:Re: flink 同时sink hb

Re:Re: flink 同时sink hbase和hive,hbase少记录

2020-07-13 文章 Zhou Zach
Hi Leonard, 原来是有重复key,hbase做了upsert,请问Hive Streaming Writing是不是目前只支持append模式,不支持upsert模式 在 2020-07-14 09:56:00,"Leonard Xu" 写道: >Hi, > >> 在 2020年7月14日,09:52,Zhou Zach 写道: >> >>>> | CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)),

Re:Re: flink 同时sink hbase和hive,hbase少记录

2020-07-13 文章 Zhou Zach
57.html#a42674 > ><http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Upgrade-HBase-connector-to-2-2-x-tc42657.html#a42674> > > >> 在 2020年7月13日,21:09,Zhou Zach 写道: >> >> >> >> flink订阅kafka消息,同时sin

flink 同时sink hbase和hive,hbase少记录

2020-07-13 文章 Zhou Zach
flink订阅kafka消息,同时sink到hbase和hive中, 当向kafka发送42条记录,然后停止producer发消息,去hive中查可以精准地查到42条,但是在hbase中却只查到30条 query: streamTableEnv.executeSql( """ | |CREATE TABLE hbase_table ( |rowkey VARCHAR, |cf ROW(sex VARCHAR, age INT, created_time VARCHAR) |)

Re:Re: Re: Re: 回复:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 Zhou Zach
好的,感谢答疑 在 2020-07-13 19:49:10,"Jingsong Li" 写道: >创建kafka_table需要在default dialect下。 > >不管什么dialect,都会保存到hive metastore中 (除非使用temporary table的语法) > >Best, >Jingsong > >On Mon, Jul 13, 2020 at 7:46 PM Zhou Zach wrote: > >> 创建kafka_table的时

Re:Re: Re: 回复:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 Zhou Zach
>partition到hive的默认实现,你需要自定义partition-commit-policy. > >[1] >https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html > >Best, >Jingsong > >On Mon, Jul 13, 2020 at 6:51 PM Zhou Zach wrote: > >> 尴尬 >> 我开了两个项目,改错项目了,

Re:Re: 回复:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 Zhou Zach
ed_time, 'HH') |FROM kafka_table | |""".stripMargin) streamTableEnv.executeSql( """ | |SELECT * FROM hive_table WHERE dt='2020-07-13' and hr='18' | |""".stripMargin) .print() 在 20

Re:回复:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 Zhou Zach
Hi, 我现在改成了: 'sink.partition-commit.delay'='0s' checkpoint完成了20多次,hdfs文件也产生了20多个, hive表还是查不到数据 在 2020-07-13 17:23:34,"夏帅" 写道: 你好, 你设置了1个小时的 SINK_PARTITION_COMMIT_DELAY -- 发件人:Zhou Zach 发送时间:2020年7月13日(星期

Re:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 Zhou Zach
) streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(10 * 1000) 间隔5s,超时10s,不过,等了2分多钟,hdfs上写入了10几个文件了,查hive还是没数据 在 2020-07-13 16:52:16,"Jingsong Li" 写道: >有开checkpoint吧?delay设的多少? > >Add partition 在 checkpoint完成 + delay的时间后 > >Best, >Jingsong > >On Mon, Jul 13, 2020 at 4:50 PM Zhou

Re:Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 Zhou Zach
用Filesystem connector,如果你使用filesystem也报错,那就贴下报错信息 > >Best, >Jingsong > >On Mon, Jul 13, 2020 at 2:58 PM Zhou Zach wrote: > >> flink 1.11 sink hive table的connector设置为什么啊,尝试设置 >> WITH('connector'='filesystem','path'='...','format'='parquet','sink.partition-commit.delay

Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 Zhou Zach
flink 1.11 sink hive table的connector设置为什么啊,尝试设置 WITH('connector'='filesystem','path'='...','format'='parquet','sink.partition-commit.delay'='1 h','sink.partition-commit.policy.kind'='success-file'); 也报错误 query: streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE) streamTableEnv.executeSql( """

Re:how to set table.sql-dialect in flink1.11 StreamTableEnvironment

2020-07-13 文章 Zhou Zach
找到了: tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); 在 2020-07-13 14:01:45,"Zhou Zach" 写道: >hi all, > > >我像下面那种方式尝试,报错了 > > >streamTableEnv.executeSql( >""" >| >| >|SET table.sql-dialect=hive; >|

how to set table.sql-dialect in flink1.11 StreamTableEnvironment

2020-07-13 文章 Zhou Zach
hi all, 我像下面那种方式尝试,报错了 streamTableEnv.executeSql( """ | | |SET table.sql-dialect=hive; |CREATE TABLE hive_table ( | user_id STRING, | age INT |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ( |

Re:Re: flink1.10升级到flink1.11 提交到yarn失败

2020-07-10 文章 Zhou Zach
apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143) >>> at >>> >>> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96) >>> at >>> >>> org.apache.flink.table.

Re:Re: flink1.10升级到flink1.11 提交到yarn失败

2020-07-09 文章 Zhou Zach
--r-- 1 root root 23518 4月 20 20:47 log4j-slf4j-impl-2.12.1.jar 把table相关的包都下载下来了,还是报同样的错,好奇怪。。。 在 2020-07-10 10:24:02,"Congxian Qiu" 写道: >Hi > >这个看上去是提交到 Yarn 了,具体的原因需要看下 JM log 是啥原因。另外是否是日志没有贴全,这里只看到本地 log,其他的就只有小部分 >jobmanager.err 的 log。 > >Best,

flink1.10升级到flink1.11 提交到yarn失败

2020-07-09 文章 Zhou Zach
hi all, 原来用1.10使用per job模式,可以提交的作业,现在用1.11使用应用模式提交失败,看日志,也不清楚原因, yarn log: Log Type: jobmanager.err Log Upload Time: Thu Jul 09 21:02:48 +0800 2020 Log Length: 785 SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in

Re:Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology

2020-07-08 文章 Zhou Zach
去掉就好了,感谢解答 在 2020-07-08 16:07:17,"Jingsong Li" 写道: >Hi, > >你的代码里:streamTableEnv.executeSql,它的意思就是已经提交到集群异步的去执行了。 > >所以你后面 "streamExecutionEnv.execute("from kafka sink hbase")" >并没有真正的物理节点。你不用再调用了。 > >Best, >Ji

Re:回复:flink Sql 1.11 executeSql报No operators defined in streaming topology

2020-07-08 文章 Zhou Zach
nkPlanner().inStreamingMode().build >val tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings) > .. >tableEnv.execute("") >如果是的话,可以尝试使用bsEnv.execute("") >1.11对于两者的execute代码实现有改动 > > >-----

flink Sql 1.11 executeSql报No operators defined in streaming topology

2020-07-08 文章 Zhou Zach
代码在flink 1.10.1是可以正常运行的,升级到1.11.0时,提示streamTableEnv.sqlUpdate弃用,改成executeSql了,程序启动2秒后,报异常: Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph. at

Re:Re: flink 1.11 connector jdbc 依赖解析失败

2020-07-08 文章 Zhou Zach
ocs-release-1.11/dev/table/connectors/jdbc.html > ><https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html> > > >> 在 2020年7月8日,08:15,Zhou Zach 写道: >> >> hi all, >> flink升级到1.11,flink-connector-jdbc >> idea解析失败,去maven仓库查也没查到,请问是不是要手动编译1.11的源码的方式安装依赖的 >> >

flink 1.11 connector jdbc 依赖解析失败

2020-07-07 文章 Zhou Zach
hi all, flink升级到1.11,flink-connector-jdbc idea解析失败,去maven仓库查也没查到,请问是不是要手动编译1.11的源码的方式安装依赖的

flink cep result DataStream no data print

2020-07-05 文章 Zhou Zach
code: val inpurtDS = streamTableEnv.toAppendStream[BehaviorInfo](behaviorTable)inpurtDS.print()val pattern = Pattern.begin[BehaviorInfo]("start") .where(_.clickCount 7)val patternStream = CEP.pattern(inpurtDS, pattern) val result: DataStream[BehaviorInfo] = patternStream.process( new

Re:Re: Re:Re:Re: Re: Re: flink run from checkpoit failed

2020-06-22 文章 Zhou Zach
有一些认证相关的问题 `2020-06-22 13:00:59,368 ERROR >org.apache.flink.shaded.curator.org.apache.curator.ConnectionState - >Authentication failed` 或许你可以先尝试解决下这个问题看看。 > >[1] >https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/checkpoints.html#resuming-from-a-retained-checkpoi

Re:Re:Re:Re: Re: Re: flink run from checkpoit failed

2020-06-22 文章 Zhou Zach
flink run -s 后面跟的参数是不是只能是savepointPath,不能是flnk job 自动checkpoint path吗 在 2020-06-22 14:32:02,"Zhou Zach" 写道: >重启了CDH6集群,还是报同样的错误,flink 故障恢复不成功,不敢上生产啊,哪位大佬帮忙看下啊 > > > > > > > > > > > > > > > > >在 2020-06-22 13:

Re:Re:Re: Re: Re: flink run from checkpoit failed

2020-06-22 文章 Zhou Zach
重启了CDH6集群,还是报同样的错误,flink 故障恢复不成功,不敢上生产啊,哪位大佬帮忙看下啊 在 2020-06-22 13:21:01,"Zhou Zach" 写道: 用yarn application kill flink job把yarn的application杀掉后, 执行/opt/flink-1.10.0/bin/flink run -s hdfs://nameservice1:8020/user/flink10/checkpoints/f1b6f5392cd5053db155e709ffe9f871/chk-15

Re:Re: Re: flink run from checkpoit failed

2020-06-19 文章 Zhou Zach
我是per job模式,不是yarn session模式啊 At 2020-06-19 20:06:47, "Rui Li" wrote: >那得重启yarn session,再把作业提交上去 > >On Fri, Jun 19, 2020 at 6:22 PM Zhou Zach wrote: > >> >> >> >> >> >> >> 用yarn app

Re:flink run from checkpoit failed

2020-06-19 文章 Zhou Zach
在flink-1.10.0/conf/flink-conf.yaml中加了下面两个超时参数,不起作用 akka.client.timeout: 6 akka.ask.timeout: 600 有大佬知道是什么原因吗 在 2020-06-19 14:57:05,"Zhou Zach" 写道: > > > > >用yarn application kill flink job后, >执行/opt/flink-1.10.0/bin/flink run -s

flink job自动checkpoint是成功,手动checkpoint失败

2020-06-19 文章 Zhou Zach
2020-06-19 15:11:18,361 INFO org.apache.flink.client.cli.CliFrontend - Triggering savepoint for job e229c76e6a1b43142cb4272523102ed1. 2020-06-19 15:11:18,378 INFO org.apache.flink.client.cli.CliFrontend - Waiting for response... 2020-06-19

flink run from checkpoit failed

2020-06-19 文章 Zhou Zach
用yarn application kill flink job后, 执行/opt/flink-1.10.0/bin/flink run -s /user/flink10/checkpoints/69e450574d8520ac5961e20a6fc4798a/chk-18/_metadata -d -c dataflow.sql.FromKafkaSinkJdbcForCountPerSecond /data/warehouse/streaming/data-flow-1.0.jar 2020-06-19 14:39:54,563 INFO

Re:Re: 项目引用flink-1.11.0,打包失败

2020-06-18 文章 Zhou Zach
org.apache.flink.table.api.bridge.java.StreamTableEnvironment 在 2020-06-18 19:41:08,"Jark Wu" 写道: >能贴下完整代码吗? (imports 部分) > >Best, >Jark > >On Thu, 18 Jun 2020 at 19:18, Zhou Zach wrote: > >> >> >> flink-1.10.0版本,引用的是org.apache.flink.table.api.java.StreamTable

项目引用flink-1.11.0,打包失败

2020-06-18 文章 Zhou Zach
flink-1.10.0版本,引用的是org.apache.flink.table.api.java.StreamTableEnvironment,换成flink-1.11.0时,intellij idea提示要换成org.apache.flink.table.api.bridge.java.StreamTableEnvironment,Intellij Idea Build可以成功,就是打包的时候出错。。 [ERROR]

Re:flink sql sink mysql requires primary keys

2020-06-17 文章 Zhou Zach
int, |PRIMARY KEY (`time`) |) WITH ( |'connector.type' = 'jdbc', |'connector.write.flush.max-rows' = '1' |) |""".stripMargin) At 2020-06-17 20:59:35, "Zhou Zach" wrote: >Exception in thread "main" org.apache.f

Re:Re: flink sql DDL Unsupported update-mode hbase

2020-06-16 文章 Zhou Zach
那flink sql DDL的方式,读写,更新,删除hbase都是支持的吧 At 2020-06-17 13:45:15, "Jark Wu" wrote: >Hi, > >HBase connector 不用声明 update-mode 属性。 也不能声明。 > >Best, >Jark > >On Wed, 17 Jun 2020 at 13:08, Zhou Zach wrote: > >> The pr

flink sql DDL Unsupported update-mode hbase

2020-06-16 文章 Zhou Zach
The program finished with the following exception: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' in the classpath. Reason: No factory supports all

?????? flink sql ????????ROW??????????INT

2020-06-16 文章 Zhou Zach
??hbase??hbase ---- ??:"Leonard Xu"

?????? flink sql ????????ROW??????????INT

2020-06-16 文章 Zhou Zach
??.?? offset (0) + length (4) exceed the capacity of the array: 2 ?? hbaseint?? ??users.addColumn("cf", "age", classOf[Integer]) ?? ??int??IntegerInteger??int

?????? Re: flink sql read hbase sink mysql data type not match

2020-06-16 文章 Zhou Zach
2020-06-16 21:01:09,756 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka version: unknown 2020-06-16 21:01:09,757 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: unknown 2020-06-16 21:01:09,758 INFO

flink sql ????????ROW??????????INT

2020-06-16 文章 Zhou Zach
flink sql??HBase??ROWROW??INT select cast(cf as Int) cf from hbase_table ??

Re:Re: flink sql read hbase sink mysql data type not match

2020-06-16 文章 Zhou Zach
flink sql ??ROW<`age` INT??INT?? streamTableEnv.sqlUpdate( """ | |insert into user_age |SELECT rowkey, cast(cf as int) as age |FROM | users | |""".stripMargin)??

flink sql read hbase sink mysql data type not match

2020-06-16 文章 Zhou Zach
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Field types of query result and registered TableSink default_catalog.default_database.user_age do not match. Query schema: [rowkey: STRING, cf: ROW<`age` INT>] Sink schema: [rowkey: STRING, age: INT]

Re:Re: Re: Re:Re: flink sql job 提交到yarn上报错

2020-06-16 文章 Zhou Zach
有输出的 在 2020-06-16 15:24:29,"王松" 写道: >那你在命令行执行:hadoop classpath,有hadoop的classpath输出吗? > >Zhou Zach 于2020年6月16日周二 下午3:22写道: > >> >> >> >> >> >> >> 在/etc/profile下,目前只加了 >> export HADOOP_CLASSPATH=`hadoop clas

Re:Re: Re:Re: flink sql job 提交到yarn上报错

2020-06-16 文章 Zhou Zach
ort PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH > >Zhou Zach 于2020年6月16日周二 下午2:53写道: > >> flink/lib/下的jar: >> flink-connector-hive_2.11-1.10.0.jar >> flink-dist_2.11-1.10.0.jar >> flink-jdbc_2.11-1.10.0.jar >> flink-json-1.10.0.jar >> flink-shaded-ha

Re:Re:Re: flink sql job 提交到yarn上报错

2020-06-16 文章 Zhou Zach
.jar hbase-common-2.1.0.jar hive-exec-2.1.1.jar mysql-connector-java-5.1.49.jar 在 2020-06-16 14:48:43,"Zhou Zach" 写道: > > > > >high-availability.storageDir: hdfs:///flink/ha/ >high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:218

Re:Re:Re: flink sql job 提交到yarn上报错

2020-06-16 文章 Zhou Zach
high-availability: zookeeper 在 2020-06-16 14:48:43,"Zhou Zach" 写道: > > > > >high-availability.storageDir: hdfs:///flink/ha/ >high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181 >state.backend: filesystem >state.checkpoints.dir: hdfs

Re:Re: flink sql job 提交到yarn上报错

2020-06-16 文章 Zhou Zach
-availability.zookeeper.path.root: /flink 在 2020-06-16 14:44:02,"王松" 写道: >你的配置文件中ha配置可以贴下吗 > >Zhou Zach 于2020年6月16日周二 下午1:49写道: > >> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to >> initialize the cluster e

Re:flink sql job 提交到yarn上报错

2020-06-16 文章 Zhou Zach
将flink-shaded-hadoop-2-3.0.0-cdh6.3.0-7.0.jar放在flink/lib目录下,或者打入fat jar都不起作用。。。 At 2020-06-16 13:49:27, "Zhou Zach" wrote: org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint YarnJobCluster

flink sql job 提交到yarn上报错

2020-06-15 文章 Zhou Zach
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint YarnJobClusterEntrypoint. at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:187) at

Re:Re:Re: flink sql 怎样将从hbase中取出的BYTES类型转换成Int

2020-06-15 文章 Zhou Zach
or.write.buffer-flush.interval' = '2s' |) |""".stripMargin) At 2020-06-15 20:19:22, "Zhou Zach" wrote: >val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment >val blinkEnvSettings = >EnvironmentSettings.newInstance()

Re:Re: flink sql 怎样将从hbase中取出的BYTES类型转换成Int

2020-06-15 文章 Zhou Zach
EM_TIME AS OF b.`proctime` AS u | ON b.uid = u.rowkey | |""".stripMargin) 在 2020-06-15 20:01:16,"Leonard Xu" 写道: >Hi, >看起来是你query的 schema 和 table (sink) 的schema 没有对应上,hbase中的数据都是bytes存储,在 flink sql >中一般不需要读取bytes,读取到的数据应该是

flink sql 怎样将从hbase中取出的BYTES类型转换成Int

2020-06-15 文章 Zhou Zach
Exception in thread "main" org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.user_cnt do not match. Query schema: [time: STRING, age: BYTES] Sink schema: [time: STRING, sum_age: INT]

Re:Re: flink sql sink hbase failed

2020-06-15 文章 Zhou Zach
改了源码,可以了 在 2020-06-15 16:17:46,"Leonard Xu" 写道: >Hi > > >> 在 2020年6月15日,15:36,Zhou Zach 写道: >> >> 'connector.version' expects '1.4.3', but is '2.1.0' > >Hbase connector只支持1.4.3的版本,其他不支持,但之前看有社区用户用1.4.3的connector写入高版本的case,你可以试下。 > >祝好 >Leonard Xu

flink sql sink hbase failed

2020-06-15 文章 Zhou Zach
flink version: 1.10.0 hbase version: 2.1.0 SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding

flink sql DDL支持 Temporal Table 定义吗

2020-06-14 文章 Zhou Zach
根据文档https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#join-with-a-temporal-table, 临时表table source 必须要继承LookupableTableSource, 但是,看到https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#jdbc-connector -- lookup options,

Re:回复: flink sql Temporal table join failed

2020-06-12 文章 Zhou Zach
- >发件人:"Leonard Xu"发送时间:2020年6月12日(星期五) 下午5:43 >收件人:"user-zh" >主题:Re: flink sql Temporal table join failed > > > > >你刚好踩到了这个坑,这是flink保留的关键字(time)转义的bug,1.10.1及之后的版本(即将发布的1.11)中修复了的。 > >祝好 >Leonard Xu > > 在 2020年6月12日,17:38,Zhou Zach >

Re:Re: flink sql Temporal table join failed

2020-06-12 文章 Zhou Zach
感谢提醒 在 2020-06-12 17:43:20,"Leonard Xu" 写道: > >你刚好踩到了这个坑,这是flink保留的关键字(time)转义的bug,1.10.1及之后的版本(即将发布的1.11)中修复了的。 > >祝好 >Leonard Xu > >> 在 2020年6月12日,17:38,Zhou Zach 写道: >> >> >> >> >> 是的,1.10.0版本 >>

Re:Re: Re: Re: flink sql Temporal table join failed

2020-06-12 文章 Zhou Zach
是的,1.10.0版本 在 2020-06-12 16:28:15,"Benchao Li" 写道: >看起来你又踩到了一个坑,你用的是1.10.0吧?可以切换到1.10.1试一下,有两个bug已经在1.10.1中修复了。 > >Zhou Zach 于2020年6月12日周五 下午3:47写道: > >> 还是不行, >> SLF4J: Class path contains multiple SLF4J bindings. >> SLF4J: Found binding in >

Re:Re: Re: flink sql Temporal table join failed

2020-06-12 文章 Zhou Zach
havior ( |uid VARCHAR, |phoneType VARCHAR, |clickCount INT, |proctime AS PROCTIME(), |`time` TIMESTAMP(3) |) WITH ( |'connector.type' = 'kafka', |'connector.version' = 'universal', |'connector.topic' = 'user_behavior', |'co

Re:Re: flink sql Temporal table join failed

2020-06-12 文章 Zhou Zach
/streaming/time_attributes.html > >Zhou Zach 于2020年6月12日周五 下午1:33写道: > >> SLF4J: Class path contains multiple SLF4J bindings. >> >> SLF4J: Found binding in >> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.

flink sql Temporal table join failed

2020-06-11 文章 Zhou Zach
SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in

Re:Re: flink TableEnvironment can not call getTableEnvironment api

2020-06-11 文章 Zhou Zach
gs); > > >Best, >Leonard Xu >[1] >https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment > ><https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment> > >> 在 2020年6月12日,11

flink TableEnvironment can not call getTableEnvironment api

2020-06-11 文章 Zhou Zach
flink version 1.10.0 根据文档 https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/temporal_tables.html#defining-temporal-table 想要Defining Temporal Table,但是没有发现getTableEnvironment。。 val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv =

Re:Re: flink sql bigint cannot be cast to mysql Long

2020-06-11 文章 Zhou Zach
的代码已经修复这个问题了[1],你可以等1.11发布后试用,或者编译下最新的代码,flink 中对应表 声明decimal(20, 0)处理。 > >祝好, >Leonard Xu > >[1] https://issues.apache.org/jira/browse/FLINK-17657 ><https://issues.apache.org/jira/browse/FLINK-17657> > >> 在 2020年6月11日,13:51,Zhou Zach 写道: >> >> bigint(20) unsigned >

Re:Re:flink sql bigint cannot be cast to mysql Long

2020-06-10 文章 Zhou Zach
> > > > > > > > >At 2020-06-11 13:22:07, "Zhou Zach" wrote: >>SLF4J: Class path contains multiple SLF4J bindings. >> >>SLF4J: Found binding in >>[jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl

Re:Re: flink sql bigint cannot be cast to mysql Long

2020-06-10 文章 Zhou Zach
math.BigInteger cannot be cast >> to java.lang.Long > >java.math.BigInteger 的范围比 java.lang.Long的范围大很多,是不能cast的,应该是你数据类型对应错误了,可以把mysql >表的schema贴下吗? > > >祝好, >Leonard Xu > >> 在 2020年6月11日,13:22,Zhou Zach 写道: >> >> SLF4J: Class path contains multip

flink sql bigint cannot be cast to mysql Long

2020-06-10 文章 Zhou Zach
SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in

Re:回复: sink mysql 失败

2020-06-10 文章 Zhou Zach
感谢回复!忘记设置用户名和密码了。。 At 2020-06-10 16:54:43, "wangweigu...@stevegame.cn" wrote: > >Caused by: java.sql.SQLException: Access denied for user ''@'localhost' (using >password: NO) >得指定下有操作mysql这个表的权限账号了! > > > >发件人: Zhou Zach >发送时间: 2020-06-10 1

sink mysql 失败

2020-06-10 文章 Zhou Zach
SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in

Re:Re: flink sql 消费kafka失败

2020-06-09 文章 Zhou Zach
感谢回复,写入Kafka的时间戳改成"2020-06-10T12:12:43Z",消费成功了 在 2020-06-10 13:25:01,"Leonard Xu" 写道: >Hi, > >> Caused by: java.io.IOException: Failed to deserialize JSON object. > >报错信息说了是 json 解析失败了,按照之前大家踩的坑,请检查下两点: >(1)json 中timestamp数据的格式必须是"2020-06-10T12:12:43Z", 不能是 long

flink sql 消费kafka失败

2020-06-09 文章 Zhou Zach
Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 994bd5a683143be23a23d77ed005d20d) at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at

Re:Re:Re:flink sink to mysql

2020-06-08 文章 Zhou Zach
code 代码乱码,重新截图一下: 在 2020-06-08 17:20:54,"Zhou Zach" 写道: > > > >使用JDBCOutputFormat的方式,一直没成功啊 > > >code: >object FromKafkaSinkJdbcByJdbcOutputFormat { def main(args: Array[String]): >Unit = { val env = getEnv() val topic = "t4&quo

Re:Re:flink sink to mysql

2020-06-08 文章 Zhou Zach
48, 99, 48, 49, 49, 97, 51, 49, 54, 49, 97, 57, 100, 57, 100],[49, 53, 57, 49, 54, 48, 55, 54, 48, 57, 51, 57, 52],[105, 79, 83],1,null,null 不注释掉sink代码: rowDS.writeUsingOutputFormat( jdbcOutput ) 就看不到日志,是不是定义的jdbcOutput不对啊 在 2020-06-03 19:16:47,"chaojianok" 写道: >推荐 JDBCOutputFormat 吧,简单易用。 > > > > > > > > > > > > > > > > > >在 2020-06-03 18:11:38,"Zhou Zach" 写道: >>hi all, >> flink sink to mysql 是推荐 继承RichSinkFunction的方式,还是 通过JDBCOutputFormat的方式?

flink sink to mysql

2020-06-03 文章 Zhou Zach
hi all, flink sink to mysql 是推荐 继承RichSinkFunction的方式,还是 通过JDBCOutputFormat的方式?

Re:Re: flink sql 写 hive分区表失败

2020-05-28 文章 Zhou Zach
/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java#L294> > > > >> 在 2020年5月28日,13:59,Zhou Zach 写道: >> >> 多谢指点,可以了。 >> 但是换成动态插入,有问题: >> org.apache.flink.client.program.ProgramInvocationException: The main method >>

Re:Re: flink sql 写 hive分区表失败

2020-05-28 文章 Zhou Zach
|""".stripMargin) 在 2020-05-28 13:39:49,"Leonard Xu" 写道: >Hi, >>|select * from dwdCatalog.dwd.t1 where `p_year` = 2020 and `p_month` >> = 5 > >应该是 select * 会把分区字段一起带出来吧,你字段就不匹配了,select里加上你需要的字段吧 > >祝好, >Leonard Xu >

flink sql 写 hive分区表失败

2020-05-27 文章 Zhou Zach
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Field types of query result and registered TableSink dwdCatalog.dwd.t1_copy do not match. Query schema: [id: BIGINT, name: STRING, p_year: INT, p_month: INT, EXPR$4: INT NOT NULL, EXPR$5: INT NOT NULL]

Re:Re: Re: Re: Re: Flink sql 跨库

2020-05-27 文章 Zhou Zach
好的,感谢指点 在 2020-05-27 19:33:42,"Rui Li" 写道: >你是想要调试HiveCatalog的代码么?可以参考flink里的测试用例,我们有的测试是用embedded模式做的(比如HiveCatalogHiveMetadataTest),有些测试是单独起一个HMS进程(比如TableEnvHiveConnectorTest)。 > >On Wed, May 27, 2020 at 7:27 PM Zhou Zach wrote: > >> 是的,发现了,感谢指点。请教

Re:Re: Re: Re: Flink sql 跨库

2020-05-27 文章 Zhou Zach
是的,发现了,感谢指点。请教下,用intellij idea调试,你是在本地调试吗,那样的话,要在本地搭建个hadoop集群吗,至少要搭建个本地的hive吧,还是直接用intellij idea连接远程,如果集群在阿里云上,是不是要另外开端口的 在 2020-05-27 19:19:58,"Rui Li" 写道: >year在calcite里是保留关键字,你用`year`试试呢 > >On Wed, May 27, 2020 at 7:09 PM Zhou Zach wrote: > &

Re:Re:Re: Re: Flink sql 跨库

2020-05-27 文章 Zhou Zach
找到原因了,flink 把year 当成关键字了 At 2020-05-27 19:09:43, "Zhou Zach" wrote: >The program finished with the following exception: > > >org.apache.flink.client.program.ProgramInvocationException: The main method >caused an error: SQL parse failed. Encountered &

Re:Re: Re: Flink sql 跨库

2020-05-27 文章 Zhou Zach
; ... "CEIL" ... "CEILING" ... "SUBSTRING" ... "TRIM" ... "CLASSIFIER" ... "MATCH_NUMBER" ... "RUNNING" ... "PREV" ... "JSON_EXISTS" ... "JSON_VALUE" ... "JSON_QUE

Re:Re: Flink sql 跨库

2020-05-27 文章 Zhou Zach
uot; 写道: >Flink 支持hive分区表的,看你在另外一个邮件里贴了,你能把你的hive表和query在邮件里贴下吗? > >祝好 >Leonard Xu > >> 在 2020年5月27日,17:40,Zhou Zach 写道: >> >> >> >> >> 感谢回复,表名前加上Catalog和db前缀可以成功访问了。 >> 现在遇到个问题,flink 读hive 分区表时,如果where子句用分区键,比如year过滤就会报错,用表中其他字段过滤是没问

Re:Re: Flink sql 跨库

2020-05-27 文章 Zhou Zach
8,565 INFO org.apache.hadoop.hive.conf.HiveConf > > > >祝好 >Leonard Xu > > >> 在 2020年5月27日,10:55,Zhou Zach 写道: >> >> hi all, >> Flink sql 的HiveCatalog 是不是不能跨库操作啊,就是一个flink >> sql中join的两个表涉及到两个不同到库,因为一个HiveCatalog只能关联一个库

Flink read hive partition table failed

2020-05-26 文章 Zhou Zach
Flink version: 1.10.0 Flink sql read hive partition key failed,flink sql 是不是不支持hive 分区键 code: val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build() val tableEnv = TableEnvironment.create(settings) val hiveConfDir = "/etc/hive/conf" // a local

Flink sql 跨库

2020-05-26 文章 Zhou Zach
hi all, Flink sql 的HiveCatalog 是不是不能跨库操作啊,就是一个flink sql中join的两个表涉及到两个不同到库,因为一个HiveCatalog只能关联一个库