Hi,试了,将并行度设置为2和kafka分区数9,都试了,都只有一个consumer有watermark,可能是因为我开了一个producer吧
在 2020-08-13 16:57:25,"Shengkai Fang" 写道:
>hi, watermark本来就是通过watermark assigner生成的。这是正常现象。
>我想问问 你有没有试过调大并行度来解决这个问题?因为不同partition的数据可能存在时间上的差异。
>
>Zhou Zach 于2020年8月13日周四 下午4:
d,
sex, age, created_time])
但是,只有最后面两个算子有watermark,所以开启OperatorChaining后,因为前面3个没有watermark,整个chain的算子都没有watermark了,那么是不是就不能通过flink
ui来监控watermark了,就依赖第三方监控工具来看watermark?因为上生产,肯定要开OperatorChaining的
在 2020-08-13 15:39:44,"forideal" 写道:
>Hi Zhou Zach:
>你可以试试 env.disableOper
Hi forideal,
我也遇到了No Watermark问题,我也设置了table.exec.source.idle-timeout 参数,如下:
val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamExecutionEnv.setStateBackend(new
Hi all,
通过如下方式设置HBASE_CONF_PATH变量,提交到yarn时,发现HBASE_CONF_PATH没有生效,
/opt/flink-1.11.1/bin/flink run-application -t yarn-application \
-DHBASE_CONF_PATH='/etc/hbase/conf' \
请问flink提交job时,怎样设置环境变量?
Hi,
感谢详细答疑!
| |
Zhou Zach
|
|
邮箱:wander...@163.com
|
签名由 网易邮箱大师 定制
在2020年07月24日 11:48,Leonard Xu 写道:
Hi
"2020-07-23T19:53:15.509Z” 是 RFC-3339 格式,这个格式是带zone的时间格式,对应的数据类型是 timestamp
with local zone,这个应该在1.12里支持了[1]
1.10版本虽然是支持 RFC-3339 格式,但默认解析时区是有问题的,所以在1.11和1.12逐步中纠正了。
在1.11版本中,如果js
connectors/formats/json.html#json-timestamp-format-standard
>
><https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#json-timestamp-format-standard>
>
>> 在 2020年7月23日,20:54,Zhou Zach 写道:
>>
>> 当前作业有个sink
>> connect
作业消费不到数据吗?
>
>正常应该不会的,可以提供个可复现代码吗?
>
>祝好
>Leonard Xu
>
>
>> 在 2020年7月23日,18:13,Zhou Zach 写道:
>>
>> Hi all,
>>
>> 根据文档https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#
Hi all,
根据文档https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#start-reading-position,
使用新参数创建kafka_table,下游消费不到数据,使用老参数下游可以消费到数据,是不是新参数的方式有坑啊
老参数:
streamTableEnv.executeSql(
"""
|
|CREATE TABLE kafka_table (
|uid
nice, 可以不用看Command-Line Interface的文档了
在 2020-07-16 16:16:00,"xiao cai" 写道:
>可以看这里https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
>
> 原始邮件
>发件人: Zhou Zach
>收件人: user-zh
>发送时间: 2020年7月16日(周四) 15:28
>主题: Re:回复:flink1.11 set ya
-D前缀好使,要设置yarn name用什么参数啊,1.11官网的文档有些都不好使了
在 2020-07-16 15:03:14,"flinkcx" 写道:
>是不是应该用-D作为前缀来设置,比如-Dtaskmanager.numberOfTaskSlots=4
>
>
> 原始邮件
>发件人: Zhou Zach
>收件人: Flink user-zh mailing list
>发送时间: 2020年7月16日(周四) 14:51
>主题: flink1.11 set yarn s
Hi all,
使用如下命令,设置Number of slots per TaskManager
/opt/flink-1.11.0/bin/flink run-application -t yarn-application \
-Djobmanager.memory.process.size=1024m \
-Dtaskmanager.memory.process.size=2048m \
-ys 4 \
Hi all,
flink sql 1.11 create table 是不是 不支持 IF NOT EXISTS
Query:
val hiveConfDir = "/etc/hive/conf"
val hiveVersion = "2.1.1"
val odsCatalog = "odsCatalog"
val odsHiveCatalog = new HiveCatalog(odsCatalog, "ods", hiveConfDir,
hiveVersion)
g/projects/flink/flink-docs-release-1.11/dev/table/hive/#connecting-to-hive
>
><https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/#connecting-to-hive>
>
>> 在 2020年7月14日,20:29,Zhou Zach 写道:
>>
>> Hi,
>>
>>
>> 是在flink的con
Hi,
是在flink的conf文件配置hive.metastore.uris吗
在 2020-07-14 20:03:11,"Leonard Xu" 写道:
>Hello
>
>
>> 在 2020年7月14日,19:52,Zhou Zach 写道:
>>
>> : Embedded metastore is not allowed.
>
>Flink 集成 Hive 时,不支持 embedded metastore 的, 你需要起一个hive meta
hi all,
flink1.11 sql sink hive table 报错:
java.util.concurrent.CompletionException:
org.apache.flink.client.deployment.application.ApplicationExecutionException:
Could not execute application.
at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
Hi,
感谢社区热心答疑!
在 2020-07-14 11:00:18,"夏帅" 写道:
>你好,
>本质还是StreamingFileSink,所以目前只能append
>
>
>------
>发件人:Zhou Zach
>发送时间:2020年7月14日(星期二) 10:56
>收件人:user-zh
>主 题:Re:Re: flink 同时sink hb
Hi Leonard,
原来是有重复key,hbase做了upsert,请问Hive Streaming Writing是不是目前只支持append模式,不支持upsert模式
在 2020-07-14 09:56:00,"Leonard Xu" 写道:
>Hi,
>
>> 在 2020年7月14日,09:52,Zhou Zach 写道:
>>
>>>> | CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)),
57.html#a42674
>
><http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Upgrade-HBase-connector-to-2-2-x-tc42657.html#a42674>
>
>
>> 在 2020年7月13日,21:09,Zhou Zach 写道:
>>
>>
>>
>> flink订阅kafka消息,同时sin
flink订阅kafka消息,同时sink到hbase和hive中,
当向kafka发送42条记录,然后停止producer发消息,去hive中查可以精准地查到42条,但是在hbase中却只查到30条
query:
streamTableEnv.executeSql(
"""
|
|CREATE TABLE hbase_table (
|rowkey VARCHAR,
|cf ROW(sex VARCHAR, age INT, created_time VARCHAR)
|)
好的,感谢答疑
在 2020-07-13 19:49:10,"Jingsong Li" 写道:
>创建kafka_table需要在default dialect下。
>
>不管什么dialect,都会保存到hive metastore中 (除非使用temporary table的语法)
>
>Best,
>Jingsong
>
>On Mon, Jul 13, 2020 at 7:46 PM Zhou Zach wrote:
>
>> 创建kafka_table的时
>partition到hive的默认实现,你需要自定义partition-commit-policy.
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html
>
>Best,
>Jingsong
>
>On Mon, Jul 13, 2020 at 6:51 PM Zhou Zach wrote:
>
>> 尴尬
>> 我开了两个项目,改错项目了,
ed_time, 'HH')
|FROM kafka_table
|
|""".stripMargin)
streamTableEnv.executeSql(
"""
|
|SELECT * FROM hive_table WHERE dt='2020-07-13' and hr='18'
|
|""".stripMargin)
.print()
在 20
Hi,
我现在改成了:
'sink.partition-commit.delay'='0s'
checkpoint完成了20多次,hdfs文件也产生了20多个,
hive表还是查不到数据
在 2020-07-13 17:23:34,"夏帅" 写道:
你好,
你设置了1个小时的
SINK_PARTITION_COMMIT_DELAY
--
发件人:Zhou Zach
发送时间:2020年7月13日(星期
)
streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(10 * 1000)
间隔5s,超时10s,不过,等了2分多钟,hdfs上写入了10几个文件了,查hive还是没数据
在 2020-07-13 16:52:16,"Jingsong Li" 写道:
>有开checkpoint吧?delay设的多少?
>
>Add partition 在 checkpoint完成 + delay的时间后
>
>Best,
>Jingsong
>
>On Mon, Jul 13, 2020 at 4:50 PM Zhou
用Filesystem connector,如果你使用filesystem也报错,那就贴下报错信息
>
>Best,
>Jingsong
>
>On Mon, Jul 13, 2020 at 2:58 PM Zhou Zach wrote:
>
>> flink 1.11 sink hive table的connector设置为什么啊,尝试设置
>> WITH('connector'='filesystem','path'='...','format'='parquet','sink.partition-commit.delay
flink 1.11 sink hive table的connector设置为什么啊,尝试设置
WITH('connector'='filesystem','path'='...','format'='parquet','sink.partition-commit.delay'='1
h','sink.partition-commit.policy.kind'='success-file');
也报错误
query:
streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
streamTableEnv.executeSql(
"""
找到了:
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
在 2020-07-13 14:01:45,"Zhou Zach" 写道:
>hi all,
>
>
>我像下面那种方式尝试,报错了
>
>
>streamTableEnv.executeSql(
>"""
>|
>|
>|SET table.sql-dialect=hive;
>|
hi all,
我像下面那种方式尝试,报错了
streamTableEnv.executeSql(
"""
|
|
|SET table.sql-dialect=hive;
|CREATE TABLE hive_table (
| user_id STRING,
| age INT
|) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
|
apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
>>> at
>>>
>>> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
>>> at
>>>
>>> org.apache.flink.table.
--r-- 1 root root 23518 4月 20 20:47 log4j-slf4j-impl-2.12.1.jar
把table相关的包都下载下来了,还是报同样的错,好奇怪。。。
在 2020-07-10 10:24:02,"Congxian Qiu" 写道:
>Hi
>
>这个看上去是提交到 Yarn 了,具体的原因需要看下 JM log 是啥原因。另外是否是日志没有贴全,这里只看到本地 log,其他的就只有小部分
>jobmanager.err 的 log。
>
>Best,
hi all,
原来用1.10使用per job模式,可以提交的作业,现在用1.11使用应用模式提交失败,看日志,也不清楚原因,
yarn log:
Log Type: jobmanager.err
Log Upload Time: Thu Jul 09 21:02:48 +0800 2020
Log Length: 785
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
去掉就好了,感谢解答
在 2020-07-08 16:07:17,"Jingsong Li" 写道:
>Hi,
>
>你的代码里:streamTableEnv.executeSql,它的意思就是已经提交到集群异步的去执行了。
>
>所以你后面 "streamExecutionEnv.execute("from kafka sink hbase")"
>并没有真正的物理节点。你不用再调用了。
>
>Best,
>Ji
nkPlanner().inStreamingMode().build
>val tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
> ..
>tableEnv.execute("")
>如果是的话,可以尝试使用bsEnv.execute("")
>1.11对于两者的execute代码实现有改动
>
>
>-----
代码在flink
1.10.1是可以正常运行的,升级到1.11.0时,提示streamTableEnv.sqlUpdate弃用,改成executeSql了,程序启动2秒后,报异常:
Exception in thread "main" java.lang.IllegalStateException: No operators
defined in streaming topology. Cannot generate StreamGraph.
at
ocs-release-1.11/dev/table/connectors/jdbc.html
>
><https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html>
>
>
>> 在 2020年7月8日,08:15,Zhou Zach 写道:
>>
>> hi all,
>> flink升级到1.11,flink-connector-jdbc
>> idea解析失败,去maven仓库查也没查到,请问是不是要手动编译1.11的源码的方式安装依赖的
>>
>
hi all,
flink升级到1.11,flink-connector-jdbc
idea解析失败,去maven仓库查也没查到,请问是不是要手动编译1.11的源码的方式安装依赖的
code:
val inpurtDS =
streamTableEnv.toAppendStream[BehaviorInfo](behaviorTable)inpurtDS.print()val
pattern = Pattern.begin[BehaviorInfo]("start")
.where(_.clickCount 7)val patternStream = CEP.pattern(inpurtDS, pattern)
val result: DataStream[BehaviorInfo] = patternStream.process(
new
有一些认证相关的问题 `2020-06-22 13:00:59,368 ERROR
>org.apache.flink.shaded.curator.org.apache.curator.ConnectionState -
>Authentication failed` 或许你可以先尝试解决下这个问题看看。
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/checkpoints.html#resuming-from-a-retained-checkpoi
flink run -s 后面跟的参数是不是只能是savepointPath,不能是flnk job 自动checkpoint path吗
在 2020-06-22 14:32:02,"Zhou Zach" 写道:
>重启了CDH6集群,还是报同样的错误,flink 故障恢复不成功,不敢上生产啊,哪位大佬帮忙看下啊
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2020-06-22 13:
重启了CDH6集群,还是报同样的错误,flink 故障恢复不成功,不敢上生产啊,哪位大佬帮忙看下啊
在 2020-06-22 13:21:01,"Zhou Zach" 写道:
用yarn application kill flink job把yarn的application杀掉后,
执行/opt/flink-1.10.0/bin/flink run -s
hdfs://nameservice1:8020/user/flink10/checkpoints/f1b6f5392cd5053db155e709ffe9f871/chk-15
我是per job模式,不是yarn session模式啊
At 2020-06-19 20:06:47, "Rui Li" wrote:
>那得重启yarn session,再把作业提交上去
>
>On Fri, Jun 19, 2020 at 6:22 PM Zhou Zach wrote:
>
>>
>>
>>
>>
>>
>>
>> 用yarn app
在flink-1.10.0/conf/flink-conf.yaml中加了下面两个超时参数,不起作用
akka.client.timeout: 6
akka.ask.timeout: 600
有大佬知道是什么原因吗
在 2020-06-19 14:57:05,"Zhou Zach" 写道:
>
>
>
>
>用yarn application kill flink job后,
>执行/opt/flink-1.10.0/bin/flink run -s
2020-06-19 15:11:18,361 INFO org.apache.flink.client.cli.CliFrontend
- Triggering savepoint for job e229c76e6a1b43142cb4272523102ed1.
2020-06-19 15:11:18,378 INFO org.apache.flink.client.cli.CliFrontend
- Waiting for response...
2020-06-19
用yarn application kill flink job后,
执行/opt/flink-1.10.0/bin/flink run -s
/user/flink10/checkpoints/69e450574d8520ac5961e20a6fc4798a/chk-18/_metadata -d
-c dataflow.sql.FromKafkaSinkJdbcForCountPerSecond
/data/warehouse/streaming/data-flow-1.0.jar
2020-06-19 14:39:54,563 INFO
org.apache.flink.table.api.bridge.java.StreamTableEnvironment
在 2020-06-18 19:41:08,"Jark Wu" 写道:
>能贴下完整代码吗? (imports 部分)
>
>Best,
>Jark
>
>On Thu, 18 Jun 2020 at 19:18, Zhou Zach wrote:
>
>>
>>
>> flink-1.10.0版本,引用的是org.apache.flink.table.api.java.StreamTable
flink-1.10.0版本,引用的是org.apache.flink.table.api.java.StreamTableEnvironment,换成flink-1.11.0时,intellij
idea提示要换成org.apache.flink.table.api.bridge.java.StreamTableEnvironment,Intellij
Idea Build可以成功,就是打包的时候出错。。
[ERROR]
int,
|PRIMARY KEY (`time`)
|) WITH (
|'connector.type' = 'jdbc',
|'connector.write.flush.max-rows' = '1'
|)
|""".stripMargin)
At 2020-06-17 20:59:35, "Zhou Zach" wrote:
>Exception in thread "main" org.apache.f
那flink sql DDL的方式,读写,更新,删除hbase都是支持的吧
At 2020-06-17 13:45:15, "Jark Wu" wrote:
>Hi,
>
>HBase connector 不用声明 update-mode 属性。 也不能声明。
>
>Best,
>Jark
>
>On Wed, 17 Jun 2020 at 13:08, Zhou Zach wrote:
>
>> The pr
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Could not find a suitable table factory for
'org.apache.flink.table.factories.TableSinkFactory' in
the classpath.
Reason: No factory supports all
??hbase??hbase
----
??:"Leonard Xu"
??.??
offset (0) + length (4) exceed the capacity of the array: 2
?? hbaseint??
??users.addColumn("cf", "age", classOf[Integer]) ??
??int??IntegerInteger??int
2020-06-16 21:01:09,756 INFO
org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser
- Kafka version: unknown
2020-06-16 21:01:09,757 INFO
org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser
- Kafka commitId: unknown
2020-06-16 21:01:09,758 INFO
flink sql??HBase??ROWROW??INT
select cast(cf as Int) cf from hbase_table
??
flink sql ??ROW<`age` INT??INT??
streamTableEnv.sqlUpdate(
"""
|
|insert into user_age
|SELECT rowkey, cast(cf as int) as age
|FROM
| users
|
|""".stripMargin)??
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Field types of query result and registered TableSink
default_catalog.default_database.user_age do not match.
Query schema: [rowkey: STRING, cf: ROW<`age` INT>]
Sink schema: [rowkey: STRING, age: INT]
有输出的
在 2020-06-16 15:24:29,"王松" 写道:
>那你在命令行执行:hadoop classpath,有hadoop的classpath输出吗?
>
>Zhou Zach 于2020年6月16日周二 下午3:22写道:
>
>>
>>
>>
>>
>>
>>
>> 在/etc/profile下,目前只加了
>> export HADOOP_CLASSPATH=`hadoop clas
ort PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
>
>Zhou Zach 于2020年6月16日周二 下午2:53写道:
>
>> flink/lib/下的jar:
>> flink-connector-hive_2.11-1.10.0.jar
>> flink-dist_2.11-1.10.0.jar
>> flink-jdbc_2.11-1.10.0.jar
>> flink-json-1.10.0.jar
>> flink-shaded-ha
.jar
hbase-common-2.1.0.jar
hive-exec-2.1.1.jar
mysql-connector-java-5.1.49.jar
在 2020-06-16 14:48:43,"Zhou Zach" 写道:
>
>
>
>
>high-availability.storageDir: hdfs:///flink/ha/
>high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:218
high-availability: zookeeper
在 2020-06-16 14:48:43,"Zhou Zach" 写道:
>
>
>
>
>high-availability.storageDir: hdfs:///flink/ha/
>high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181
>state.backend: filesystem
>state.checkpoints.dir: hdfs
-availability.zookeeper.path.root: /flink
在 2020-06-16 14:44:02,"王松" 写道:
>你的配置文件中ha配置可以贴下吗
>
>Zhou Zach 于2020年6月16日周二 下午1:49写道:
>
>> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
>> initialize the cluster e
将flink-shaded-hadoop-2-3.0.0-cdh6.3.0-7.0.jar放在flink/lib目录下,或者打入fat jar都不起作用。。。
At 2020-06-16 13:49:27, "Zhou Zach" wrote:
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
initialize the cluster entrypoint YarnJobCluster
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
initialize the cluster entrypoint YarnJobClusterEntrypoint.
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:187)
at
or.write.buffer-flush.interval' = '2s'
|)
|""".stripMargin)
At 2020-06-15 20:19:22, "Zhou Zach" wrote:
>val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
>val blinkEnvSettings =
>EnvironmentSettings.newInstance()
EM_TIME AS OF b.`proctime` AS u
| ON b.uid = u.rowkey
|
|""".stripMargin)
在 2020-06-15 20:01:16,"Leonard Xu" 写道:
>Hi,
>看起来是你query的 schema 和 table (sink) 的schema 没有对应上,hbase中的数据都是bytes存储,在 flink sql
>中一般不需要读取bytes,读取到的数据应该是
Exception in thread "main" org.apache.flink.table.api.ValidationException:
Field types of query result and registered TableSink
default_catalog.default_database.user_cnt do not match.
Query schema: [time: STRING, age: BYTES]
Sink schema: [time: STRING, sum_age: INT]
改了源码,可以了
在 2020-06-15 16:17:46,"Leonard Xu" 写道:
>Hi
>
>
>> 在 2020年6月15日,15:36,Zhou Zach 写道:
>>
>> 'connector.version' expects '1.4.3', but is '2.1.0'
>
>Hbase connector只支持1.4.3的版本,其他不支持,但之前看有社区用户用1.4.3的connector写入高版本的case,你可以试下。
>
>祝好
>Leonard Xu
flink version: 1.10.0
hbase version: 2.1.0
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding
根据文档https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#join-with-a-temporal-table,
临时表table source 必须要继承LookupableTableSource,
但是,看到https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#jdbc-connector
-- lookup options,
-
>发件人:"Leonard Xu"发送时间:2020年6月12日(星期五) 下午5:43
>收件人:"user-zh"
>主题:Re: flink sql Temporal table join failed
>
>
>
>
>你刚好踩到了这个坑,这是flink保留的关键字(time)转义的bug,1.10.1及之后的版本(即将发布的1.11)中修复了的。
>
>祝好
>Leonard Xu
>
> 在 2020年6月12日,17:38,Zhou Zach
>
感谢提醒
在 2020-06-12 17:43:20,"Leonard Xu" 写道:
>
>你刚好踩到了这个坑,这是flink保留的关键字(time)转义的bug,1.10.1及之后的版本(即将发布的1.11)中修复了的。
>
>祝好
>Leonard Xu
>
>> 在 2020年6月12日,17:38,Zhou Zach 写道:
>>
>>
>>
>>
>> 是的,1.10.0版本
>>
是的,1.10.0版本
在 2020-06-12 16:28:15,"Benchao Li" 写道:
>看起来你又踩到了一个坑,你用的是1.10.0吧?可以切换到1.10.1试一下,有两个bug已经在1.10.1中修复了。
>
>Zhou Zach 于2020年6月12日周五 下午3:47写道:
>
>> 还是不行,
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in
>
havior (
|uid VARCHAR,
|phoneType VARCHAR,
|clickCount INT,
|proctime AS PROCTIME(),
|`time` TIMESTAMP(3)
|) WITH (
|'connector.type' = 'kafka',
|'connector.version' = 'universal',
|'connector.topic' = 'user_behavior',
|'co
/streaming/time_attributes.html
>
>Zhou Zach 于2020年6月12日周五 下午1:33写道:
>
>> SLF4J: Class path contains multiple SLF4J bindings.
>>
>> SLF4J: Found binding in
>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
gs);
>
>
>Best,
>Leonard Xu
>[1]
>https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment
>
><https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment>
>
>> 在 2020年6月12日,11
flink version 1.10.0
根据文档
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/temporal_tables.html#defining-temporal-table
想要Defining Temporal Table,但是没有发现getTableEnvironment。。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv =
的代码已经修复这个问题了[1],你可以等1.11发布后试用,或者编译下最新的代码,flink 中对应表 声明decimal(20, 0)处理。
>
>祝好,
>Leonard Xu
>
>[1] https://issues.apache.org/jira/browse/FLINK-17657
><https://issues.apache.org/jira/browse/FLINK-17657>
>
>> 在 2020年6月11日,13:51,Zhou Zach 写道:
>>
>> bigint(20) unsigned
>
>
>
>
>
>
>
>
>
>At 2020-06-11 13:22:07, "Zhou Zach" wrote:
>>SLF4J: Class path contains multiple SLF4J bindings.
>>
>>SLF4J: Found binding in
>>[jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl
math.BigInteger cannot be cast
>> to java.lang.Long
>
>java.math.BigInteger 的范围比 java.lang.Long的范围大很多,是不能cast的,应该是你数据类型对应错误了,可以把mysql
>表的schema贴下吗?
>
>
>祝好,
>Leonard Xu
>
>> 在 2020年6月11日,13:22,Zhou Zach 写道:
>>
>> SLF4J: Class path contains multip
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
感谢回复!忘记设置用户名和密码了。。
At 2020-06-10 16:54:43, "wangweigu...@stevegame.cn"
wrote:
>
>Caused by: java.sql.SQLException: Access denied for user ''@'localhost' (using
>password: NO)
>得指定下有操作mysql这个表的权限账号了!
>
>
>
>发件人: Zhou Zach
>发送时间: 2020-06-10 1
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
感谢回复,写入Kafka的时间戳改成"2020-06-10T12:12:43Z",消费成功了
在 2020-06-10 13:25:01,"Leonard Xu" 写道:
>Hi,
>
>> Caused by: java.io.IOException: Failed to deserialize JSON object.
>
>报错信息说了是 json 解析失败了,按照之前大家踩的坑,请检查下两点:
>(1)json 中timestamp数据的格式必须是"2020-06-10T12:12:43Z", 不能是 long
Exception in thread "main" java.util.concurrent.ExecutionException:
org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID:
994bd5a683143be23a23d77ed005d20d)
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at
code 代码乱码,重新截图一下:
在 2020-06-08 17:20:54,"Zhou Zach" 写道:
>
>
>
>使用JDBCOutputFormat的方式,一直没成功啊
>
>
>code:
>object FromKafkaSinkJdbcByJdbcOutputFormat { def main(args: Array[String]):
>Unit = { val env = getEnv() val topic = "t4&quo
48, 99, 48, 49,
49, 97, 51, 49, 54, 49, 97, 57, 100, 57, 100],[49, 53, 57, 49, 54, 48, 55, 54,
48, 57, 51, 57, 52],[105, 79, 83],1,null,null
不注释掉sink代码:
rowDS.writeUsingOutputFormat( jdbcOutput )
就看不到日志,是不是定义的jdbcOutput不对啊
在 2020-06-03 19:16:47,"chaojianok" 写道:
>推荐 JDBCOutputFormat 吧,简单易用。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2020-06-03 18:11:38,"Zhou Zach" 写道:
>>hi all,
>> flink sink to mysql 是推荐 继承RichSinkFunction的方式,还是 通过JDBCOutputFormat的方式?
hi all,
flink sink to mysql 是推荐 继承RichSinkFunction的方式,还是 通过JDBCOutputFormat的方式?
/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java#L294>
>
>
>
>> 在 2020年5月28日,13:59,Zhou Zach 写道:
>>
>> 多谢指点,可以了。
>> 但是换成动态插入,有问题:
>> org.apache.flink.client.program.ProgramInvocationException: The main method
>>
|""".stripMargin)
在 2020-05-28 13:39:49,"Leonard Xu" 写道:
>Hi,
>>|select * from dwdCatalog.dwd.t1 where `p_year` = 2020 and `p_month`
>> = 5
>
>应该是 select * 会把分区字段一起带出来吧,你字段就不匹配了,select里加上你需要的字段吧
>
>祝好,
>Leonard Xu
>
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Field types of query result and registered TableSink
dwdCatalog.dwd.t1_copy do not match.
Query schema: [id: BIGINT, name: STRING, p_year: INT, p_month: INT, EXPR$4: INT
NOT NULL, EXPR$5: INT NOT NULL]
好的,感谢指点
在 2020-05-27 19:33:42,"Rui Li" 写道:
>你是想要调试HiveCatalog的代码么?可以参考flink里的测试用例,我们有的测试是用embedded模式做的(比如HiveCatalogHiveMetadataTest),有些测试是单独起一个HMS进程(比如TableEnvHiveConnectorTest)。
>
>On Wed, May 27, 2020 at 7:27 PM Zhou Zach wrote:
>
>> 是的,发现了,感谢指点。请教
是的,发现了,感谢指点。请教下,用intellij
idea调试,你是在本地调试吗,那样的话,要在本地搭建个hadoop集群吗,至少要搭建个本地的hive吧,还是直接用intellij
idea连接远程,如果集群在阿里云上,是不是要另外开端口的
在 2020-05-27 19:19:58,"Rui Li" 写道:
>year在calcite里是保留关键字,你用`year`试试呢
>
>On Wed, May 27, 2020 at 7:09 PM Zhou Zach wrote:
>
&
找到原因了,flink 把year 当成关键字了
At 2020-05-27 19:09:43, "Zhou Zach" wrote:
>The program finished with the following exception:
>
>
>org.apache.flink.client.program.ProgramInvocationException: The main method
>caused an error: SQL parse failed. Encountered &
; ...
"CEIL" ...
"CEILING" ...
"SUBSTRING" ...
"TRIM" ...
"CLASSIFIER" ...
"MATCH_NUMBER" ...
"RUNNING" ...
"PREV" ...
"JSON_EXISTS" ...
"JSON_VALUE" ...
"JSON_QUE
uot; 写道:
>Flink 支持hive分区表的,看你在另外一个邮件里贴了,你能把你的hive表和query在邮件里贴下吗?
>
>祝好
>Leonard Xu
>
>> 在 2020年5月27日,17:40,Zhou Zach 写道:
>>
>>
>>
>>
>> 感谢回复,表名前加上Catalog和db前缀可以成功访问了。
>> 现在遇到个问题,flink 读hive 分区表时,如果where子句用分区键,比如year过滤就会报错,用表中其他字段过滤是没问
8,565 INFO org.apache.hadoop.hive.conf.HiveConf
>
>
>
>祝好
>Leonard Xu
>
>
>> 在 2020年5月27日,10:55,Zhou Zach 写道:
>>
>> hi all,
>> Flink sql 的HiveCatalog 是不是不能跨库操作啊,就是一个flink
>> sql中join的两个表涉及到两个不同到库,因为一个HiveCatalog只能关联一个库
Flink version: 1.10.0
Flink sql read hive partition key failed,flink sql 是不是不支持hive 分区键
code:
val settings =
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val tableEnv = TableEnvironment.create(settings)
val hiveConfDir = "/etc/hive/conf" // a local
hi all,
Flink sql 的HiveCatalog 是不是不能跨库操作啊,就是一个flink
sql中join的两个表涉及到两个不同到库,因为一个HiveCatalog只能关联一个库
98 matches
Mail list logo