Re: flink sql 1.10 insert into tb select 复杂schema 失败

2020-07-14 文章 Benchao Li
赞 (不过这个应该只是json format的bug,跟connector没有关系)

Peihui He  于2020年7月15日周三 下午1:16写道:

> Hi BenChao,
>
> 换成1.10.1 就可以了。刚才那封邮件不行,是因为依赖flink-kafka的依赖版本没有修改过来。
> Thank you.
>
>
> Benchao Li  于2020年7月15日周三 上午10:25写道:
>
> > Hi Peihui,
> >
> > 这是一个已知bug[1],已经在1.10.1和1.11.0中修复了,可以尝试下这两个版本。
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-16220
> >
> > Peihui He  于2020年7月15日周三 上午9:54写道:
> >
> > > Hello,
> > >
> > >  在使用flink sql 1.10.0 时候,当source table 中含有复杂schema,比如
> > > create table xxx (
> > > a string,
> > > b row(
> > >  c row(d string)
> > >   )
> > > )
> > >
> > > 当c 中有值的时候,sql 如下 insert into select * from xxx会出现下面错误
> > >
> > > Caused by: java.lang.ClassCastException:
> > >
> > >
> >
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.NullNode
> > > cannot be cast to
> > >
> > >
> >
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
> > > at
> > >
> > >
> >
> org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$assembleRowConverter$dd344700$1(JsonRowSerializationSchema.java:337)
> > > at
> > >
> > >
> >
> org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$wrapIntoNullableConverter$1fa09b5b$1(JsonRowSerializationSchema.java:189)
> > > at
> > >
> > >
> >
> org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$assembleRowConverter$dd344700$1(JsonRowSerializationSchema.java:345)
> > > at
> > >
> > >
> >
> org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$wrapIntoNullableConverter$1fa09b5b$1(JsonRowSerializationSchema.java:189)
> > > at
> > >
> > >
> >
> org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$assembleRowConverter$dd344700$1(JsonRowSerializationSchema.java:345)
> > > at
> > >
> > >
> >
> org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$wrapIntoNullableConverter$1fa09b5b$1(JsonRowSerializationSchema.java:189)
> > > at
> > >
> > >
> >
> org.apache.flink.formats.json.JsonRowSerializationSchema.serialize(JsonRowSerializationSchema.java:138)
> > > ... 38 more
> > >
> > >
> > > Best wishes.
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


-- 

Best,
Benchao Li


Re: flink 1.11 upsert????????

2020-07-14 文章 ??????
??Python??

Re: flink sql 1.10 insert into tb select 复杂schema 失败

2020-07-14 文章 Peihui He
Hi BenChao,

换成1.10.1 就可以了。刚才那封邮件不行,是因为依赖flink-kafka的依赖版本没有修改过来。
Thank you.


Benchao Li  于2020年7月15日周三 上午10:25写道:

> Hi Peihui,
>
> 这是一个已知bug[1],已经在1.10.1和1.11.0中修复了,可以尝试下这两个版本。
>
> [1] https://issues.apache.org/jira/browse/FLINK-16220
>
> Peihui He  于2020年7月15日周三 上午9:54写道:
>
> > Hello,
> >
> >  在使用flink sql 1.10.0 时候,当source table 中含有复杂schema,比如
> > create table xxx (
> > a string,
> > b row(
> >  c row(d string)
> >   )
> > )
> >
> > 当c 中有值的时候,sql 如下 insert into select * from xxx会出现下面错误
> >
> > Caused by: java.lang.ClassCastException:
> >
> >
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.NullNode
> > cannot be cast to
> >
> >
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
> > at
> >
> >
> org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$assembleRowConverter$dd344700$1(JsonRowSerializationSchema.java:337)
> > at
> >
> >
> org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$wrapIntoNullableConverter$1fa09b5b$1(JsonRowSerializationSchema.java:189)
> > at
> >
> >
> org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$assembleRowConverter$dd344700$1(JsonRowSerializationSchema.java:345)
> > at
> >
> >
> org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$wrapIntoNullableConverter$1fa09b5b$1(JsonRowSerializationSchema.java:189)
> > at
> >
> >
> org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$assembleRowConverter$dd344700$1(JsonRowSerializationSchema.java:345)
> > at
> >
> >
> org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$wrapIntoNullableConverter$1fa09b5b$1(JsonRowSerializationSchema.java:189)
> > at
> >
> >
> org.apache.flink.formats.json.JsonRowSerializationSchema.serialize(JsonRowSerializationSchema.java:138)
> > ... 38 more
> >
> >
> > Best wishes.
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-14 文章 Congxian Qiu
Hi

我尝试理解一下:
1 你用 1.9 跑 wordcount 作业,然后执行了一些 checkpoint,然后停止作业,然后使用 1.10 从之前 1.9 的作业生成的
checkpoint 恢复,发现恢复不了?
2 你用作业 1.10 跑 wordcount,然后遇到特定的 word 会抛异常,然后 failover,发现不能从 checkpoint 恢复?

你这里的问题是第 1 种还是第 2 种呢?

另外能否分享一下你的操作步骤以及出错时候的 taskmanager log 呢?

Best,
Congxian


Peihui He  于2020年7月14日周二 下午2:46写道:

> Hi Congxian,
>
> 这个错误是从1.9 升级到1.10 遇到的问题。用简单的wordcount 测试,自己根据特定word
> 抛出runtimeException,就能够重现。flink on yarn 和 flink on k8s 都出现这个问题。1.10
> 都不能从上次的checkpoint状态中恢复。不知道是不是1.10需要其他配置呢?
>
> Best wishes.
>
> Congxian Qiu  于2020年7月14日周二 下午1:54写道:
>
> > Hi
> >
> > 这个出错是从 1.9 升级到 1.10 遇到的问题,还是说 1.10 能正常跑了,然后跑着跑着 failover 了再次恢复的时候出错了呢?
> > 另外你可以看下 tm log 看看有没有其他异常
> >
> > Best,
> > Congxian
> >
> >
> > Yun Tang  于2020年7月14日周二 上午11:57写道:
> >
> > > Hi Peihui
> > >
> > >
> >
> 你的异常应该是从增量Checkpoint恢复时,文件已经下载到本地了,做硬链时[1],发现源文件不见了,有很大的可能是当时发生了异常,导致restore流程退出了,所以这个问题应该不是root
> > > cause。
> > >
> > > [1]
> > >
> >
> https://github.com/apache/flink/blob/2a3b642b1efb957f3d4f20502c40398786ab1469/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java#L473
> > >
> > >
> > > 祝好
> > > 唐云
> > > 
> > > From: Peihui He 
> > > Sent: Tuesday, July 14, 2020 10:42
> > > To: user-zh@flink.apache.org 
> > > Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
> > >
> > > hello,
> > >
> > > 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示
> > >
> > >
> > > Caused by: java.nio.file.NoSuchFileException:
> > >
> > >
> >
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
> > > ->
> > >
> >
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
> > >
> > > 配置和1.9.2 一样:
> > > state.backend: rocksdb
> > > state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
> > > state.savepoints.dir: hdfs:///flink/savepoints/wc/
> > > state.backend.incremental: true
> > >
> > > 代码上都有
> > >
> > > env.enableCheckpointing(1);
> > >
> > >
> >
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> > > env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
> > > org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));
> > >
> > >
> > >   是1.10.0 需要做什么特别配置么?
> > >
> >
>


Re: flink sql 1.10 insert into tb select 复杂schema 失败

2020-07-14 文章 Peihui He
Hi BenChao,

刚才尝试了flink 1.10.1 但是问题还是存在,看了

[1] https://issues.apache.org/jira/browse/FLINK-16628


这个bug fix没有我给的 table 复杂,

CREATE TABLE source_kafka_sasl (
svt STRING,
ops ROW
) WITH ()


我的是在原有的ops 里面又前嵌套了row。

Benchao Li  于2020年7月15日周三 上午10:25写道:

> Hi Peihui,
>
> 这是一个已知bug[1],已经在1.10.1和1.11.0中修复了,可以尝试下这两个版本。
>
> [1] https://issues.apache.org/jira/browse/FLINK-16220
>
> Peihui He  于2020年7月15日周三 上午9:54写道:
>
> > Hello,
> >
> >  在使用flink sql 1.10.0 时候,当source table 中含有复杂schema,比如
> > create table xxx (
> > a string,
> > b row(
> >  c row(d string)
> >   )
> > )
> >
> > 当c 中有值的时候,sql 如下 insert into select * from xxx会出现下面错误
> >
> > Caused by: java.lang.ClassCastException:
> >
> >
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.NullNode
> > cannot be cast to
> >
> >
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
> > at
> >
> >
> org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$assembleRowConverter$dd344700$1(JsonRowSerializationSchema.java:337)
> > at
> >
> >
> org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$wrapIntoNullableConverter$1fa09b5b$1(JsonRowSerializationSchema.java:189)
> > at
> >
> >
> org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$assembleRowConverter$dd344700$1(JsonRowSerializationSchema.java:345)
> > at
> >
> >
> org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$wrapIntoNullableConverter$1fa09b5b$1(JsonRowSerializationSchema.java:189)
> > at
> >
> >
> org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$assembleRowConverter$dd344700$1(JsonRowSerializationSchema.java:345)
> > at
> >
> >
> org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$wrapIntoNullableConverter$1fa09b5b$1(JsonRowSerializationSchema.java:189)
> > at
> >
> >
> org.apache.flink.formats.json.JsonRowSerializationSchema.serialize(JsonRowSerializationSchema.java:138)
> > ... 38 more
> >
> >
> > Best wishes.
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: pyflink1.11.0window

2020-07-14 文章 Shuiqiang Chen
举个sql例子
select platformcodetoname(payPlatform) as platform, sum(payAmount) as
pay_amount, cast(tumble_start(rt, interval '5' seconds) as BIGINT) as
rowtime
from payment_msg group by tumble(rt, interval '5' seconds), payPlatform
这个query 对每5s的tumble窗口做统计。

奇怪的不朽琴师 <1129656...@qq.com> 于2020年7月15日周三 上午11:10写道:

> Shuiqiang,你好:
>  我的目的是每间隔一段时间做一次汇总统计,比如每两秒做一下汇总,请问这个需求我改如何定义window?
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> acqua@gmail.com;
> 发送时间:2020年7月15日(星期三) 上午10:51
> 收件人:"user-zh"
> 主题:Re: pyflink1.11.0window
>
>
>
> 琴师你好,
> 异常栈信息org.apache.flink.table.api.ValidationException: A tumble window
> expects a size value literal.
> 看起来是接下tumble window定义的代码不太正确吧
>
> Best,
> Shuiqiang
>
> 奇怪的不朽琴师 <1129656...@qq.com 于2020年7月15日周三 上午10:27写道:
>
>  你好:
>  nbsp; nbsp;
> 
> nbsp;我按着你回复的建议改了source但是会报新的错误,请问这个是因为什么?我想调试一个window一直没有成功,请帮帮我,谢谢。
>  Traceback (most recent call last):
>  nbsp; File "tou.py", line 71, in   nbsp; nbsp; from_kafka_to_kafka_demo()
>  nbsp; File "tou.py", line 21, in from_kafka_to_kafka_demo
>  nbsp; nbsp; .select(" id,nbsp; time1 , time1 ")\
>  nbsp; File
>  "/usr/local/lib/python3.7/site-packages/pyflink/table/table.py", line
> 907,
>  in select
>  nbsp; nbsp; return Table(self._j_table.select(fields),
> self._t_env)
>  nbsp; File
> "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py",
>  line 1286, in __call__
>  nbsp; nbsp; answer, self.gateway_client, self.target_id,
> self.name)
>  nbsp; File
>  "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py",
> line
>  147, in deco
>  nbsp; nbsp; return f(*a, **kw)
>  nbsp; File
> "/usr/local/lib/python3.7/site-packages/py4j/protocol.py",
>  line 328, in get_return_value
>  nbsp; nbsp; format(target_id, ".", name), value)
>  py4j.protocol.Py4JJavaError: An error occurred while calling
> o26.select.
>  : org.apache.flink.table.api.ValidationException: A tumble window
> expects
>  a size value literal.
>  nbsp; nbsp; nbsp; nbsp; at
> 
> org.apache.flink.table.operations.utils.AggregateOperationFactory.getAsValueLiteral(AggregateOperationFactory.java:384)
>  nbsp; nbsp; nbsp; nbsp; at
> 
> org.apache.flink.table.operations.utils.AggregateOperationFactory.validateAndCreateTumbleWindow(AggregateOperationFactory.java:302)
>  nbsp; nbsp; nbsp; nbsp; at
> 
> org.apache.flink.table.operations.utils.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:236)
>  nbsp; nbsp; nbsp; nbsp; at
> 
> org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:250)
>  nbsp; nbsp; nbsp; nbsp; at
> 
> org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:794)
>  nbsp; nbsp; nbsp; nbsp; at
> 
> org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:781)
>  nbsp; nbsp; nbsp; nbsp; at
>  sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  nbsp; nbsp; nbsp; nbsp; at
> 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  nbsp; nbsp; nbsp; nbsp; at
> 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  nbsp; nbsp; nbsp; nbsp; at
>  java.lang.reflect.Method.invoke(Method.java:498)
>  nbsp; nbsp; nbsp; nbsp; at
> 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>  nbsp; nbsp; nbsp; nbsp; at
> 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>  nbsp; nbsp; nbsp; nbsp; at
> 
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>  nbsp; nbsp; nbsp; nbsp; at
> 
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>  nbsp; nbsp; nbsp; nbsp; at
> 
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>  nbsp; nbsp; nbsp; nbsp; at
> 
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>  nbsp; nbsp; nbsp; nbsp; at
> java.lang.Thread.run(Thread.java:748)
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
>  def register_rides_source(st_env):
>  nbsp; nbsp; source_ddl = \
>  nbsp; nbsp; """
>  nbsp; nbsp; create table source1(
>  nbsp; nbsp; nbsp;id int,
>  nbsp; nbsp; nbsp;time1 timestamp,
>  nbsp; nbsp; nbsp;type string,
>  nbsp; nbsp; nbsp;WATERMARK FOR time1 as time1 -
> INTERVAL '2' SECOND
>  nbsp; nbsp; nbsp;) with (
>  nbsp; nbsp; 'connector.type' = 'kafka',
>  nbsp; nbsp; 'update-mode' = 'append',
>  nbsp; nbsp; 'connector.topic' = 'tp1',
>  nbsp; nbsp; 'connector.properties.bootstrap.servers' =
> 'localhost:9092',
>  nbsp; nbsp; 'connector.properties.zookeeper.connect' =
> 'localhost:2181',
>  nbsp; nbsp; 'format.type' = 'json',
>  nbsp; nbsp; 'format.derive-schema' = 'true',
>  nbsp; nbsp; 'connector.version' = 

Re: flink 1.11 upsert结果出错

2020-07-14 文章 Leonard Xu
Hi,

基本类似的,具体拼delete sql会根据 pk 来, 可以看下delete executor的代码[1]

祝好,
Leonard Xu
【1】https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/executor/BufferReduceStatementExecutor.java#L89
 


> 在 2020年7月15日,11:05,小学生 <201782...@qq.com> 写道:
> 
> 嗯嗯,谢谢大佬的理解,还有一个问题,就是除了update,这个我看新性能也支持delete的,但是没找到相关的部分,delete这个是否类似:delete 
> table1 where score=1;烦请大佬帮忙解答下,不胜感激。



Re: flink1.11.0 java.lang.NoSuchMethodError:org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread

2020-07-14 文章 dmt312_2010
谢谢,确实是jar包冲突的问题,替换完包以后已经可以了


 原始邮件 
发件人: JasonLee<17610775...@163.com>
收件人: user-zh
发送时间: 2020年7月15日(周三) 11:04
主题: Re: flink1.11.0 
java.lang.NoSuchMethodError:org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread


这个很明显是jar包冲突了 只保留flink-sql那个包就行了 | | JasonLee | | 邮箱:17610775...@163.com | 
Signature is customized by Netease Mail Master On 07/15/2020 10:48, dmt312_2010 
wrote: Hi, 大家好,请教各位大佬一个问题,我在验证flink 1.11.0时,遇到如下问题: 报错信息: [ERROR] Could not 
execute SQL statement. Reason: java.lang.NoSuchMethodError: 
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;
 
Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;
 
JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V
 请问下是缺少某些需要的包吗? 环境信息: 版本:flink 1.11.0 启动方式:flink on yarn (集成到CDH中) Flink Home: 
/opt/cloudera/parcels/FLINK/lib/flink ${FLINK_HOME}/lib/ jar包信息: -rw-r--r-- 1 
root root 53820 Mar 9 2017 commons-cli-1.4.jar -rw-r--r-- 1 root root 284220 
Jan 17 2011 commons-lang-2.6.jar -rw-r--r-- 1 root root 61829 Jul 6 2014 
commons-logging-1.2.jar -rw-r--r-- 1 root root 197130 Jun 30 12:45 
flink-connector-jdbc_2.12-1.11.0.jar -rw-r--r-- 1 root root 47655 Jun 30 12:41 
flink-connector-kafka-0.10_2.12-1.11.0.jar -rw-r--r-- 1 root root 60151 Jun 30 
12:42 flink-connector-kafka-0.11_2.12-1.11.0.jar -rw-r--r-- 1 root root 109660 
Jun 30 12:46 flink-connector-kafka_2.12-1.11.0.jar -rw-r--r-- 1 root root 
122794 Jun 30 12:41 flink-connector-kafka-base_2.12-1.11.0.jar -rw-r--r-- 1 
root root 90782 Jun 30 16:40 flink-csv-1.11.0.jar -rw-r--r-- 1 root root 
99461460 Jun 30 16:46 flink-dist_2.12-1.11.0.jar -rw-r--r-- 1 root root 94863 
Jun 30 16:40 flink-json-1.11.0.jar -rw-rw-r-- 1 root root 19127 Jul 14 19:12 
flink-metrics-core-1.11.0.jar -rw-r--r-- 1 root root 108120 Jul 14 19:35 
flink-metrics-prometheus-1.11.0.jar -rw-r--r-- 1 root root 7712156 Jun 18 10:42 
flink-shaded-zookeeper-3.4.14.jar -rw-r--r-- 1 root root 31924588 Jun 30 16:45 
flink-table_2.12-1.11.0.jar -rw-r--r-- 1 root root 34817036 Jun 30 16:45 
flink-table-blink_2.12-1.11.0.jar -rw-r--r-- 1 root root 2740491 Oct 18 2019 
kafka-clients-2.3.1.jar -rw-r--r-- 1 root root 67114 Apr 20 20:47 
log4j-1.2-api-2.12.1.jar -rw-r--r-- 1 root root 276771 Apr 20 20:47 
log4j-api-2.12.1.jar -rw-r--r-- 1 root root 1674433 Apr 20 20:47 
log4j-core-2.12.1.jar -rw-r--r-- 1 root root 23518 Apr 20 20:47 
log4j-slf4j-impl-2.12.1.jar -rw-r--r-- 1 root root 1006904 Apr 20 11:10 
mysql-connector-java-5.1.49.jar -rw-r--r-- 1 root root 26084 Jul 14 18:54 
slf4j-api-1.7.5.jar -rw-r--r-- 1 root root 8869 Jul 14 18:54 
slf4j-log4j12-1.7.5.jar -rwxr-xr-x 1 root root 10680 Jul 14 18:54 
slf4j-simple-1.7.5.jar 执行语句 bin/sql-client.sh embedded CREATE TABLE 
TZT_PAYMENT_ORDER ( `op_type` STRING, `op_ts` STRING, `current_ts` STRING, 
`pos` STRING, `ID` BIGINT, `TRACE_ID` STRING, `BIZ_SYSTEM_CODE` STRING, 
`MERCHANT_NO` STRING, `REQUEST_NO` STRING, `PRODUCT_NAME` STRING, `BANK_CODE` 
STRING, `CREATE_TIME` STRING, `row_ts` as TO_TIMESTAMP(`CREATE_TIME`), 
WATERMARK FOR row_ts AS row_ts - INTERVAL '1' MINUTE ) WITH ( 'format' = 
'json', 'connector' = 'kafka', 'topic' = 'TZT_TZT_PAYMENT_ORDER', 
'properties.bootstrap.servers' = '', 'properties.group.id' = 
'TZT_TZT_PAYMENT_ORDER_TEST_WWX', 'scan.startup.mode' = 'earliest-offset' ); 
select * from TZT_PAYMENT_ORDER;

回复:TableEnvironment 里面无法执行 Hop等窗口函数

2020-07-14 文章 JasonLee
hi
滑动窗口只有在stream里面才会有


| |
JasonLee
|
|
邮箱:17610775...@163.com
|

Signature is customized by Netease Mail Master

在2020年07月15日 09:19,王双利 写道:
TableEnvironment 里面无法执行 Hop等窗口函数,使用StreamTableEnvironment 则可以执行,Flink版本1.10
是否是这样的


?????? pyflink1.11.0window

2020-07-14 文章 ??????????????
Shuiqiang
 
??window??


----
??: 
   "user-zh"



Re: flink 1.11 upsert????????

2020-07-14 文章 ??????
update??delete??delete??delete
 table1 where score=1;??

Re: flink1.11.0 java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread

2020-07-14 文章 JasonLee
这个很明显是jar包冲突了 只保留flink-sql那个包就行了


| |
JasonLee
|
|
邮箱:17610775...@163.com
|

Signature is customized by Netease Mail Master

On 07/15/2020 10:48, dmt312_2010 wrote:
Hi,
大家好,请教各位大佬一个问题,我在验证flink 1.11.0时,遇到如下问题:


报错信息:


[ERROR] Could not execute SQL statement. Reason:
java.lang.NoSuchMethodError: 
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;
Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;
JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V


请问下是缺少某些需要的包吗?




环境信息:


版本:flink 1.11.0
启动方式:flink on yarn (集成到CDH中)


Flink Home:
/opt/cloudera/parcels/FLINK/lib/flink


${FLINK_HOME}/lib/ jar包信息:


-rw-r--r-- 1 root   root  53820 Mar  9  2017 commons-cli-1.4.jar
-rw-r--r-- 1 root   root 284220 Jan 17  2011 commons-lang-2.6.jar
-rw-r--r-- 1 root   root  61829 Jul  6  2014 commons-logging-1.2.jar
-rw-r--r-- 1 root   root 197130 Jun 30 12:45 
flink-connector-jdbc_2.12-1.11.0.jar
-rw-r--r-- 1 root   root  47655 Jun 30 12:41 
flink-connector-kafka-0.10_2.12-1.11.0.jar
-rw-r--r-- 1 root   root  60151 Jun 30 12:42 
flink-connector-kafka-0.11_2.12-1.11.0.jar
-rw-r--r-- 1 root   root 109660 Jun 30 12:46 
flink-connector-kafka_2.12-1.11.0.jar
-rw-r--r-- 1 root   root 122794 Jun 30 12:41 
flink-connector-kafka-base_2.12-1.11.0.jar
-rw-r--r-- 1 root   root  90782 Jun 30 16:40 flink-csv-1.11.0.jar
-rw-r--r-- 1 root   root   99461460 Jun 30 16:46 flink-dist_2.12-1.11.0.jar
-rw-r--r-- 1 root   root  94863 Jun 30 16:40 flink-json-1.11.0.jar
-rw-rw-r-- 1 root   root19127 Jul 14 19:12 flink-metrics-core-1.11.0.jar
-rw-r--r-- 1 root   root 108120 Jul 14 19:35 
flink-metrics-prometheus-1.11.0.jar
-rw-r--r-- 1 root   root7712156 Jun 18 10:42 
flink-shaded-zookeeper-3.4.14.jar
-rw-r--r-- 1 root   root   31924588 Jun 30 16:45 flink-table_2.12-1.11.0.jar
-rw-r--r-- 1 root   root   34817036 Jun 30 16:45 
flink-table-blink_2.12-1.11.0.jar
-rw-r--r-- 1 root   root2740491 Oct 18  2019 kafka-clients-2.3.1.jar
-rw-r--r-- 1 root   root  67114 Apr 20 20:47 log4j-1.2-api-2.12.1.jar
-rw-r--r-- 1 root   root 276771 Apr 20 20:47 log4j-api-2.12.1.jar
-rw-r--r-- 1 root   root1674433 Apr 20 20:47 log4j-core-2.12.1.jar
-rw-r--r-- 1 root   root  23518 Apr 20 20:47 log4j-slf4j-impl-2.12.1.jar
-rw-r--r-- 1 root   root1006904 Apr 20 11:10 mysql-connector-java-5.1.49.jar
-rw-r--r-- 1 root   root  26084 Jul 14 18:54 slf4j-api-1.7.5.jar
-rw-r--r-- 1 root   root   8869 Jul 14 18:54 slf4j-log4j12-1.7.5.jar
-rwxr-xr-x 1 root   root  10680 Jul 14 18:54 slf4j-simple-1.7.5.jar






执行语句


bin/sql-client.sh embedded




CREATE TABLE TZT_PAYMENT_ORDER (
 `op_type` STRING,
 `op_ts` STRING,
 `current_ts` STRING,
 `pos` STRING,
 `ID` BIGINT,
 `TRACE_ID` STRING,
 `BIZ_SYSTEM_CODE` STRING,
 `MERCHANT_NO` STRING,
 `REQUEST_NO` STRING,
 `PRODUCT_NAME` STRING,
 `BANK_CODE` STRING,
 `CREATE_TIME` STRING,
 `row_ts` as TO_TIMESTAMP(`CREATE_TIME`),
  WATERMARK FOR row_ts AS row_ts - INTERVAL '1' MINUTE
) WITH (
  'format' = 'json',
  'connector' = 'kafka',
  'topic' = 'TZT_TZT_PAYMENT_ORDER',
  'properties.bootstrap.servers' = '',
  'properties.group.id' = 'TZT_TZT_PAYMENT_ORDER_TEST_WWX',
  'scan.startup.mode' = 'earliest-offset'
 );


select *  from  TZT_PAYMENT_ORDER;

Re: flink1.11.0 java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread

2020-07-14 文章 Leonard Xu
Hi,
两个kafka connector是会冲突的,还有一点是 SQL client 里应该用 sql 的connector依赖吧, 
flink-sql-connector-kafka_2.12-1.11.0.jar 

祝好,
Leonard Xu

> 在 2020年7月15日,10:56,Paul Lam  写道:
> 
> Hi,
> 
> 看起来是 Kafka connector class 冲突了,flink-connector-kafka_2.12-1.11.0.jar 和 
> flink-connector-kafka-0.10_2.12-1.11.0.jar 不能同时加到 classpath 里。
> 
> Best,
> Paul Lam
> 
>> 2020年7月15日 10:48,dmt312_2010  写道:
>> 
>> Hi,
>> 大家好,请教各位大佬一个问题,我在验证flink 1.11.0时,遇到如下问题:
>> 
>> 
>> 报错信息:
>> 
>> 
>> [ERROR] Could not execute SQL statement. Reason:
>> java.lang.NoSuchMethodError: 
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;
>> Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;
>> JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V
>> 
>> 
>> 请问下是缺少某些需要的包吗?
>> 
>> 
>> 
>> 
>> 环境信息:
>> 
>> 
>> 版本:flink 1.11.0
>> 启动方式:flink on yarn (集成到CDH中)
>> 
>> 
>> Flink Home:
>> /opt/cloudera/parcels/FLINK/lib/flink
>> 
>> 
>> ${FLINK_HOME}/lib/ jar包信息:
>> 
>> 
>> -rw-r--r-- 1 root   root  53820 Mar  9  2017 commons-cli-1.4.jar
>> -rw-r--r-- 1 root   root 284220 Jan 17  2011 commons-lang-2.6.jar
>> -rw-r--r-- 1 root   root  61829 Jul  6  2014 commons-logging-1.2.jar
>> -rw-r--r-- 1 root   root 197130 Jun 30 12:45 
>> flink-connector-jdbc_2.12-1.11.0.jar
>> -rw-r--r-- 1 root   root  47655 Jun 30 12:41 
>> flink-connector-kafka-0.10_2.12-1.11.0.jar
>> -rw-r--r-- 1 root   root  60151 Jun 30 12:42 
>> flink-connector-kafka-0.11_2.12-1.11.0.jar
>> -rw-r--r-- 1 root   root 109660 Jun 30 12:46 
>> flink-connector-kafka_2.12-1.11.0.jar
>> -rw-r--r-- 1 root   root 122794 Jun 30 12:41 
>> flink-connector-kafka-base_2.12-1.11.0.jar
>> -rw-r--r-- 1 root   root  90782 Jun 30 16:40 flink-csv-1.11.0.jar
>> -rw-r--r-- 1 root   root   99461460 Jun 30 16:46 flink-dist_2.12-1.11.0.jar
>> -rw-r--r-- 1 root   root  94863 Jun 30 16:40 flink-json-1.11.0.jar
>> -rw-rw-r-- 1 root   root19127 Jul 14 19:12 flink-metrics-core-1.11.0.jar
>> -rw-r--r-- 1 root   root 108120 Jul 14 19:35 
>> flink-metrics-prometheus-1.11.0.jar
>> -rw-r--r-- 1 root   root7712156 Jun 18 10:42 
>> flink-shaded-zookeeper-3.4.14.jar
>> -rw-r--r-- 1 root   root   31924588 Jun 30 16:45 flink-table_2.12-1.11.0.jar
>> -rw-r--r-- 1 root   root   34817036 Jun 30 16:45 
>> flink-table-blink_2.12-1.11.0.jar
>> -rw-r--r-- 1 root   root2740491 Oct 18  2019 kafka-clients-2.3.1.jar
>> -rw-r--r-- 1 root   root  67114 Apr 20 20:47 log4j-1.2-api-2.12.1.jar
>> -rw-r--r-- 1 root   root 276771 Apr 20 20:47 log4j-api-2.12.1.jar
>> -rw-r--r-- 1 root   root1674433 Apr 20 20:47 log4j-core-2.12.1.jar
>> -rw-r--r-- 1 root   root  23518 Apr 20 20:47 log4j-slf4j-impl-2.12.1.jar
>> -rw-r--r-- 1 root   root1006904 Apr 20 11:10 
>> mysql-connector-java-5.1.49.jar
>> -rw-r--r-- 1 root   root  26084 Jul 14 18:54 slf4j-api-1.7.5.jar
>> -rw-r--r-- 1 root   root   8869 Jul 14 18:54 slf4j-log4j12-1.7.5.jar
>> -rwxr-xr-x 1 root   root  10680 Jul 14 18:54 slf4j-simple-1.7.5.jar
>> 
>> 
>> 
>> 
>> 
>> 
>> 执行语句
>> 
>> 
>> bin/sql-client.sh embedded
>> 
>> 
>> 
>> 
>> CREATE TABLE TZT_PAYMENT_ORDER (
>> `op_type` STRING,
>> `op_ts` STRING,
>> `current_ts` STRING,
>> `pos` STRING, 
>> `ID` BIGINT,
>> `TRACE_ID` STRING,
>> `BIZ_SYSTEM_CODE` STRING,
>> `MERCHANT_NO` STRING,
>> `REQUEST_NO` STRING,
>> `PRODUCT_NAME` STRING,
>> `BANK_CODE` STRING,
>> `CREATE_TIME` STRING,
>> `row_ts` as TO_TIMESTAMP(`CREATE_TIME`),
>>  WATERMARK FOR row_ts AS row_ts - INTERVAL '1' MINUTE
>> ) WITH (
>>  'format' = 'json',
>>  'connector' = 'kafka',
>>  'topic' = 'TZT_TZT_PAYMENT_ORDER',
>>  'properties.bootstrap.servers' = '',
>>  'properties.group.id' = 'TZT_TZT_PAYMENT_ORDER_TEST_WWX',
>>  'scan.startup.mode' = 'earliest-offset'
>> );
>> 
>> 
>> select *  from  TZT_PAYMENT_ORDER;
> 



Re: flink1.11.0 java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread

2020-07-14 文章 Paul Lam
Hi,

看起来是 Kafka connector class 冲突了,flink-connector-kafka_2.12-1.11.0.jar 和 
flink-connector-kafka-0.10_2.12-1.11.0.jar 不能同时加到 classpath 里。

Best,
Paul Lam

> 2020年7月15日 10:48,dmt312_2010  写道:
> 
> Hi,
> 大家好,请教各位大佬一个问题,我在验证flink 1.11.0时,遇到如下问题:
> 
> 
> 报错信息:
> 
> 
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.NoSuchMethodError: 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;
> Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;
> JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V
> 
> 
> 请问下是缺少某些需要的包吗?
> 
> 
> 
> 
> 环境信息:
> 
> 
> 版本:flink 1.11.0
> 启动方式:flink on yarn (集成到CDH中)
> 
> 
> Flink Home:
> /opt/cloudera/parcels/FLINK/lib/flink
> 
> 
> ${FLINK_HOME}/lib/ jar包信息:
> 
> 
> -rw-r--r-- 1 root   root  53820 Mar  9  2017 commons-cli-1.4.jar
> -rw-r--r-- 1 root   root 284220 Jan 17  2011 commons-lang-2.6.jar
> -rw-r--r-- 1 root   root  61829 Jul  6  2014 commons-logging-1.2.jar
> -rw-r--r-- 1 root   root 197130 Jun 30 12:45 
> flink-connector-jdbc_2.12-1.11.0.jar
> -rw-r--r-- 1 root   root  47655 Jun 30 12:41 
> flink-connector-kafka-0.10_2.12-1.11.0.jar
> -rw-r--r-- 1 root   root  60151 Jun 30 12:42 
> flink-connector-kafka-0.11_2.12-1.11.0.jar
> -rw-r--r-- 1 root   root 109660 Jun 30 12:46 
> flink-connector-kafka_2.12-1.11.0.jar
> -rw-r--r-- 1 root   root 122794 Jun 30 12:41 
> flink-connector-kafka-base_2.12-1.11.0.jar
> -rw-r--r-- 1 root   root  90782 Jun 30 16:40 flink-csv-1.11.0.jar
> -rw-r--r-- 1 root   root   99461460 Jun 30 16:46 flink-dist_2.12-1.11.0.jar
> -rw-r--r-- 1 root   root  94863 Jun 30 16:40 flink-json-1.11.0.jar
> -rw-rw-r-- 1 root   root19127 Jul 14 19:12 flink-metrics-core-1.11.0.jar
> -rw-r--r-- 1 root   root 108120 Jul 14 19:35 
> flink-metrics-prometheus-1.11.0.jar
> -rw-r--r-- 1 root   root7712156 Jun 18 10:42 
> flink-shaded-zookeeper-3.4.14.jar
> -rw-r--r-- 1 root   root   31924588 Jun 30 16:45 flink-table_2.12-1.11.0.jar
> -rw-r--r-- 1 root   root   34817036 Jun 30 16:45 
> flink-table-blink_2.12-1.11.0.jar
> -rw-r--r-- 1 root   root2740491 Oct 18  2019 kafka-clients-2.3.1.jar
> -rw-r--r-- 1 root   root  67114 Apr 20 20:47 log4j-1.2-api-2.12.1.jar
> -rw-r--r-- 1 root   root 276771 Apr 20 20:47 log4j-api-2.12.1.jar
> -rw-r--r-- 1 root   root1674433 Apr 20 20:47 log4j-core-2.12.1.jar
> -rw-r--r-- 1 root   root  23518 Apr 20 20:47 log4j-slf4j-impl-2.12.1.jar
> -rw-r--r-- 1 root   root1006904 Apr 20 11:10 
> mysql-connector-java-5.1.49.jar
> -rw-r--r-- 1 root   root  26084 Jul 14 18:54 slf4j-api-1.7.5.jar
> -rw-r--r-- 1 root   root   8869 Jul 14 18:54 slf4j-log4j12-1.7.5.jar
> -rwxr-xr-x 1 root   root  10680 Jul 14 18:54 slf4j-simple-1.7.5.jar
> 
> 
> 
> 
> 
> 
> 执行语句
> 
> 
> bin/sql-client.sh embedded
> 
> 
> 
> 
> CREATE TABLE TZT_PAYMENT_ORDER (
>  `op_type` STRING,
>  `op_ts` STRING,
>  `current_ts` STRING,
>  `pos` STRING, 
>  `ID` BIGINT,
>  `TRACE_ID` STRING,
>  `BIZ_SYSTEM_CODE` STRING,
>  `MERCHANT_NO` STRING,
>  `REQUEST_NO` STRING,
>  `PRODUCT_NAME` STRING,
>  `BANK_CODE` STRING,
>  `CREATE_TIME` STRING,
>  `row_ts` as TO_TIMESTAMP(`CREATE_TIME`),
>   WATERMARK FOR row_ts AS row_ts - INTERVAL '1' MINUTE
> ) WITH (
>   'format' = 'json',
>   'connector' = 'kafka',
>   'topic' = 'TZT_TZT_PAYMENT_ORDER',
>   'properties.bootstrap.servers' = '',
>   'properties.group.id' = 'TZT_TZT_PAYMENT_ORDER_TEST_WWX',
>   'scan.startup.mode' = 'earliest-offset'
>  );
> 
> 
> select *  from  TZT_PAYMENT_ORDER;



Re: pyflink1.11.0window

2020-07-14 文章 Shuiqiang Chen
琴师你好,
异常栈信息org.apache.flink.table.api.ValidationException: A tumble window
expects a size value literal.
看起来是接下tumble window定义的代码不太正确吧

Best,
Shuiqiang

奇怪的不朽琴师 <1129656...@qq.com> 于2020年7月15日周三 上午10:27写道:

> 你好:
>  
> 我按着你回复的建议改了source但是会报新的错误,请问这个是因为什么?我想调试一个window一直没有成功,请帮帮我,谢谢。
> Traceback (most recent call last):
>  File "tou.py", line 71, infrom_kafka_to_kafka_demo()
>  File "tou.py", line 21, in from_kafka_to_kafka_demo
>   .select(" id, time1 , time1 ")\
>  File
> "/usr/local/lib/python3.7/site-packages/pyflink/table/table.py", line 907,
> in select
>   return Table(self._j_table.select(fields), self._t_env)
>  File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py",
> line 1286, in __call__
>   answer, self.gateway_client, self.target_id, self.name)
>  File
> "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line
> 147, in deco
>   return f(*a, **kw)
>  File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py",
> line 328, in get_return_value
>   format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o26.select.
> : org.apache.flink.table.api.ValidationException: A tumble window expects
> a size value literal.
> at
> org.apache.flink.table.operations.utils.AggregateOperationFactory.getAsValueLiteral(AggregateOperationFactory.java:384)
> at
> org.apache.flink.table.operations.utils.AggregateOperationFactory.validateAndCreateTumbleWindow(AggregateOperationFactory.java:302)
> at
> org.apache.flink.table.operations.utils.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:236)
> at
> org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:250)
> at
> org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:794)
> at
> org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:781)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at
> java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> at
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> at
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:748)
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> def register_rides_source(st_env):
>   source_ddl = \
>   """
>   create table source1(
>   id int,
>   time1 timestamp,
>   type string,
>   WATERMARK FOR time1 as time1 - INTERVAL '2' SECOND
>   ) with (
>   'connector.type' = 'kafka',
>   'update-mode' = 'append',
>   'connector.topic' = 'tp1',
>   'connector.properties.bootstrap.servers' = 'localhost:9092',
>   'connector.properties.zookeeper.connect' = 'localhost:2181',
>   'format.type' = 'json',
>   'format.derive-schema' = 'true',
>   'connector.version' = 'universal'
>   )
>   """
>   st_env.sql_update(source_ddl)
>
>  
>   s_env =
> StreamExecutionEnvironment.get_execution_environment()
>   s_env.set_parallelism(1)
>
>
>   st_env = StreamTableEnvironment.create(s_env)
>
>
>   register_rides_source(st_env)
>   register_rides_sink(st_env)
>
>
>   st_env.from_path("source1")\
>
> .window(Tumble.over("2.secends").on("time1").alias("w")) \
> .group_by("w") \
> .select(" id, time1 , time1 ")\
> .insert_into("sink1")
>  
>   st_env.execute("2-from_kafka_to_kafka")
>
>
> 代码如上
>
>
>
>
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> acqua@gmail.com;
> 发送时间:2020年7月10日(星期五) 上午9:17
> 收件人:"user-zh"
> 主题:Re: pyflink1.11.0window
>
>
>
> 琴师你好,
>
> 你的source ddl里有指定time1为 time attribute吗?
> create table source1(
>  id int,
>  time1 timestamp,
>  type string,
>  WATERMARK FOR time1 as time1 -
> INTERVAL '2' SECOND
> ) with (...)
>
> 奇怪的不朽琴师 <1129656...@qq.com 于2020年7月10日周五 上午8:43写道:
>
>  --nbsp;原始邮件nbsp;--
>  发件人:
> 
> "奇怪的不朽琴师"
> 
> <
>  1129656...@qq.comgt;;
>  发送时间:nbsp;2020年7月9日(星期四) 下午5:08
>  收件人:nbsp;"godfrey he" 
>  主题:nbsp;pyflink1.11.0window
> 
> 
> 
>  你好:
>  nbsp; nbsp;我在使用pyflink1.11版本时,window开窗仍会报错
>  : org.apache.flink.table.api.ValidationException: A group window
> expects a
>  time attribute for grouping in a stream 

flink1.11.0 java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread

2020-07-14 文章 dmt312_2010
Hi,
大家好,请教各位大佬一个问题,我在验证flink 1.11.0时,遇到如下问题:


报错信息:


[ERROR] Could not execute SQL statement. Reason:
java.lang.NoSuchMethodError: 
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;
Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;
JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V


请问下是缺少某些需要的包吗?




环境信息:


版本:flink 1.11.0
启动方式:flink on yarn (集成到CDH中)


Flink Home:
/opt/cloudera/parcels/FLINK/lib/flink


${FLINK_HOME}/lib/ jar包信息:


-rw-r--r-- 1 root   root  53820 Mar  9  2017 commons-cli-1.4.jar
-rw-r--r-- 1 root   root 284220 Jan 17  2011 commons-lang-2.6.jar
-rw-r--r-- 1 root   root  61829 Jul  6  2014 commons-logging-1.2.jar
-rw-r--r-- 1 root   root 197130 Jun 30 12:45 
flink-connector-jdbc_2.12-1.11.0.jar
-rw-r--r-- 1 root   root  47655 Jun 30 12:41 
flink-connector-kafka-0.10_2.12-1.11.0.jar
-rw-r--r-- 1 root   root  60151 Jun 30 12:42 
flink-connector-kafka-0.11_2.12-1.11.0.jar
-rw-r--r-- 1 root   root 109660 Jun 30 12:46 
flink-connector-kafka_2.12-1.11.0.jar
-rw-r--r-- 1 root   root 122794 Jun 30 12:41 
flink-connector-kafka-base_2.12-1.11.0.jar
-rw-r--r-- 1 root   root  90782 Jun 30 16:40 flink-csv-1.11.0.jar
-rw-r--r-- 1 root   root   99461460 Jun 30 16:46 flink-dist_2.12-1.11.0.jar
-rw-r--r-- 1 root   root  94863 Jun 30 16:40 flink-json-1.11.0.jar
-rw-rw-r-- 1 root   root19127 Jul 14 19:12 flink-metrics-core-1.11.0.jar
-rw-r--r-- 1 root   root 108120 Jul 14 19:35 
flink-metrics-prometheus-1.11.0.jar
-rw-r--r-- 1 root   root7712156 Jun 18 10:42 
flink-shaded-zookeeper-3.4.14.jar
-rw-r--r-- 1 root   root   31924588 Jun 30 16:45 flink-table_2.12-1.11.0.jar
-rw-r--r-- 1 root   root   34817036 Jun 30 16:45 
flink-table-blink_2.12-1.11.0.jar
-rw-r--r-- 1 root   root2740491 Oct 18  2019 kafka-clients-2.3.1.jar
-rw-r--r-- 1 root   root  67114 Apr 20 20:47 log4j-1.2-api-2.12.1.jar
-rw-r--r-- 1 root   root 276771 Apr 20 20:47 log4j-api-2.12.1.jar
-rw-r--r-- 1 root   root1674433 Apr 20 20:47 log4j-core-2.12.1.jar
-rw-r--r-- 1 root   root  23518 Apr 20 20:47 log4j-slf4j-impl-2.12.1.jar
-rw-r--r-- 1 root   root1006904 Apr 20 11:10 mysql-connector-java-5.1.49.jar
-rw-r--r-- 1 root   root  26084 Jul 14 18:54 slf4j-api-1.7.5.jar
-rw-r--r-- 1 root   root   8869 Jul 14 18:54 slf4j-log4j12-1.7.5.jar
-rwxr-xr-x 1 root   root  10680 Jul 14 18:54 slf4j-simple-1.7.5.jar






执行语句


bin/sql-client.sh embedded




CREATE TABLE TZT_PAYMENT_ORDER (
  `op_type` STRING,
  `op_ts` STRING,
  `current_ts` STRING,
  `pos` STRING, 
  `ID` BIGINT,
  `TRACE_ID` STRING,
  `BIZ_SYSTEM_CODE` STRING,
  `MERCHANT_NO` STRING,
  `REQUEST_NO` STRING,
  `PRODUCT_NAME` STRING,
  `BANK_CODE` STRING,
  `CREATE_TIME` STRING,
  `row_ts` as TO_TIMESTAMP(`CREATE_TIME`),
   WATERMARK FOR row_ts AS row_ts - INTERVAL '1' MINUTE
) WITH (
   'format' = 'json',
   'connector' = 'kafka',
   'topic' = 'TZT_TZT_PAYMENT_ORDER',
   'properties.bootstrap.servers' = '',
   'properties.group.id' = 'TZT_TZT_PAYMENT_ORDER_TEST_WWX',
   'scan.startup.mode' = 'earliest-offset'
  );


select *  from  TZT_PAYMENT_ORDER;

????????Flink Hadoop????????

2020-07-14 文章 Z-Z
Flink 1.11.0docker-compose??docker-compose??
version: "2.1"
services:
 jobmanager:
  image: flink:1.11.0-scala_2.12
  expose:
   - "6123"
  ports:
   - "8081:8081"
  command: jobmanager
  environment:
   - JOB_MANAGER_RPC_ADDRESS=jobmanager
   - 
HADOOP_CLASSPATH=/data/hadoop-2.9.2/etc/hadoop:/data/hadoop-2.9.2/share/hadoop/common/lib/*:/data/hadoop-2.9.2/share/hadoop/common/*:/data/hadoop-2.9.2/share/hadoop/hdfs:/data/hadoop-2.9.2/share/hadoop/hdfs/lib/*:/data/hadoop-2.9.2/share/hadoop/hdfs/*:/data/hadoop-2.9.2/share/hadoop/yarn:/data/hadoop-2.9.2/share/hadoop/yarn/lib/*:/data/hadoop-2.9.2/share/hadoop/yarn/*:/data/hadoop-2.9.2/share/hadoop/mapreduce/lib/*:/data/hadoop-2.9.2/share/hadoop/mapreduce/*:/contrib/capacity-scheduler/*.jar
  volumes:
   - ./jobmanager/conf:/opt/flink/conf
   - ./data:/data


 taskmanager:
  image: flink:1.11.0-scala_2.12
  expose:
   - "6121"
   - "6122"
  depends_on:
   - jobmanager
  command: taskmanager
  links:
   - "jobmanager:jobmanager"
  environment:
   - JOB_MANAGER_RPC_ADDRESS=jobmanager
  volumes:
   - ./taskmanager/conf:/opt/flink/conf
networks:
 default:
  external:
   name: flink-network



hadoop-2.9.2datajobmanager??taskmanager??HADOOP_CLASSPATHcli??webui??jobmanager??Could
 not find a file system implementation for scheme 'hdfs'

Re: Flink1.10 flinksql 多表join状态ttl不成功的问题

2020-07-14 文章 Jark Wu
Hi,

请问用的是什么版本,blink planner 还是 old planner?
有没有简化一点的 query 也能复现这个 ttl 问题的? 比如一层 groupby?

Best,
Jark

On Tue, 14 Jul 2020 at 15:36, Robin Zhang 
wrote:

> 
> 我用flink sql 设置了 空闲状态的清理时间,但是 状态还是一直增加,里面有 多次 group by  和多次 流表的关联 。
> 代码如下:
>tEnv.getConfig()
>  .setIdleStateRetentionTime(Time.hours(minIdleStateRetentionTime),
>
> Time.hours(maxIdleStateRetentionTime));
>
> 程序运行一周之后状态现在2.2G. 最近几天越来越大,表现在ttl没有成功,请教一下各位大佬
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink 1.11 自定义RichFlatMapFunction中使用JdbcRowDataOutputFormat 写pgsql数据问题,RuntimeContext初始化问题,空指针或RuntimeContext未初始化,哪里用的不对!

2020-07-14 文章 Jark Wu
Hi,

从异常来看,应该是少了如下这一行:

   outputFormatStatus =
JdbcRowDataOutputFormat.dynamicOutputFormatBuilder()
   .setJdbcOptions(jdbcOptions)
   .setFieldDataTypes(fieldDataTypes)
   .setJdbcDmlOptions(dmlOptions)

 .setJdbcExecutionOptions(JdbcExecutionOptions.builder().build())
   .setRowDataTypeInfo(rowDataTypeInfo) // 少了这一行
   .build();

顺便提醒下, `RowDataTypeInfo` 和 JdbcRowDataOutputFormat
都是内部类,不保证跨版本的兼容性(其实,在下个版本,这两个类都被重构了)。

Best,
Jark


On Tue, 14 Jul 2020 at 17:38, jindy_liu <286729...@qq.com> wrote:

>
> 代码,编译没问题,但运行的时候,RichFlatMapFunction在open的时候,JdbcRowDataOutputFormat.open会core,说RuntimeContext为空,如果去掉outputFormatStatus.setRuntimeContext(this.getRuntimeContext()),又会提示没有初始化?
>
> 麻烦大佬帮看看,什么问题啊,是我哪里用的不对吗?
>
>
> at
>
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.NullPointerException
> at
>
> org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.createSimpleRowDataExecutor(JdbcRowDataOutputFormat.java:198)
> at
>
> org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.lambda$new$2d156164$1(JdbcRowDataOutputFormat.java:94)
> at
>
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.createAndOpenStatementExecutor(JdbcBatchingOutputFormat.java:131)
> at
>
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open(JdbcBatchingOutputFormat.java:113)
> at
>
> org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.open(JdbcRowDataOutputFormat.java:103)
> at
>
> com.qqmusic.quku.cdcSync.PostgresSinkMapFunction.open(PostgresSinkMapFunction.java:132)
> at
>
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at
>
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at
>
> org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:43)
> at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.lang.Thread.run(Thread.java:748)
>
>
>
> 代码=>
>
> public class PostgresSinkMapFunction extends RichFlatMapFunction String> {
> private static String driverClass = "org.postgresql.Driver";
> private static String dbUrl =
> "jdbc:postgresql://localhost:5432/ai_audio_lyric_task";
> private static String userNmae = "postgres";
> private static String passWord = "1";
>
> // 表status
> private static JdbcRowDataOutputFormat outputFormatStatus;
> private static String[] fieldNames = new String[] {"id", "name"};
> private static DataType[] fieldDataTypes = new DataType[]{
> DataTypes.INT(),
> DataTypes.STRING()};
>
> private static RowType rowType = RowType.of(
> Arrays.stream(fieldDataTypes)
> .map(DataType::getLogicalType)
> .toArray(LogicalType[]::new),
> fieldNames);
> private static RowDataTypeInfo rowDataTypeInfo =
> RowDataTypeInfo.of(rowType);
>
> @Override
> public void flatMap(String s, Collector collector) throws
> Exception {
> GenericRowData row = new GenericRowData(2);
>
>  row.setRowKind(INSERT);
>  row.setField(0, count);
>  row.setField(1, "jindy" + Integer.toString(count));
>
> outputFormatStatus.writeRecord(row);
>
> }
>
> public void open(Configuration parameters) throws Exception {
> super.open(parameters);
>
> JdbcOptions jdbcOptions = JdbcOptions.builder()
> .setDriverName(driverClass)
> .setDBUrl(dbUrl)
> .setTableName("status_mirror")
> .setUsername(userNmae)
> .setPassword(passWord)
> .build();
>
> JdbcDmlOptions dmlOptions = JdbcDmlOptions.builder()
> .withTableName(jdbcOptions.getTableName())
> .withDialect(jdbcOptions.getDialect())
> .withFieldNames(fieldNames)
> .build();
>
> outputFormatStatus =
> JdbcRowDataOutputFormat.dynamicOutputFormatBuilder()
> .setJdbcOptions(jdbcOptions)
> .setFieldDataTypes(fieldDataTypes)

?????? pyflink1.11.0window

2020-07-14 文章 ??????????????
??
  
??source??window??
Traceback (most recent call last):
 File "tou.py", line 71, in 

Re: flink sql 1.10 insert into tb select 复杂schema 失败

2020-07-14 文章 Benchao Li
Hi Peihui,

这是一个已知bug[1],已经在1.10.1和1.11.0中修复了,可以尝试下这两个版本。

[1] https://issues.apache.org/jira/browse/FLINK-16220

Peihui He  于2020年7月15日周三 上午9:54写道:

> Hello,
>
>  在使用flink sql 1.10.0 时候,当source table 中含有复杂schema,比如
> create table xxx (
> a string,
> b row(
>  c row(d string)
>   )
> )
>
> 当c 中有值的时候,sql 如下 insert into select * from xxx会出现下面错误
>
> Caused by: java.lang.ClassCastException:
>
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.NullNode
> cannot be cast to
>
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
> at
>
> org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$assembleRowConverter$dd344700$1(JsonRowSerializationSchema.java:337)
> at
>
> org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$wrapIntoNullableConverter$1fa09b5b$1(JsonRowSerializationSchema.java:189)
> at
>
> org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$assembleRowConverter$dd344700$1(JsonRowSerializationSchema.java:345)
> at
>
> org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$wrapIntoNullableConverter$1fa09b5b$1(JsonRowSerializationSchema.java:189)
> at
>
> org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$assembleRowConverter$dd344700$1(JsonRowSerializationSchema.java:345)
> at
>
> org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$wrapIntoNullableConverter$1fa09b5b$1(JsonRowSerializationSchema.java:189)
> at
>
> org.apache.flink.formats.json.JsonRowSerializationSchema.serialize(JsonRowSerializationSchema.java:138)
> ... 38 more
>
>
> Best wishes.
>


-- 

Best,
Benchao Li


Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?

2020-07-14 文章 Benchao Li
就是现在Flink json已经有了对于VARBINARY类型的处理逻辑,就是string和byte[]互转,然后还需要有base64编码。

但是我们是想让对于VARBINARY的处理逻辑变成另外一种形式,就是把JsonNode直接toString,获取这个json子树的
字符串表示,然后再转成byte[]来作为这个字段。输出的时候,也会直接通过这个byte[]数据来构造一个JsonNode树,
然后放到对应的位置上。也就做到了一个json节点原封不动的保留到了输出里面,不管它是一个什么类型的json节点。

Peihui He  于2020年7月15日周三 上午9:59写道:

> Hi BenChao,
>
> 请问第2个解决思路中 额外加一个选项是指什么呢?
>
> Best wishes.
>
> Benchao Li  于2020年7月10日周五 下午1:54写道:
>
> > Hi Peihui,
> >
> > 正如Jark所说,FLINK-18002正是想解决这个问题,可以指定任意一个JsonNode为varchar类型。
> >
> > 当然,这个feature不能解决所有问题,比如你有一个字段,内容不太确定,而且也不需要额外处理,
> > 主要是想保留这个字段,下游输出json的时候仍然还是这个字段。
> > 如果用FLINK-18002的思路,输出到下游的时候,会把这部分数据整体作为一个json string,所以
> > 从结果上来看,*还不能完全做到原封不动的输出到下游*。
> >
> > 不知道后面这个场景是不是你面对的场景。如果是的话,我们目前有两个思路解决这个问题:
> > 1. 用RAW类型,这个需要json node类型对于flink来讲,都是可以序列化的
> > 2. 用BINARY类型,因为现在已经有了对BINARY类型的处理,所以还需要额外加一个选项来指定对于BINARY类型
> >   的处理模式。我们可以把任意json node转成它的json字符串表达形式,再转成byte[]进行中间的传输和处理;在
> >   序列化的时候,再直接通过这个byte[]数据构造一个json node(这样可以保证它跟原来的json node一模一样)。
> >
> > Jark Wu  于2020年7月10日周五 下午12:22写道:
> >
> > > 社区有个 issue 正在解决这个问题,可以关注一下
> > > https://issues.apache.org/jira/browse/FLINK-18002
> > >
> > > Best,
> > > Jark
> > >
> > > On Fri, 10 Jul 2020 at 11:13, Leonard Xu  wrote:
> > >
> > > > Hi, Peihui
> > > >
> > > > 我理解你的需求是json中有一些复杂的字段,你不想解析,希望后续用UDTF在来解析,这个应该做不到的,现在的json format
> > > 的解析的底层实现
> > > > 就是按照json的标准格式解析(jackson)的,没法将一个
> > > > jsonObject解析成一个String。另外如果你jsonObject中的内容格式不确定,也不适合在Schema中声明,
> > > > 因为SQL 是预编译后执行的,不能做到schema里是三个field,执行时又能解析四个field。
> > > >
> > > > 一种做法是定义复杂的jsonObject对应的ROW
> > > > 将全部可能的字段包含进去,每条记录没有的字段解析出来的会是null,fail-on-missing-field 默认关闭的,
> > > > 另外一种推荐你把复杂的字段在上游就转义成一个String放到json的一个field中,这样Flink解析出来就是一个String,
> > > > 然后query里用UDTF处理。
> > > >
> > > >
> > > > 祝好
> > > > Leonard Xu
> > > >
> > > >
> > > >
> > > >
> > > > > 在 2020年7月10日,10:16,Peihui He  写道:
> > > > >
> > > > > Hello,
> > > > >
> > > > >   实际上,我也并不太关心这个字段的内容,能按string 保存下来就好了。
> > > > >
> > > > > Best wishes.
> > > > >
> > > > > Peihui He  于2020年7月10日周五 上午10:12写道:
> > > > >
> > > > >> Hello,
> > > > >>
> > > > >>   明白您的意思。但是当一个字段下的json 字段不确定,类似一个黑盒子一样的化,就不好定义了。
> > > > >>
> > > > >>
> > > > >> Best wishes.
> > > > >>
> > > > >> LakeShen  于2020年7月10日周五 上午10:03写道:
> > > > >>
> > > > >>> Hi Peihui,
> > > > >>>
> > > > >>> 如果消费的 Kafka json 中,json 比较复杂的话,比如存在嵌套,就像下面的格式:
> > > > >>>
> > > > >>> {
> > > > >>>"a":"b",
> > > > >>>"c":{
> > > > >>>"d":"e",
> > > > >>>"g":"f"
> > > > >>>}
> > > > >>> },
> > > > >>>
> > > > >>> 那么在 kafka table source 可以使用 row 来定义:
> > > > >>>
> > > > >>> create table xxx (
> > > > >>> a varchar,
> > > > >>> c row
> > > > >>> )
> > > > >>>
> > > > >>> 如果 还存在嵌套,可以继续再使用 Row 来定义。
> > > > >>>
> > > > >>> Best,
> > > > >>> LakeShen
> > > > >>>
> > > > >>> Peihui He  于2020年7月10日周五 上午9:12写道:
> > > > >>>
> > > >  Hello:
> > > > 
> > > > 在用flink
> > > > sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。
> > > > 
> > > >  有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。
> > > > 
> > > > 
> > > >  Best wishes.
> > > > 
> > > > >>>
> > > > >>
> > > >
> > > >
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


-- 

Best,
Benchao Li


Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?

2020-07-14 文章 Peihui He
Hi BenChao,

请问第2个解决思路中 额外加一个选项是指什么呢?

Best wishes.

Benchao Li  于2020年7月10日周五 下午1:54写道:

> Hi Peihui,
>
> 正如Jark所说,FLINK-18002正是想解决这个问题,可以指定任意一个JsonNode为varchar类型。
>
> 当然,这个feature不能解决所有问题,比如你有一个字段,内容不太确定,而且也不需要额外处理,
> 主要是想保留这个字段,下游输出json的时候仍然还是这个字段。
> 如果用FLINK-18002的思路,输出到下游的时候,会把这部分数据整体作为一个json string,所以
> 从结果上来看,*还不能完全做到原封不动的输出到下游*。
>
> 不知道后面这个场景是不是你面对的场景。如果是的话,我们目前有两个思路解决这个问题:
> 1. 用RAW类型,这个需要json node类型对于flink来讲,都是可以序列化的
> 2. 用BINARY类型,因为现在已经有了对BINARY类型的处理,所以还需要额外加一个选项来指定对于BINARY类型
>   的处理模式。我们可以把任意json node转成它的json字符串表达形式,再转成byte[]进行中间的传输和处理;在
>   序列化的时候,再直接通过这个byte[]数据构造一个json node(这样可以保证它跟原来的json node一模一样)。
>
> Jark Wu  于2020年7月10日周五 下午12:22写道:
>
> > 社区有个 issue 正在解决这个问题,可以关注一下
> > https://issues.apache.org/jira/browse/FLINK-18002
> >
> > Best,
> > Jark
> >
> > On Fri, 10 Jul 2020 at 11:13, Leonard Xu  wrote:
> >
> > > Hi, Peihui
> > >
> > > 我理解你的需求是json中有一些复杂的字段,你不想解析,希望后续用UDTF在来解析,这个应该做不到的,现在的json format
> > 的解析的底层实现
> > > 就是按照json的标准格式解析(jackson)的,没法将一个
> > > jsonObject解析成一个String。另外如果你jsonObject中的内容格式不确定,也不适合在Schema中声明,
> > > 因为SQL 是预编译后执行的,不能做到schema里是三个field,执行时又能解析四个field。
> > >
> > > 一种做法是定义复杂的jsonObject对应的ROW
> > > 将全部可能的字段包含进去,每条记录没有的字段解析出来的会是null,fail-on-missing-field 默认关闭的,
> > > 另外一种推荐你把复杂的字段在上游就转义成一个String放到json的一个field中,这样Flink解析出来就是一个String,
> > > 然后query里用UDTF处理。
> > >
> > >
> > > 祝好
> > > Leonard Xu
> > >
> > >
> > >
> > >
> > > > 在 2020年7月10日,10:16,Peihui He  写道:
> > > >
> > > > Hello,
> > > >
> > > >   实际上,我也并不太关心这个字段的内容,能按string 保存下来就好了。
> > > >
> > > > Best wishes.
> > > >
> > > > Peihui He  于2020年7月10日周五 上午10:12写道:
> > > >
> > > >> Hello,
> > > >>
> > > >>   明白您的意思。但是当一个字段下的json 字段不确定,类似一个黑盒子一样的化,就不好定义了。
> > > >>
> > > >>
> > > >> Best wishes.
> > > >>
> > > >> LakeShen  于2020年7月10日周五 上午10:03写道:
> > > >>
> > > >>> Hi Peihui,
> > > >>>
> > > >>> 如果消费的 Kafka json 中,json 比较复杂的话,比如存在嵌套,就像下面的格式:
> > > >>>
> > > >>> {
> > > >>>"a":"b",
> > > >>>"c":{
> > > >>>"d":"e",
> > > >>>"g":"f"
> > > >>>}
> > > >>> },
> > > >>>
> > > >>> 那么在 kafka table source 可以使用 row 来定义:
> > > >>>
> > > >>> create table xxx (
> > > >>> a varchar,
> > > >>> c row
> > > >>> )
> > > >>>
> > > >>> 如果 还存在嵌套,可以继续再使用 Row 来定义。
> > > >>>
> > > >>> Best,
> > > >>> LakeShen
> > > >>>
> > > >>> Peihui He  于2020年7月10日周五 上午9:12写道:
> > > >>>
> > >  Hello:
> > > 
> > > 在用flink
> > > sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。
> > > 
> > >  有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。
> > > 
> > > 
> > >  Best wishes.
> > > 
> > > >>>
> > > >>
> > >
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: 滑动窗口数据存储多份问题

2020-07-14 文章 Tianwang Li
| 为什么使用 datastream api 的话,滑动窗口对于每条数据都会在 state 中存 size / slide
可以使用blink 的SQL,是通过pane 实现的,输出的时候才合并每个pane。参考`PanedWindowAssigner`


张浩 <13669299...@163.com> 于2020年7月6日周一 下午12:49写道:

> Hi,all!
> 由于第一次咨询,我不确定上一份邮件大家是否收到。
> 想咨询下大家,为什么使用 datastream api 的话,滑动窗口对于每条数据都会在 state 中存 size / slide
> 份?
>
>
> | |
> 张浩
> |
> |
> 13669299...@163.com
> |
> 签名由网易邮箱大师定制



-- 
**
 tivanli
**


flink sql 1.10 insert into tb select 复杂schema 失败

2020-07-14 文章 Peihui He
Hello,

 在使用flink sql 1.10.0 时候,当source table 中含有复杂schema,比如
create table xxx (
a string,
b row(
 c row(d string)
  )
)

当c 中有值的时候,sql 如下 insert into select * from xxx会出现下面错误

Caused by: java.lang.ClassCastException:
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.NullNode
cannot be cast to
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
at
org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$assembleRowConverter$dd344700$1(JsonRowSerializationSchema.java:337)
at
org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$wrapIntoNullableConverter$1fa09b5b$1(JsonRowSerializationSchema.java:189)
at
org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$assembleRowConverter$dd344700$1(JsonRowSerializationSchema.java:345)
at
org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$wrapIntoNullableConverter$1fa09b5b$1(JsonRowSerializationSchema.java:189)
at
org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$assembleRowConverter$dd344700$1(JsonRowSerializationSchema.java:345)
at
org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$wrapIntoNullableConverter$1fa09b5b$1(JsonRowSerializationSchema.java:189)
at
org.apache.flink.formats.json.JsonRowSerializationSchema.serialize(JsonRowSerializationSchema.java:138)
... 38 more


Best wishes.


TableEnvironment 里面无法执行 Hop等窗口函数

2020-07-14 文章 王双利
TableEnvironment 里面无法执行 Hop等窗口函数,使用StreamTableEnvironment 则可以执行,Flink版本1.10
是否是这样的


回复:flink-1.11 DDL 设置chk目录问题

2020-07-14 文章 kcz
谢谢 我一直用的是 streamEnv去设置config 今天看到table也可以,如果我用stream去设置 也是可以的吧





-- 原始邮件 --
发件人: Leonard Xu 

Re:Re: flink 双流join报错,java.lang.AssertionError

2020-07-14 文章 sunfulin
hi, 
 @Danny Chan 
我在1.10版本中确实触发到了这个bug,切到1.11版本貌似就没这问题了。简单解释下问题:双流join的case,右边流join后的结果字段在获取时貌似乱序了。


















在 2020-07-13 10:42:12,"Jark Wu"  写道:
>cc @Danny Chan   也许 Danny 老师知道。
>
>On Thu, 9 Jul 2020 at 17:29, sunfulin  wrote:
>
>>
>> hi,
>> 我切到最新的1.11 release版本,跑同样的sql,没有抛出异常。想问下这有相关的issue么?想确认下原因。
>>
>>
>>
>>
>>
>>
>> 在 2020-07-09 16:53:34,"sunfulin"  写道:
>> >hi,
>> >我使用flink 1.10.1 
>> >blink-planner。运行以下SQL时,抛出异常。其中A和B分别是两个Kafka消息流。任务使用processtime。如果我把join的B表的select
>> > 具体字段名 修改为 select *,貌似就可以执行。但是拿到的B表字段顺序貌似是错乱的。请问这个问题是bug么?
>> >
>> >
>> >select A.recvTime, A.khh, A.live_id, A.fund_code as product_code, 
>> >A.fund_name as product_name, cast(B.balance as double) as balance
>> >from (
>> >select toLong(behaviorTime, true) as recvTime, user_id,
>> >cast(regexp_extract(btnTitle, 'zbid=\{([^|]*)\}', 1) as int) as live_id,
>> >regexp_extract(btnTitle, 'fundname=\{([^|]*)\}', 1) as fund_name,
>> >regexp_extract(btnTitle, 'fundcode=\{([^|]*)\}', 1) as fund_code, proctime 
>> >from kafka_zl_etrack_event_stream
>> >where pageId = ''
>> >and eventId = 'click'
>> >and btnId = '
>> >and CHARACTER_LENGTH(user_id) > 4
>> >) A
>> >left join
>> >(
>> >select customerNumber, balance, fundCode, lastUpdateTime, proctime
>> >  from lscsp_sc_order_all
>> >   where `status` = '4'
>> > and businessType IN ('4','5','14','16','17','18')
>> > and fundCode IS NOT NULL
>> > and balance IS NOT NULL
>> > and lastUpdateTime IS NOT NULL
>> >) B
>> >on A.user_id = B.customerNumber and A.fund_code = B.fundCode
>> >group by  A.recvTime, A.user_id, A.live_id, A.fund_code, A.fund_name, 
>> >cast(B.balance as double)
>> >
>> >
>> >
>> >
>> >
>> >
>> >Exception in thread "main" java.lang.AssertionError
>> >at 
>> >org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.getRootField(SqlToRelConverter.java:4448)
>> >at 
>> >org.apache.calcite.sql2rel.SqlToRelConverter.adjustInputRef(SqlToRelConverter.java:3765)
>> >at 
>> >org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:3737)
>> >at 
>> >org.apache.calcite.sql2rel.SqlToRelConverter.access$2200(SqlToRelConverter.java:217)
>> >at 
>> >org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.visit(SqlToRelConverter.java:4796)
>> >at 
>> >org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.visit(SqlToRelConverter.java:4092)
>> >at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
>> >at 
>> >org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.convertExpression(SqlToRelConverter.java:4656)
>> >at 
>> >org.apache.calcite.sql2rel.StandardConvertletTable.convertCast(StandardConvertletTable.java:522)
>> >at 
>> >org.apache.calcite.sql2rel.SqlNodeToRexConverterImpl.convertCall(SqlNodeToRexConverterImpl.java:63)
>>
>>
>>
>>
>>


Re: 自定义的sql connector在sql-cli中运行问题

2020-07-14 文章 admin
解决了,原因是我同时实现了createTableSink和createStreamTableSink导致
删掉createTableSink就可以了


> 2020年7月14日 上午10:50,admin <17626017...@163.com> 写道:
> 
> hi all,
> 我自定义了一个sql 
> connector,在本地idea里面是调试通过的,数据能正常写入,但是整个flink编译之后,用编译后的包在本地起了standalone集群,在sql-cli中运行报错如下
> 2020-07-14 10:36:29,148 WARN  org.apache.flink.table.client.cli.CliClient 
>  [] - Could not execute SQL statement.
> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL 
> update statement.
>at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:698)
>  ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
>  ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
>  ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:551) 
> ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:299) 
> ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_251]
>at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200) 
> [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125) 
> [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104) 
> [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) 
> [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> Caused by: scala.MatchError: null
>at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$.inferSinkPhysicalSchema(TableSinkUtils.scala:165)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:305)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:194)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at scala.Option.map(Option.scala:146) 
> ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at scala.collection.Iterator$class.foreach(Iterator.scala:891) 
> ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) 
> ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
> ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
> ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
> ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at scala.collection.AbstractTraversable.map(Traversable.scala:104) 
> ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:767)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:571)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.sqlUpdate(StreamTableEnvironmentImpl.java:341)
>  ~[flink-table-api-java-bridge_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> 

Re:Re: flink1.11 sink hive error

2020-07-14 文章 Zhou Zach
Hi,


我刚才把flink sink的hive table,hive hdfs目录都删了,hbase表数据也清空了(hbase 通过hue hive 
table方式查询),然后重启程序,就可以了,
等再出问题,我试下你这种方法,感谢答疑!

















在 2020-07-14 20:42:16,"Leonard Xu"  写道:
>Hi,
>你安装 hive 的 metastore 后,在你 hivehome/conf/hive-site.xml 文件中添加这样一个配置:
>  
>hive.metastore.uris
>thrift://:9083
>Thrift URI for the remote metastore. Used by metastore client 
> to connect to remote metastore.
>  
>一般生产环境应该也是这样配置,
>然后 Flink 对接到hive配置参考[1],应该和你之前用的没啥变化,就是不支持 embedded 的 metastore
>
>祝好,
>Leonard Xu
>[1] 
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/#connecting-to-hive
> 
>
>
>> 在 2020年7月14日,20:29,Zhou Zach  写道:
>> 
>> Hi,
>> 
>> 
>> 是在flink的conf文件配置hive.metastore.uris吗
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 在 2020-07-14 20:03:11,"Leonard Xu"  写道:
>>> Hello
>>> 
>>> 
 在 2020年7月14日,19:52,Zhou Zach  写道:
 
 : Embedded metastore is not allowed.
>>> 
>>> Flink 集成 Hive 时,不支持 embedded metastore 的, 你需要起一个hive metastore 并在conf文件配置 
>>> hive.metastore.uris, 支持的 metastore 版本 参考[1]
>>> 
>>> Best,
>>> Leonard Xu
>>> [1] 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/#using-bundled-hive-jar
>>>  
>>> 
>


Re: flink1.11 sink hive error

2020-07-14 文章 Leonard Xu
Hi,
你安装 hive 的 metastore 后,在你 hivehome/conf/hive-site.xml 文件中添加这样一个配置:
  
hive.metastore.uris
thrift://:9083
Thrift URI for the remote metastore. Used by metastore client 
to connect to remote metastore.
  
一般生产环境应该也是这样配置,
然后 Flink 对接到hive配置参考[1],应该和你之前用的没啥变化,就是不支持 embedded 的 metastore

祝好,
Leonard Xu
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/#connecting-to-hive
 


> 在 2020年7月14日,20:29,Zhou Zach  写道:
> 
> Hi,
> 
> 
> 是在flink的conf文件配置hive.metastore.uris吗
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2020-07-14 20:03:11,"Leonard Xu"  写道:
>> Hello
>> 
>> 
>>> 在 2020年7月14日,19:52,Zhou Zach  写道:
>>> 
>>> : Embedded metastore is not allowed.
>> 
>> Flink 集成 Hive 时,不支持 embedded metastore 的, 你需要起一个hive metastore 并在conf文件配置 
>> hive.metastore.uris, 支持的 metastore 版本 参考[1]
>> 
>> Best,
>> Leonard Xu
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/#using-bundled-hive-jar
>>  
>> 



Re: flink-1.11 DDL 设置chk目录问题

2020-07-14 文章 Leonard Xu
Hi,

没有太理解在DDL中设置,TableConfig上也可以设置 StreamEexecutionEnvironment 的 配置,你要的是这个吗?

tableEnv.getConfig().getConfiguration().set(CHECKPOINTS_DIRECTORY, 
"your-cp-path");


祝好,
Leonard Xu


> 在 2020年7月14日,18:56,kcz <573693...@qq.com> 写道:
> 
> 目前我只会设置streameEnv.setStateBackend(new FsStateBackend(checkpointPath));
> 但是DDL时候应该如何设置呢?
> tableEnv.getConfig().getConfiguration().set(
>ExecutionCheckpointingOptions.CHECKPOINTING_MODE, 
> CheckpointingMode.EXACTLY_ONCE);
> tableEnv.getConfig().getConfiguration().set(
>ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, 
> Duration.ofSeconds(10));



Re:Re: flink1.11 sink hive error

2020-07-14 文章 Zhou Zach
Hi,


是在flink的conf文件配置hive.metastore.uris吗

















在 2020-07-14 20:03:11,"Leonard Xu"  写道:
>Hello
>
>
>> 在 2020年7月14日,19:52,Zhou Zach  写道:
>> 
>> : Embedded metastore is not allowed.
>
>Flink 集成 Hive 时,不支持 embedded metastore 的, 你需要起一个hive metastore 并在conf文件配置 
>hive.metastore.uris, 支持的 metastore 版本 参考[1]
>
>Best,
>Leonard Xu
>[1] 
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/#using-bundled-hive-jar
> 
>


Re: flink1.11 sink hive error

2020-07-14 文章 Leonard Xu
Hello


> 在 2020年7月14日,19:52,Zhou Zach  写道:
> 
> : Embedded metastore is not allowed.

Flink 集成 Hive 时,不支持 embedded metastore 的, 你需要起一个hive metastore 并在conf文件配置 
hive.metastore.uris, 支持的 metastore 版本 参考[1]

Best,
Leonard Xu
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/#using-bundled-hive-jar
 


Re: flink1.11.0读取mysql数据decimal类型会强转成decimal(38,18)的问题

2020-07-14 文章 Leonard Xu
Hi,
看了下代码,Stream 模式 确实有这个问题, batch 没有,原因是:

CollectStreamTableSink 实现的是 TupleTypeInfo> getOutputType()
CollectBatchTableSink 实现的是 DataType getConsumedDataType()

刚刚搜了下,社区有个 issue [1] 在彻底解这个问题,Godgrey 已经开PR了,这应该会把这两个CollectSink都去掉,使用 
TableResult#collect()来收集数据。

Best,
Leonard Xu
[1] https://issues.apache.org/jira/browse/FLINK-18550 




> 在 2020年7月14日,18:55,wldd  写道:
> 
> Hi:
> batchi模式执行结果:
> https://imgchr.com/i/UUqec6
> batch模式日志:
> https://imgchr.com/i/UUboX8
> streaming模式日志:
> https://imgchr.com/i/UUbYmF
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> --
> 
> Best,
> wldd
> 
> 
> 
> 
> At 2020-07-14 18:43:39, "wldd"  wrote:
> 
> Hi:
> 图片的内容没展示出来,图片的内容就是个查询结果,
> 
> 
> error日志这是batch模式的debug日志:
> 2020-07-14 18:33:23,180 DEBUG 
> org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat [] - No input 
> splitting configured (data will be read with parallelism 1).
> 2020-07-14 18:33:23,181 DEBUG org.apache.calcite.sql2rel  
>  [] - Plan after converting SqlNode to RelNode
> LogicalProject(money=[$0])
>  LogicalTableScan(table=[[mydb, test, test2]])
> 
> 
> 2020-07-14 18:33:23,197 DEBUG 
> org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat [] - No input 
> splitting configured (data will be read with parallelism 1).
> 2020-07-14 18:33:23,198 DEBUG org.apache.calcite.sql2rel  
>  [] - Plan after converting SqlNode to RelNode
> LogicalProject(money=[$0])
>  LogicalTableScan(table=[[mydb, test, test2]])
> 
> 
> 2020-07-14 18:33:23,201 DEBUG 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram [] - 
> iteration: 1
> 2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner   
>  [] - For final plan, using 
> rel#2907:LogicalLegacySink.NONE.any.[](input=HepRelVertex#2906,name=`mydb`.`test`.`_tmp_table_380133308`,fields=money)
> 2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner   
>  [] - For final plan, using 
> rel#2905:LogicalProject.NONE.any.[](input=HepRelVertex#2904,inputs=0)
> 2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner   
>  [] - For final plan, using 
> rel#2901:LogicalTableScan.NONE.any.[](table=[mydb, test, test2])
> 2020-07-14 18:33:23,202 DEBUG 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram [] - 
> optimize convert table references before rewriting sub-queries to semi-join 
> cost 1 ms.
> optimize result:
> LogicalLegacySink(name=[`mydb`.`test`.`_tmp_table_380133308`], fields=[money])
> +- LogicalProject(money=[$0])
>   +- LogicalTableScan(table=[[mydb, test, test2]])
> 
> 
> 2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner   
>  [] - For final plan, using 
> rel#2912:LogicalLegacySink.NONE.any.[](input=HepRelVertex#2911,name=`mydb`.`test`.`_tmp_table_380133308`,fields=money)
> 2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner   
>  [] - For final plan, using 
> rel#2910:LogicalProject.NONE.any.[](input=HepRelVertex#2909,inputs=0)
> 2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner   
>  [] - For final plan, using 
> rel#2901:LogicalTableScan.NONE.any.[](table=[mydb, test, test2])
> 2020-07-14 18:33:23,202 DEBUG 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram [] - 
> optimize rewrite sub-queries to semi-join cost 0 ms.
> optimize result:
> LogicalLegacySink(name=[`mydb`.`test`.`_tmp_table_380133308`], fields=[money])
> +- LogicalProject(money=[$0])
>   +- LogicalTableScan(table=[[mydb, test, test2]])
> 
> 
> 
> 
> 
> 
> 
> 
> 这是streaming模式的debug日志:
> 2020-07-14 18:35:45,995 DEBUG org.apache.calcite.sql2rel  
>  [] - Plan after converting SqlNode to RelNode
> LogicalProject(money=[$0])
>  LogicalTableScan(table=[[mydb, test, test2]])
> 
> 
> 2020-07-14 18:35:46,015 DEBUG org.apache.calcite.sql2rel  
>  [] - Plan after converting SqlNode to RelNode
> LogicalProject(money=[$0])
>  LogicalTableScan(table=[[mydb, test, test2]])
> 
> 
> 2020-07-14 18:35:46,016 DEBUG org.apache.flink.runtime.net.ConnectionUtils
>  [] - Trying to connect to (mosh-data-1/192.168.0.29:6123) from 
> local address mosh-data-1/192.168.0.29 with timeout 200
> 2020-07-14 18:35:46,016 DEBUG org.apache.flink.runtime.net.ConnectionUtils
>  [] - Using InetAddress.getLocalHost() immediately for the 
> connecting address
> 2020-07-14 18:35:46,022 DEBUG 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram [] - 
> iteration: 1
> 2020-07-14 18:35:46,022 DEBUG org.apache.calcite.plan.RelOptPlanner   
>  [] - For final plan, using 
> rel#3047:LogicalLegacySink.NONE.any.None: 
> 0.[NONE].[NONE](input=HepRelVertex#3046,name=`mydb`.`test`.`_tmp_table_380133308`,fields=money)
> 2020-07-14 

flink1.11 sink hive error

2020-07-14 文章 Zhou Zach
hi all,
flink1.11 sql sink hive table 报错:


java.util.concurrent.CompletionException: 
org.apache.flink.client.deployment.application.ApplicationExecutionException: 
Could not execute application.
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
 ~[?:1.8.0_161]
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
 ~[?:1.8.0_161]
at 
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943) 
~[?:1.8.0_161]
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
 ~[?:1.8.0_161]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
~[?:1.8.0_161]
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
 ~[?:1.8.0_161]
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:245)
 ~[flink-clients_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:199)
 ~[flink-clients_2.11-1.11.0.jar:1.11.0]
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_161]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
[?:1.8.0_161]
at 
org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
 [data-flow-1.0.jar:?]
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) 
[qile-data-flow-1.0.jar:?]
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
 [data-flow-1.0.jar:?]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[qile-data-flow-1.0.jar:?]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[data-flow-1.0.jar:?]
at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[data-flow-1.0.jar:?]
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[data-flow-1.0.jar:?]
Caused by: 
org.apache.flink.client.deployment.application.ApplicationExecutionException: 
Could not execute application.
... 11 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main 
method caused an error: Embedded metastore is not allowed. Make sure you have 
set a valid value for hive.metastore.uris
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
 ~[flink-clients_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
 ~[flink-clients_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) 
~[flink-clients_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
 ~[flink-clients_2.11-1.11.0.jar:1.11.0]
... 10 more
Caused by: java.lang.IllegalArgumentException: Embedded metastore is not 
allowed. Make sure you have set a valid value for hive.metastore.uris
at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139) 
~[data-flow-1.0.jar:?]
at 
org.apache.flink.table.catalog.hive.HiveCatalog.(HiveCatalog.java:171) 
~[flink-sql-connector-hive-2.2.0_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.table.catalog.hive.HiveCatalog.(HiveCatalog.java:157) 
~[flink-sql-connector-hive-2.2.0_2.11-1.11.0.jar:1.11.0]
at 
cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveAndHbase$.main(FromKafkaSinkHiveAndHbase.scala:27)
 ~[data-flow-1.0.jar:?]
at 
cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveAndHbase.main(FromKafkaSinkHiveAndHbase.scala)
 ~[data-flow-1.0.jar:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_161]
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_161]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_161]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_161]
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
 ~[flink-clients_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
 ~[flink-clients_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) 
~[flink-clients_2.11-1.11.0.jar:1.11.0]
at 

flink-1.11 DDL ????chk????????

2020-07-14 文章 kcz
??streameEnv.setStateBackend(new FsStateBackend(checkpointPath));
DDL
tableEnv.getConfig().getConfiguration().set(
ExecutionCheckpointingOptions.CHECKPOINTING_MODE, 
CheckpointingMode.EXACTLY_ONCE);
tableEnv.getConfig().getConfiguration().set(
ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, 
Duration.ofSeconds(10));

Re:Re:Re: flink1.11.0读取mysql数据decimal类型会强转成decimal(38,18)的问题

2020-07-14 文章 wldd
Hi:
batchi模式执行结果:
https://imgchr.com/i/UUqec6
batch模式日志:
https://imgchr.com/i/UUboX8
streaming模式日志:
https://imgchr.com/i/UUbYmF













--

Best,
wldd




At 2020-07-14 18:43:39, "wldd"  wrote:

Hi:
图片的内容没展示出来,图片的内容就是个查询结果,


error日志这是batch模式的debug日志:
2020-07-14 18:33:23,180 DEBUG 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat [] - No input 
splitting configured (data will be read with parallelism 1).
2020-07-14 18:33:23,181 DEBUG org.apache.calcite.sql2rel
   [] - Plan after converting SqlNode to RelNode
LogicalProject(money=[$0])
  LogicalTableScan(table=[[mydb, test, test2]])


2020-07-14 18:33:23,197 DEBUG 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat [] - No input 
splitting configured (data will be read with parallelism 1).
2020-07-14 18:33:23,198 DEBUG org.apache.calcite.sql2rel
   [] - Plan after converting SqlNode to RelNode
LogicalProject(money=[$0])
  LogicalTableScan(table=[[mydb, test, test2]])


2020-07-14 18:33:23,201 DEBUG 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram [] - 
iteration: 1
2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner 
   [] - For final plan, using 
rel#2907:LogicalLegacySink.NONE.any.[](input=HepRelVertex#2906,name=`mydb`.`test`.`_tmp_table_380133308`,fields=money)
2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner 
   [] - For final plan, using 
rel#2905:LogicalProject.NONE.any.[](input=HepRelVertex#2904,inputs=0)
2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner 
   [] - For final plan, using 
rel#2901:LogicalTableScan.NONE.any.[](table=[mydb, test, test2])
2020-07-14 18:33:23,202 DEBUG 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram [] - 
optimize convert table references before rewriting sub-queries to semi-join 
cost 1 ms.
optimize result:
 LogicalLegacySink(name=[`mydb`.`test`.`_tmp_table_380133308`], fields=[money])
+- LogicalProject(money=[$0])
   +- LogicalTableScan(table=[[mydb, test, test2]])


2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner 
   [] - For final plan, using 
rel#2912:LogicalLegacySink.NONE.any.[](input=HepRelVertex#2911,name=`mydb`.`test`.`_tmp_table_380133308`,fields=money)
2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner 
   [] - For final plan, using 
rel#2910:LogicalProject.NONE.any.[](input=HepRelVertex#2909,inputs=0)
2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner 
   [] - For final plan, using 
rel#2901:LogicalTableScan.NONE.any.[](table=[mydb, test, test2])
2020-07-14 18:33:23,202 DEBUG 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram [] - 
optimize rewrite sub-queries to semi-join cost 0 ms.
optimize result:
 LogicalLegacySink(name=[`mydb`.`test`.`_tmp_table_380133308`], fields=[money])
+- LogicalProject(money=[$0])
   +- LogicalTableScan(table=[[mydb, test, test2]])








这是streaming模式的debug日志:
2020-07-14 18:35:45,995 DEBUG org.apache.calcite.sql2rel
   [] - Plan after converting SqlNode to RelNode
LogicalProject(money=[$0])
  LogicalTableScan(table=[[mydb, test, test2]])


2020-07-14 18:35:46,015 DEBUG org.apache.calcite.sql2rel
   [] - Plan after converting SqlNode to RelNode
LogicalProject(money=[$0])
  LogicalTableScan(table=[[mydb, test, test2]])


2020-07-14 18:35:46,016 DEBUG org.apache.flink.runtime.net.ConnectionUtils  
   [] - Trying to connect to (mosh-data-1/192.168.0.29:6123) from local 
address mosh-data-1/192.168.0.29 with timeout 200
2020-07-14 18:35:46,016 DEBUG org.apache.flink.runtime.net.ConnectionUtils  
   [] - Using InetAddress.getLocalHost() immediately for the connecting 
address
2020-07-14 18:35:46,022 DEBUG 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram [] - 
iteration: 1
2020-07-14 18:35:46,022 DEBUG org.apache.calcite.plan.RelOptPlanner 
   [] - For final plan, using rel#3047:LogicalLegacySink.NONE.any.None: 
0.[NONE].[NONE](input=HepRelVertex#3046,name=`mydb`.`test`.`_tmp_table_380133308`,fields=money)
2020-07-14 18:35:46,022 DEBUG org.apache.calcite.plan.RelOptPlanner 
   [] - For final plan, using rel#3045:LogicalProject.NONE.any.None: 
0.[NONE].[NONE](input=HepRelVertex#3044,exprs=[CAST($0):DECIMAL(38, 18)])
2020-07-14 18:35:46,022 DEBUG org.apache.calcite.plan.RelOptPlanner 
   [] - For final plan, using rel#3040:LogicalTableScan.NONE.any.None: 
0.[NONE].[NONE](table=[mydb, test, test2])
2020-07-14 18:35:46,022 DEBUG 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram [] - 
optimize convert table references before rewriting sub-queries to semi-join 
cost 0 ms.
optimize result:
 

Re: Re: 不能实时读取实时写入到 Hive 的数据

2020-07-14 文章 wangl...@geekplus.com.cn

谢谢,根本原因就是  flink sql-client 客户端默认没有设置 checkpoint 导致的。



wangl...@geekplus.com.cn 

 
Sender: Rui Li
Send Time: 2020-07-14 18:29
Receiver: user-zh
cc: Leonard Xu; 夏帅
Subject: Re: Re: 不能实时读取实时写入到 Hive 的数据
流数据写hive时,不管是分区表还是非分区表,commit都是通过checkpoint触发的。用SQL
client的话可以在flink-conf.yaml里设置execution.checkpointing.interval来开启checkpoint
 
On Tue, Jul 14, 2020 at 5:49 PM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:
 
> 我把问题简化一下,创建 Hive 表时不带任何参数
>
> CREATE TABLE hive_ods_wms_pick_order (
>   order_no STRING,
>   status INT,
>   dispatch_time TIMESTAMP
> )  STORED AS parquet
>
> INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status,
> dispatch_time FROM kafka_ods_wms_pick_order;
>
> 我用的 sql-client 客户端,15 分钟过去了 hive 表对应的 hdfs 目录为什么还只是有一个大小为 0 的 .part 文件呢?
> 我在 flink 客户端 SELECT order_no, status, dispatch_time FROM
> kafka_ods_wms_pick_order 确实是有数据返回的。
>
> 我在 flink web ui 看了下这个 job 的 Checkpoint Counts 是 0.
> 是需要让 job 做 checkpoint 才能写到 hdfs 上吗?
> 我用 Flink sql-client 客户端怎么设置做 checkpoint 的频率呢?
>
> 谢谢,
> 王磊
>
>
>
>
> wangl...@geekplus.com.cn
>
>
> 发件人: Leonard Xu
> 发送时间: 2020-07-14 17:20
> 收件人: user-zh; 夏帅
> 抄送: wangl...@geekplus.com.cn
> 主题: Re: 不能实时读取实时写入到 Hive 的数据
>
> Hi, wanglei
>
> 这个参数 'sink.partition-commit.delay'='1 h’会在cp 完成后 + 你设置的1h delay后才会提交 hive
> 的分区已完成信息(通过metastore或success文件).
>
> 你看下夏帅贴的邮件,检查下 checkpoint 和  partition-commit的设置
>
> 祝好,
> Leonard Xu
>
>
> 在 2020年7月14日,16:59,夏帅  写道:
>
> 你好,
> 可以参考下这个问题的解决
>
> http://apache-flink.147419.n8.nabble.com/Table-options-do-not-contain-an-option-key-connector-for-discovering-a-connector-td4767.html
>
>
> --
> 发件人:wangl...@geekplus.com.cn 
> 发送时间:2020年7月14日(星期二) 16:50
> 收件人:user-zh ; 夏帅 ;
> Leonard Xu 
> 主 题:Re: 回复: 不能实时读取实时写入到 Hive 的数据
>
>
> 应该是我没有理解 partitiion-commit 的意思,我看这里有文档:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit
>
>
> CREATE TABLE kafka_ods_wms_pick_order (
> order_no STRING,
> status INT,
> dispatch_time TIMESTAMP(3)
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'ods_wms_pick_order',
> 'properties.bootstrap.servers' = ':9092',
> 'properties.group.id' = 'testGroup',
> 'format' = 'json',
> 'scan.startup.mode' = 'latest-offset'
> )
>
>
> CREATE TABLE hive_ods_wms_pick_order (
>  order_no STRING,
>  status INT,
>  dispatch_time TIMESTAMP
> ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
>  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
>  'sink.partition-commit.trigger'='partition-time',
>  'sink.partition-commit.delay'='1 h',
>  'sink.partition-commit.policy.kind'='metastore,success-file'
> );
>
> INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status,
> dispatch_time, DATE_FORMAT(dispatch_time, '-MM-dd'),
> DATE_FORMAT(dispatch_time, 'HH') FROM kafka_ods_wms_pick_order;
> SELECT * FROM hive_ods_wms_pick_order /*+
> OPTIONS('streaming-source.enable'='true',
> 'streaming-source.consume-start-offset'='2020-07-24') */;
>
>
>
>
> wangl...@geekplus.com.cn
>
>
> Sender: 夏帅
> Send Time: 2020-07-14 16:43
> Receiver: user-zh; xbjtdcq
> Subject: 回复:Re: 不能实时读取实时写入到 Hive 的数据
> 你好,
> 这说明写入的hive文件没有进行rollup,可以贴下SQL么
> --
> 发件人:wangl...@geekplus.com.cn 
> 发送时间:2020年7月14日(星期二) 16:40
> 收件人:user-zh ; xbjtdcq 
> 主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据
> 我加上了这个 tablehint 。
> 任务提交上去了,但客户端还是没有任何返回显示。
> 我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。
> 谢谢,
> 王磊
> wangl...@geekplus.com.cn
> 发件人: Leonard Xu
> 发送时间: 2020-07-14 16:17
> 收件人: user-zh
> 主题: Re: 不能实时读取实时写入到 Hive 的数据
> HI, wanglei
> 你开启了 streaming-source.enable
> 吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints
> 方便地指定参数。
> SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true',
> 'streaming-source.consume-start-offset'='2020-05-20') */;
> 就在你看得这个页面应该有对应的文档说明如何读取hive数据。
> 祝好,
> Leonard Xu
> 在 2020年7月14日,15:47,wangl...@geekplus.com.cn 写道:
>
>
> 试验了一下 Flink-1.11  hive streaming 的功能
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html
>
> 创建 kafka 表,通过 SQL 实时写入 Hive.
>
> 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink
> webUI 页面观察 这个 select * from hive_table 的 job 已经结束了。
>
> 谢谢,
> 王磊
>
>
>
> wangl...@geekplus.com.cn
>
>
>
 
-- 
Best regards!
Rui Li


Re:Re: flink1.11.0读取mysql数据decimal类型会强转成decimal(38,18)的问题

2020-07-14 文章 wldd
Hi:
图片的内容没展示出来,图片的内容就是个查询结果,


error日志这是batch模式的debug日志:
2020-07-14 18:33:23,180 DEBUG 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat [] - No input 
splitting configured (data will be read with parallelism 1).
2020-07-14 18:33:23,181 DEBUG org.apache.calcite.sql2rel
   [] - Plan after converting SqlNode to RelNode
LogicalProject(money=[$0])
  LogicalTableScan(table=[[mydb, test, test2]])


2020-07-14 18:33:23,197 DEBUG 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat [] - No input 
splitting configured (data will be read with parallelism 1).
2020-07-14 18:33:23,198 DEBUG org.apache.calcite.sql2rel
   [] - Plan after converting SqlNode to RelNode
LogicalProject(money=[$0])
  LogicalTableScan(table=[[mydb, test, test2]])


2020-07-14 18:33:23,201 DEBUG 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram [] - 
iteration: 1
2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner 
   [] - For final plan, using 
rel#2907:LogicalLegacySink.NONE.any.[](input=HepRelVertex#2906,name=`mydb`.`test`.`_tmp_table_380133308`,fields=money)
2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner 
   [] - For final plan, using 
rel#2905:LogicalProject.NONE.any.[](input=HepRelVertex#2904,inputs=0)
2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner 
   [] - For final plan, using 
rel#2901:LogicalTableScan.NONE.any.[](table=[mydb, test, test2])
2020-07-14 18:33:23,202 DEBUG 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram [] - 
optimize convert table references before rewriting sub-queries to semi-join 
cost 1 ms.
optimize result:
 LogicalLegacySink(name=[`mydb`.`test`.`_tmp_table_380133308`], fields=[money])
+- LogicalProject(money=[$0])
   +- LogicalTableScan(table=[[mydb, test, test2]])


2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner 
   [] - For final plan, using 
rel#2912:LogicalLegacySink.NONE.any.[](input=HepRelVertex#2911,name=`mydb`.`test`.`_tmp_table_380133308`,fields=money)
2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner 
   [] - For final plan, using 
rel#2910:LogicalProject.NONE.any.[](input=HepRelVertex#2909,inputs=0)
2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner 
   [] - For final plan, using 
rel#2901:LogicalTableScan.NONE.any.[](table=[mydb, test, test2])
2020-07-14 18:33:23,202 DEBUG 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram [] - 
optimize rewrite sub-queries to semi-join cost 0 ms.
optimize result:
 LogicalLegacySink(name=[`mydb`.`test`.`_tmp_table_380133308`], fields=[money])
+- LogicalProject(money=[$0])
   +- LogicalTableScan(table=[[mydb, test, test2]])








这是streaming模式的debug日志:
2020-07-14 18:35:45,995 DEBUG org.apache.calcite.sql2rel
   [] - Plan after converting SqlNode to RelNode
LogicalProject(money=[$0])
  LogicalTableScan(table=[[mydb, test, test2]])


2020-07-14 18:35:46,015 DEBUG org.apache.calcite.sql2rel
   [] - Plan after converting SqlNode to RelNode
LogicalProject(money=[$0])
  LogicalTableScan(table=[[mydb, test, test2]])


2020-07-14 18:35:46,016 DEBUG org.apache.flink.runtime.net.ConnectionUtils  
   [] - Trying to connect to (mosh-data-1/192.168.0.29:6123) from local 
address mosh-data-1/192.168.0.29 with timeout 200
2020-07-14 18:35:46,016 DEBUG org.apache.flink.runtime.net.ConnectionUtils  
   [] - Using InetAddress.getLocalHost() immediately for the connecting 
address
2020-07-14 18:35:46,022 DEBUG 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram [] - 
iteration: 1
2020-07-14 18:35:46,022 DEBUG org.apache.calcite.plan.RelOptPlanner 
   [] - For final plan, using rel#3047:LogicalLegacySink.NONE.any.None: 
0.[NONE].[NONE](input=HepRelVertex#3046,name=`mydb`.`test`.`_tmp_table_380133308`,fields=money)
2020-07-14 18:35:46,022 DEBUG org.apache.calcite.plan.RelOptPlanner 
   [] - For final plan, using rel#3045:LogicalProject.NONE.any.None: 
0.[NONE].[NONE](input=HepRelVertex#3044,exprs=[CAST($0):DECIMAL(38, 18)])
2020-07-14 18:35:46,022 DEBUG org.apache.calcite.plan.RelOptPlanner 
   [] - For final plan, using rel#3040:LogicalTableScan.NONE.any.None: 
0.[NONE].[NONE](table=[mydb, test, test2])
2020-07-14 18:35:46,022 DEBUG 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram [] - 
optimize convert table references before rewriting sub-queries to semi-join 
cost 0 ms.
optimize result:
 LogicalLegacySink(name=[`mydb`.`test`.`_tmp_table_380133308`], fields=[money])
+- LogicalProject(money=[CAST($0):DECIMAL(38, 18)])
   +- LogicalTableScan(table=[[mydb, test, test2]])


2020-07-14 18:35:46,023 DEBUG 

Re: flink1.11.0读取mysql数据decimal类型会强转成decimal(38,18)的问题

2020-07-14 文章 Leonard Xu
Hi,

前面邮件图都挂了,理论上 SQL Client 都是会强转的,可以发个图床链接上或者贴下可以复现的代码吗?

祝好

> 在 2020年7月14日,18:21,wldd  写道:
> 
> Hi,
> batch模式用的不是用的legacy 的数据类型么,batch模式并没有对decimal进行强转
> 
> 
> 
> 
> 
> --
> Best,
> wldd
> 
> 
> 在 2020-07-14 18:08:41,"Leonard Xu"  写道:
> >Hi,
> >
> >SQL client 读取mysql的部分想当于一个connector,  这个connector只支持 
> >DECIMAL(38,18)的,所有DECIMAL(p, s)都会转到这个类型,这是因为SQL Client还是用的legacy 的数据类型。
> >你可以用其他 connector 如 Filesystem、Kafka等connector读取, 社区已经有一个issue[1] 在跟进了。
> >
> >祝好,
> >Leonard Xu
> >[1] https://issues.apache.org/jira/browse/FLINK-17948 
> >
> >
> >> 在 2020年7月14日,17:58,wldd  写道:
> >> 
> >> sql-client
> >
> 
> 
>  



Re: Re: 不能实时读取实时写入到 Hive 的数据

2020-07-14 文章 Rui Li
流数据写hive时,不管是分区表还是非分区表,commit都是通过checkpoint触发的。用SQL
client的话可以在flink-conf.yaml里设置execution.checkpointing.interval来开启checkpoint

On Tue, Jul 14, 2020 at 5:49 PM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

> 我把问题简化一下,创建 Hive 表时不带任何参数
>
> CREATE TABLE hive_ods_wms_pick_order (
>   order_no STRING,
>   status INT,
>   dispatch_time TIMESTAMP
> )  STORED AS parquet
>
> INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status,
> dispatch_time FROM kafka_ods_wms_pick_order;
>
> 我用的 sql-client 客户端,15 分钟过去了 hive 表对应的 hdfs 目录为什么还只是有一个大小为 0 的 .part 文件呢?
> 我在 flink 客户端 SELECT order_no, status, dispatch_time FROM
> kafka_ods_wms_pick_order 确实是有数据返回的。
>
> 我在 flink web ui 看了下这个 job 的 Checkpoint Counts 是 0.
> 是需要让 job 做 checkpoint 才能写到 hdfs 上吗?
> 我用 Flink sql-client 客户端怎么设置做 checkpoint 的频率呢?
>
> 谢谢,
> 王磊
>
>
>
>
> wangl...@geekplus.com.cn
>
>
> 发件人: Leonard Xu
> 发送时间: 2020-07-14 17:20
> 收件人: user-zh; 夏帅
> 抄送: wangl...@geekplus.com.cn
> 主题: Re: 不能实时读取实时写入到 Hive 的数据
>
> Hi, wanglei
>
> 这个参数 'sink.partition-commit.delay'='1 h’会在cp 完成后 + 你设置的1h delay后才会提交 hive
> 的分区已完成信息(通过metastore或success文件).
>
> 你看下夏帅贴的邮件,检查下 checkpoint 和  partition-commit的设置
>
> 祝好,
> Leonard Xu
>
>
> 在 2020年7月14日,16:59,夏帅  写道:
>
> 你好,
> 可以参考下这个问题的解决
>
> http://apache-flink.147419.n8.nabble.com/Table-options-do-not-contain-an-option-key-connector-for-discovering-a-connector-td4767.html
>
>
> --
> 发件人:wangl...@geekplus.com.cn 
> 发送时间:2020年7月14日(星期二) 16:50
> 收件人:user-zh ; 夏帅 ;
> Leonard Xu 
> 主 题:Re: 回复: 不能实时读取实时写入到 Hive 的数据
>
>
> 应该是我没有理解 partitiion-commit 的意思,我看这里有文档:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit
>
>
> CREATE TABLE kafka_ods_wms_pick_order (
> order_no STRING,
> status INT,
> dispatch_time TIMESTAMP(3)
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'ods_wms_pick_order',
> 'properties.bootstrap.servers' = ':9092',
> 'properties.group.id' = 'testGroup',
> 'format' = 'json',
> 'scan.startup.mode' = 'latest-offset'
> )
>
>
> CREATE TABLE hive_ods_wms_pick_order (
>  order_no STRING,
>  status INT,
>  dispatch_time TIMESTAMP
> ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
>  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
>  'sink.partition-commit.trigger'='partition-time',
>  'sink.partition-commit.delay'='1 h',
>  'sink.partition-commit.policy.kind'='metastore,success-file'
> );
>
> INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status,
> dispatch_time, DATE_FORMAT(dispatch_time, '-MM-dd'),
> DATE_FORMAT(dispatch_time, 'HH') FROM kafka_ods_wms_pick_order;
> SELECT * FROM hive_ods_wms_pick_order /*+
> OPTIONS('streaming-source.enable'='true',
> 'streaming-source.consume-start-offset'='2020-07-24') */;
>
>
>
>
> wangl...@geekplus.com.cn
>
>
> Sender: 夏帅
> Send Time: 2020-07-14 16:43
> Receiver: user-zh; xbjtdcq
> Subject: 回复:Re: 不能实时读取实时写入到 Hive 的数据
> 你好,
> 这说明写入的hive文件没有进行rollup,可以贴下SQL么
> --
> 发件人:wangl...@geekplus.com.cn 
> 发送时间:2020年7月14日(星期二) 16:40
> 收件人:user-zh ; xbjtdcq 
> 主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据
> 我加上了这个 tablehint 。
> 任务提交上去了,但客户端还是没有任何返回显示。
> 我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。
> 谢谢,
> 王磊
> wangl...@geekplus.com.cn
> 发件人: Leonard Xu
> 发送时间: 2020-07-14 16:17
> 收件人: user-zh
> 主题: Re: 不能实时读取实时写入到 Hive 的数据
> HI, wanglei
> 你开启了 streaming-source.enable
> 吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints
> 方便地指定参数。
> SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true',
> 'streaming-source.consume-start-offset'='2020-05-20') */;
> 就在你看得这个页面应该有对应的文档说明如何读取hive数据。
> 祝好,
> Leonard Xu
> 在 2020年7月14日,15:47,wangl...@geekplus.com.cn 写道:
>
>
> 试验了一下 Flink-1.11  hive streaming 的功能
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html
>
> 创建 kafka 表,通过 SQL 实时写入 Hive.
>
> 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink
> webUI 页面观察 这个 select * from hive_table 的 job 已经结束了。
>
> 谢谢,
> 王磊
>
>
>
> wangl...@geekplus.com.cn
>
>
>

-- 
Best regards!
Rui Li


Re: flink 1.11 upsert结果出错

2020-07-14 文章 Leonard Xu
是的,目前是更新相同pk的记录,如果需要统计相同pk的记录, Flink表不声明PK就是append 
写入,就会有写入多条记录,(DB里的表也不声明pk,不然insert会报错)。

祝好


> 在 2020年7月14日,18:21,小学生 <201782...@qq.com> 写道:
> 
> 嗯嗯,谢谢大佬的解答,还有一个问题就是sql自己的语法是支持增量式的比如score=score+1,现在flink1.11特性反应成数据库 upsert 
> SQL的方式,其实是全量的更新同Pk的记录吧,并达不到增量的情况吧。



Re: flink cep 如何处理超时事件?

2020-07-14 文章 shizk233
Hi drewfranklin,

flink使用event time,然后类似下面这样可以吗?
Pattern.begin("a").next("b").within(Time.minutes(1));

Best,
shizk233

drewfranklin  于2020年7月14日周二 上午11:05写道:

> Hello all.
>  想请教下各位。
>
> 我有个用户开户超时断点的场景。调研了一下,想通过flink cep 来实现。
>
> 但是我定义pattern 后发现,我的这个没办法在一条事件数据上完成判定。必须借助和上一事件数据比较之后判断是不是超时。
>
>
> 想知道该如何定义pattern 能够,取到排序之后前后两个两个事件。


Re: flink 1.11 upsert????????

2020-07-14 文章 ??????
??sqlscore=score+1,flink1.11
 upsert SQL??Pk

Re:Re: flink1.11.0读取mysql数据decimal类型会强转成decimal(38,18)的问题

2020-07-14 文章 wldd
Hi,
batch模式用的不是用的legacy 的数据类型么,batch模式并没有对decimal进行强转










--

Best,
wldd





在 2020-07-14 18:08:41,"Leonard Xu"  写道:
>Hi,
>
>SQL client 读取mysql的部分想当于一个connector,  这个connector只支持 
>DECIMAL(38,18)的,所有DECIMAL(p, s)都会转到这个类型,这是因为SQL Client还是用的legacy 的数据类型。
>你可以用其他 connector 如 Filesystem、Kafka等connector读取, 社区已经有一个issue[1] 在跟进了。
>
>祝好,
>Leonard Xu
>[1] https://issues.apache.org/jira/browse/FLINK-17948 
>
>
>> 在 2020年7月14日,17:58,wldd  写道:
>> 
>> sql-client
>


Re: flink 1.11 upsert结果出错

2020-07-14 文章 Leonard Xu
Hello,

> 在 2020年7月14日,17:56,小学生 <201782...@qq.com> 写道:
> 
> ON DUPLICATE KEY UPDATE 

这个语法 Flink 还不支持的,官网上说的 Flink 的 JDBC connector 实现 
幂等写入[1]的方式,就是有相同pk的数据在写入数据库时,翻译成数据库 upsert SQL的方式,这里说的语法是数据库的 SQL 语法 。


Best,
Leonard Xu
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#idempotent-writes
 




Re: flink1.11.0读取mysql数据decimal类型会强转成decimal(38,18)的问题

2020-07-14 文章 Leonard Xu
Hi,

SQL client 读取mysql的部分想当于一个connector,  这个connector只支持 
DECIMAL(38,18)的,所有DECIMAL(p, s)都会转到这个类型,这是因为SQL Client还是用的legacy 的数据类型。
你可以用其他 connector 如 Filesystem、Kafka等connector读取, 社区已经有一个issue[1] 在跟进了。

祝好,
Leonard Xu
[1] https://issues.apache.org/jira/browse/FLINK-17948 


> 在 2020年7月14日,17:58,wldd  写道:
> 
> sql-client



Re: flink1.9.1-消费kafka落pg库任务出错

2020-07-14 文章 shizk233
Hi nicygan,

unable to create new native thread指的是无法创建checkpoint线程,并不是内存占用过大。
这种情况一般有3种可能的原因:
1.flink应用开启太多线程
2.机器上句柄设置太小
3.机器上的其他应用开启太多线程

建议排查一下机器上的ulimit设置(文件句柄会影响应用能开启的线程数),和flink metrics里监控到的线程数变化。

Best,
shizk233



nicygan  于2020年7月14日周二 上午10:31写道:

> dear all:
>
> 我有一个消费kafka数据写到pg库的任务,任务发生过重启,yarn日志显示jobmanager发生oom,但找不到具体原因,因为数据量非常小,按道理不该发生oom。
>   详细如下:
>
>
> 1、部署方式:
> flink on yarn ,pre-job,每个container 1024 M
> jobmanager的jvmoption(默认的)  -Xms424m-Xmx424m
>
>
> 2、数据情况:
> kafka数据,约1分钟1条,文本数据,每条数据都非常小。
>
>
> 3、任务情况:
> 很简单,消费kafka然后直接写到pg库,中间没有任何处理,没有自定义的状态。
> 消费采用 FlinkKafkaConsumer
> 写库采用 JDBCAppendTableSink
> 并行度 1
> checkpoint 2分钟一次,每次checkpoint约100ms
> statebackend rocksdb
>
>
> 4、报错情况:
> 2020-07-10 11:51:54,237 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 555 @ 1594353114226 for job cd5ceeedeb35e8e094991edf09233483.
> 2020-07-10 11:51:54,421 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 555 for job cd5ceeedeb35e8e094991edf09233483 (1238 bytes in 77
> ms).
> 2020-07-10 11:53:54,253 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 556 @ 1594353234226 for job cd5ceeedeb35e8e094991edf09233483.
> 2020-07-10 11:53:54,457 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 556 for job cd5ceeedeb35e8e094991edf09233483 (1238 bytes in 124
> ms).
> 2020-07-10 11:55:54,246 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 557 @ 1594353354226 for job cd5ceeedeb35e8e094991edf09233483.
> 2020-07-10 11:55:54,402 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 557 for job cd5ceeedeb35e8e094991edf09233483 (1238 bytes in 115
> ms).
> 2020-07-10 11:56:34,155 ERROR
> org.apache.flink.runtime.util.FatalExitExceptionHandler   - FATAL:
> Thread 'flink-akka.actor.default-dispatcher-4673' produced an uncaught
> exception. Stopping the process...
> java.lang.OutOfMemoryError: unable to create new native thread
> at java.lang.Thread.start0(Native Method)
> at java.lang.Thread.start(Thread.java:717)
> at
> akka.dispatch.forkjoin.ForkJoinPool.tryAddWorker(ForkJoinPool.java:1672)
> at
> akka.dispatch.forkjoin.ForkJoinPool.signalWork(ForkJoinPool.java:1966)
> at
> akka.dispatch.forkjoin.ForkJoinPool.fullExternalPush(ForkJoinPool.java:1905)
> at
> akka.dispatch.forkjoin.ForkJoinPool.externalPush(ForkJoinPool.java:1834)
> at
> akka.dispatch.forkjoin.ForkJoinPool.execute(ForkJoinPool.java:2955)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool.execute(ForkJoinExecutorConfigurator.scala:30)
> at
> akka.dispatch.ExecutorServiceDelegate.execute(ThreadPoolBuilder.scala:211)
> at
> akka.dispatch.ExecutorServiceDelegate.execute$(ThreadPoolBuilder.scala:211)
> at
> akka.dispatch.Dispatcher$LazyExecutorServiceDelegate.execute(Dispatcher.scala:39)
> at
> akka.dispatch.Dispatcher.registerForExecution(Dispatcher.scala:115)
> at akka.dispatch.Dispatcher.dispatch(Dispatcher.scala:55)
> at akka.actor.dungeon.Dispatch.sendMessage(Dispatch.scala:142)
> at akka.actor.dungeon.Dispatch.sendMessage$(Dispatch.scala:136)
> at akka.actor.ActorCell.sendMessage(ActorCell.scala:429)
> at akka.actor.Cell.sendMessage(ActorCell.scala:350)
> at akka.actor.Cell.sendMessage$(ActorCell.scala:349)
> at akka.actor.ActorCell.sendMessage(ActorCell.scala:429)
> at
> akka.actor.RepointableActorRef.$bang(RepointableActorRef.scala:173)
> at akka.actor.Scheduler$$anon$3.run(Scheduler.scala:171)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> thanks all / by nicygan
>


flink1.11.0读取mysql数据decimal类型会强转成decimal(38,18)的问题

2020-07-14 文章 wldd
hi,all:
现在遇到一个问题,通过sql-client读取mysql数据时,decimal类型会强转成decimal(38,18)
mysql ddl:
CREATE TABLE `test2` (
  `money` decimal(10,2) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


insert into test2 values(10.22);


flink ddl:
CREATE TABLE test2 (
money decimal(10, 2)
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://localhost:3306/test',
'connector.table' = 'test2',
'connector.username' = 'root',
'connector.password' = 'root'
);


flink查询结果,streaming模式:
sql:select * from test2;


debug信息:










--

Best,
wldd

Re: flink 1.11??????????mysql????

2020-07-14 文章 ??????
??

flink 1.11 upsert????????

2020-07-14 文章 ??????
flink??Kafka??mysqlmysqlon
 
DUPLICATE??linuxpython
 *.py


from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, 
CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
source="""
CREATE TABLE kafka_source_tab ( 
trck_id VARCHAR,
score INT


) WITH (
'connector' = 'kafka',
'topic' = 'alarm_test_g', 
'scan.startup.mode' = 'earliest-offset', 
'properties.bootstrap.servers' = '10.2.2.73:2181',
'properties.bootstrap.servers' = '10.2.2.73:9092',
'format' = 'json' 
)
"""

sink="""
CREATE TABLE g_source_tab (
trck_id VARCHAR,
score INT,

PRIMARY KEY (trck_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false', 
'table-name' = 'g', 
'username' = 'root',
'password' = '123456t',
'sink.buffer-flush.interval' = '1s'
)
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env_settings = 
EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)



t_env.execute_sql(source)
t_env.execute_sql(sink)


table_result1=t_env.execute_sql('''Insert into g_source_tab (`trck_id`,`score`) 
VALUES (select
   
trck_id,score from kafka_source_tab ) ON DUPLICATE KEY UPDATE 
score=score+1''')

table_result1.get_job_client().get_job_execution_result().result()

Re: 退订

2020-07-14 文章 Leonard Xu
Hi

打错了,发送任意邮件到 dev-subscr...@flink.apache.org 
 即可订阅来自dev 
@flink.apache.org 
的邮件

祝好,
Leonard Xu

> Hi,
> 类似的,发送任意邮件到 dev-subscr...@flink.apache.org 
>  即可订阅来自dev 
> @flink.apache.org 
> 的邮件,
> 上面的链接【1】里也有订阅方式的。
> 
> 祝好



> 在 2020年7月14日,17:47,Leonard Xu  写道:
> 
> Hi,
> 类似的,发送任意邮件到 dev-digest-subscr...@flink.apache.org 
>  即可订阅来自dev 
> @flink.apache.org 
> 的邮件,
> 上面的链接【1】里也有订阅方式的。
> 
> 祝好
>> 在 2020年7月14日,17:33,zilong xiao > > 写道:
>> 
>> 你好 我想问下,想加入flink dev邮件组应该怎么操作呢?
>> 
>> Leonard Xu mailto:xbjt...@gmail.com>> 于2020年7月14日周二 
>> 下午5:00写道:
>> 
>>> 你好,
>>> 
>>> 退订来自user-zh 邮件组的邮件, 请发任意消息到 这个邮箱:user-zh-unsubscr...@flink.apache.org 
>>> 
>>> >> > 即可退订
>>> 
>>> 退订其他邮件可以参考[1]
>>> 
>>> 祝好
>>> 
>>> [1] https://flink.apache.org/community.html#mailing-lists 
>>>  <
>>> https://flink.apache.org/community.html#mailing-lists 
>>> >
>>> 
 在 2020年7月14日,16:55,李国鹏 >>> > 写道:
 
 退订
>>> 
>>> 
> 



回复: Re: 不能实时读取实时写入到 Hive 的数据

2020-07-14 文章 wangl...@geekplus.com.cn
我把问题简化一下,创建 Hive 表时不带任何参数

CREATE TABLE hive_ods_wms_pick_order (
  order_no STRING,
  status INT,
  dispatch_time TIMESTAMP
)  STORED AS parquet

INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status, 
dispatch_time FROM kafka_ods_wms_pick_order;

我用的 sql-client 客户端,15 分钟过去了 hive 表对应的 hdfs 目录为什么还只是有一个大小为 0 的 .part 文件呢?
我在 flink 客户端 SELECT order_no, status, dispatch_time FROM 
kafka_ods_wms_pick_order 确实是有数据返回的。

我在 flink web ui 看了下这个 job 的 Checkpoint Counts 是 0.
是需要让 job 做 checkpoint 才能写到 hdfs 上吗?
我用 Flink sql-client 客户端怎么设置做 checkpoint 的频率呢?

谢谢,
王磊




wangl...@geekplus.com.cn 

 
发件人: Leonard Xu
发送时间: 2020-07-14 17:20
收件人: user-zh; 夏帅
抄送: wangl...@geekplus.com.cn
主题: Re: 不能实时读取实时写入到 Hive 的数据

Hi, wanglei

这个参数 'sink.partition-commit.delay'='1 h’会在cp 完成后 + 你设置的1h delay后才会提交 hive 
的分区已完成信息(通过metastore或success文件).

你看下夏帅贴的邮件,检查下 checkpoint 和  partition-commit的设置

祝好,
Leonard Xu


在 2020年7月14日,16:59,夏帅  写道:

你好,
可以参考下这个问题的解决
http://apache-flink.147419.n8.nabble.com/Table-options-do-not-contain-an-option-key-connector-for-discovering-a-connector-td4767.html


--
发件人:wangl...@geekplus.com.cn 
发送时间:2020年7月14日(星期二) 16:50
收件人:user-zh ; 夏帅 ; Leonard Xu 

主 题:Re: 回复: 不能实时读取实时写入到 Hive 的数据


应该是我没有理解 partitiion-commit 
的意思,我看这里有文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit


CREATE TABLE kafka_ods_wms_pick_order (
order_no STRING,
status INT,
dispatch_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'ods_wms_pick_order',
'properties.bootstrap.servers' = ':9092',
'properties.group.id' = 'testGroup',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
)


CREATE TABLE hive_ods_wms_pick_order (
 order_no STRING,
 status INT,
 dispatch_time TIMESTAMP
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
 'sink.partition-commit.trigger'='partition-time',
 'sink.partition-commit.delay'='1 h',
 'sink.partition-commit.policy.kind'='metastore,success-file'
);

INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status, 
dispatch_time, DATE_FORMAT(dispatch_time, '-MM-dd'), 
DATE_FORMAT(dispatch_time, 'HH') FROM kafka_ods_wms_pick_order;
SELECT * FROM hive_ods_wms_pick_order /*+ 
OPTIONS('streaming-source.enable'='true', 
'streaming-source.consume-start-offset'='2020-07-24') */;




wangl...@geekplus.com.cn


Sender: 夏帅
Send Time: 2020-07-14 16:43
Receiver: user-zh; xbjtdcq
Subject: 回复:Re: 不能实时读取实时写入到 Hive 的数据
你好,
这说明写入的hive文件没有进行rollup,可以贴下SQL么
--
发件人:wangl...@geekplus.com.cn 
发送时间:2020年7月14日(星期二) 16:40
收件人:user-zh ; xbjtdcq 
主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据
我加上了这个 tablehint 。
任务提交上去了,但客户端还是没有任何返回显示。 
我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。 
谢谢,
王磊
wangl...@geekplus.com.cn 
发件人: Leonard Xu
发送时间: 2020-07-14 16:17
收件人: user-zh
主题: Re: 不能实时读取实时写入到 Hive 的数据
HI, wanglei
你开启了 streaming-source.enable 
吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints 方便地指定参数。
SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 
'streaming-source.consume-start-offset'='2020-05-20') */;
就在你看得这个页面应该有对应的文档说明如何读取hive数据。
祝好,
Leonard Xu
在 2020年7月14日,15:47,wangl...@geekplus.com.cn 写道:


试验了一下 Flink-1.11  hive streaming 的功能
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html

创建 kafka 表,通过 SQL 实时写入 Hive.

但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 
页面观察 这个 select * from hive_table 的 job 已经结束了。

谢谢,
王磊



wangl...@geekplus.com.cn 




Re: Re: 退订

2020-07-14 文章 zhaoheng.zhaoh...@qq.com
你好,   

  可以发送订阅到d...@flink.apache.org

参见 https://flink.apache.org/community.html



zhaoheng.zhaoh...@qq.com
 
发件人: zilong xiao
发送时间: 2020-07-14 17:33
收件人: user-zh
主题: Re: 退订
你好 我想问下,想加入flink dev邮件组应该怎么操作呢?
 
Leonard Xu  于2020年7月14日周二 下午5:00写道:
 
> 你好,
>
> 退订来自user-zh 邮件组的邮件, 请发任意消息到 这个邮箱:user-zh-unsubscr...@flink.apache.org
>  即可退订
>
> 退订其他邮件可以参考[1]
>
> 祝好
>
> [1] https://flink.apache.org/community.html#mailing-lists <
> https://flink.apache.org/community.html#mailing-lists>
>
> > 在 2020年7月14日,16:55,李国鹏  写道:
> >
> > 退订
>
>


Re: 退订

2020-07-14 文章 Leonard Xu
Hi,
类似的,发送任意邮件到 dev-digest-subscr...@flink.apache.org 
 即可订阅来自dev 
@flink.apache.org 
的邮件,
上面的链接【1】里也有订阅方式的。

祝好
> 在 2020年7月14日,17:33,zilong xiao  写道:
> 
> 你好 我想问下,想加入flink dev邮件组应该怎么操作呢?
> 
> Leonard Xu  于2020年7月14日周二 下午5:00写道:
> 
>> 你好,
>> 
>> 退订来自user-zh 邮件组的邮件, 请发任意消息到 这个邮箱:user-zh-unsubscr...@flink.apache.org
>>  即可退订
>> 
>> 退订其他邮件可以参考[1]
>> 
>> 祝好
>> 
>> [1] https://flink.apache.org/community.html#mailing-lists <
>> https://flink.apache.org/community.html#mailing-lists>
>> 
>>> 在 2020年7月14日,16:55,李国鹏  写道:
>>> 
>>> 退订
>> 
>> 



Re: Re: Re: Re: 回复:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-14 文章 Jun Zhang
hi,Zhou Zach :
问一下,你把你的程序,并行度设置成 1,还能正常读取hive的数据吗?

Zhou Zach  于2020年7月13日周一 下午8:17写道:

> 好的,感谢答疑
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-13 19:49:10,"Jingsong Li"  写道:
> >创建kafka_table需要在default dialect下。
> >
> >不管什么dialect,都会保存到hive metastore中 (除非使用temporary table的语法)
> >
> >Best,
> >Jingsong
> >
> >On Mon, Jul 13, 2020 at 7:46 PM Zhou Zach  wrote:
> >
> >> 创建kafka_table的时候,是default Dialect,改成HiveCatalog时,WATERMARK 和with语法都不支持了,
> >> 如果是default Dialect创建的表,是不是只是在临时会话有效
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-07-13 19:27:44,"Jingsong Li"  写道:
> >> >Hi,
> >> >
> >> >问题一:
> >> >
> >> >只要current catalog是HiveCatalog。
> >> >理论上Kafka也是存到HiveMetastore里面的,如果不想报错,可以用CREATE TABLE XXX IF NOT EXISTS.
> >> >
> >> >明确下,看不见是什么意思?可以单独试试Kafka表,重启后就不见了吗?
> >> >
> >> >问题二:
> >> >
> >> >用filesystem创建出来的是filesystem的表,它和hive
> >> >metastore是没有关系的,你需要使用创建filesystem表的语法[1]。
> >> >
> >> >filesystem的表数据是直接写到 文件系统的,它的格式和hive是兼容的,所以写的路径是hive某张表的路径,就可以在hive端查询了。
> >> >但是它的partition commit是不支持metastore的,所以不会有自动add
> >> >partition到hive的默认实现,你需要自定义partition-commit-policy.
> >> >
> >> >[1]
> >> >
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html
> >> >
> >> >Best,
> >> >Jingsong
> >> >
> >> >On Mon, Jul 13, 2020 at 6:51 PM Zhou Zach  wrote:
> >> >
> >> >> 尴尬
> >> >> 我开了两个项目,改错项目了,现在 已经成功从hive查到数据了,感谢社区的热情回复,@Jingsong Li,  @夏帅
> >> >> 这两天刷了Jingsong在群里的那个视频几遍了,由衷感谢!
> >> >> 还有两个问题问下,
> >> >> 问题1:
> >> >> 创建的kafka_table,在hive和Flink
> >> >>
> >>
> SQL客户端都看不到,而且每次重新运行程序,如果不删除hive_table,就会报错,删除hive_table1,就可以执行,但是每次都不需要删除kafka_table,就可以执行程序,所以,是不是创建的kafka_table,是临时表,只有hive_table是存储在metastore
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> 问题2:
> >> >> 刚才有热心社区同学回答,不用hivecatalog,用filesystem connector
> 也是可以创建hive表,我尝试了一下,报错了:
> >> >> java.util.concurrent.CompletionException:
> >> >>
> >>
> org.apache.flink.client.deployment.application.ApplicationExecutionException:
> >> >> Could not execute application.
> >> >> at
> >> >>
> >>
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> >> >> ~[?:1.8.0_161]
> >> >> at
> >> >>
> >>
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> >> >> ~[?:1.8.0_161]
> >> >> at
> >> >>
> >>
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
> >> >> ~[?:1.8.0_161]
> >> >> at
> >> >>
> >>
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
> >> >> ~[?:1.8.0_161]
> >> >> at
> >> >>
> >>
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> >> >> ~[?:1.8.0_161]
> >> >> at
> >> >>
> >>
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> >> >> ~[?:1.8.0_161]
> >> >> at
> >> >>
> >>
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:245)
> >> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >> >> at
> >> >>
> >>
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:199)
> >> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >> >> at
> >> >>
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >> >> [?:1.8.0_161]
> >> >> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >> >> [?:1.8.0_161]
> >> >> at
> >> >>
> >>
> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
> >> >> [qile-data-flow-1.0.jar:?]
> >> >> at
> akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> >> >> [qile-data-flow-1.0.jar:?]
> >> >> at
> >> >>
> >>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> >> >> [qile-data-flow-1.0.jar:?]
> >> >> at
> >> >> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >> >> [qile-data-flow-1.0.jar:?]
> >> >> at
> >> >>
> >>
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >> >> [qile-data-flow-1.0.jar:?]
> >> >> at
> >> >> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >> >> [qile-data-flow-1.0.jar:?]
> >> >> at
> >> >>
> >>
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >> >> [qile-data-flow-1.0.jar:?]
> >> >> Caused by:
> >> >>
> >>
> org.apache.flink.client.deployment.application.ApplicationExecutionException:
> >> >> Could not execute application.
> >> >> ... 11 more
> >> >> Caused by:
> org.apache.flink.client.program.ProgramInvocationException:
> >> The
> >> >> main method caused an error: Unable to create a sink for writing
> table
> >> >> 

flink 1.11 自定义RichFlatMapFunction中使用JdbcRowDataOutputFormat 写pgsql数据问题,RuntimeContext初始化问题,空指针或RuntimeContext未初始化,哪里用的不对!

2020-07-14 文章 jindy_liu
代码,编译没问题,但运行的时候,RichFlatMapFunction在open的时候,JdbcRowDataOutputFormat.open会core,说RuntimeContext为空,如果去掉outputFormatStatus.setRuntimeContext(this.getRuntimeContext()),又会提示没有初始化?

麻烦大佬帮看看,什么问题啊,是我哪里用的不对吗?


at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NullPointerException
at
org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.createSimpleRowDataExecutor(JdbcRowDataOutputFormat.java:198)
at
org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.lambda$new$2d156164$1(JdbcRowDataOutputFormat.java:94)
at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.createAndOpenStatementExecutor(JdbcBatchingOutputFormat.java:131)
at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open(JdbcBatchingOutputFormat.java:113)
at
org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.open(JdbcRowDataOutputFormat.java:103)
at
com.qqmusic.quku.cdcSync.PostgresSinkMapFunction.open(PostgresSinkMapFunction.java:132)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at
org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:43)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)



代码=>

public class PostgresSinkMapFunction extends RichFlatMapFunction {
private static String driverClass = "org.postgresql.Driver";
private static String dbUrl =
"jdbc:postgresql://localhost:5432/ai_audio_lyric_task";
private static String userNmae = "postgres";
private static String passWord = "1";

// 表status
private static JdbcRowDataOutputFormat outputFormatStatus;
private static String[] fieldNames = new String[] {"id", "name"};
private static DataType[] fieldDataTypes = new DataType[]{
DataTypes.INT(),
DataTypes.STRING()};

private static RowType rowType = RowType.of(
Arrays.stream(fieldDataTypes)
.map(DataType::getLogicalType)
.toArray(LogicalType[]::new),
fieldNames);
private static RowDataTypeInfo rowDataTypeInfo =
RowDataTypeInfo.of(rowType);

@Override
public void flatMap(String s, Collector collector) throws
Exception {
GenericRowData row = new GenericRowData(2);

 row.setRowKind(INSERT);
 row.setField(0, count);
 row.setField(1, "jindy" + Integer.toString(count));

outputFormatStatus.writeRecord(row);

}

public void open(Configuration parameters) throws Exception {
super.open(parameters);

JdbcOptions jdbcOptions = JdbcOptions.builder()
.setDriverName(driverClass)
.setDBUrl(dbUrl)
.setTableName("status_mirror")
.setUsername(userNmae)
.setPassword(passWord)
.build();

JdbcDmlOptions dmlOptions = JdbcDmlOptions.builder()
.withTableName(jdbcOptions.getTableName())
.withDialect(jdbcOptions.getDialect())
.withFieldNames(fieldNames)
.build();

outputFormatStatus =
JdbcRowDataOutputFormat.dynamicOutputFormatBuilder()
.setJdbcOptions(jdbcOptions)
.setFieldDataTypes(fieldDataTypes)
.setJdbcDmlOptions(dmlOptions)
   
.setJdbcExecutionOptions(JdbcExecutionOptions.builder().build())
.build();

// set context,这里有问题!!
outputFormatStatus.setRuntimeContext(this.getRuntimeContext());
outputFormatStatus.open(0, 1);
}

public void close() throws Exception {
super.close();
outputFormatStatus.close();
}
}





--
Sent from: http://apache-flink.147419.n8.nabble.com/


?????? flink on yarn????????

2020-07-14 文章 jianxu


Yarn??
1. 
rm-address/ws/v1/cluster/apps/applicationId??amContainerLogurljm??url.
2. 
taskmanagerurlrm-address/proxy/applicationId/taskmanagers,taskmanageramContainerLogcontainername??ip??


url?start=0=1024
| |
jianxu
|
|
rjia...@163.com
|
??2020??07??14?? 17:07??Cayden chen<1193216...@qq.com> ??





----
??: 
   "user-zh"

https://logging.apache.org/log4j/2.x/manual/appenders.html

Best,
Yangze Guo

On Tue, Jul 14, 2020 at 4:46 PM nicygan https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/task_failure_recovery.html
   
Best,
Yangze Guo
   
   
On Mon, Jul 13, 2020 at 2:40 PM  
<13162790...@163.com wrote:

  
 1 
 2  per-job
 3 cdh flink.1.10














 ?? 2020-07-13 11:18:46??"Yangze Guo" 
https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#log-files
 [2]
   
  
https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/#per-job-mode
 [3]
   
  
https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn
 
 
 Best,
 Yangze Guo
 
 On Mon, Jul 13, 2020 at 10:49 AM  
<13162790...@163.com wrote:
 
  
  1 flink on yarn?? taskmanager 
??  
??es 
??taskmanager 
  2 flink on yarn 
??taskmanager ??jobmanager 
?? ?? task 
 taskmanager??jobmanager??
 
   
 

Re: 不能实时读取实时写入到 Hive 的数据

2020-07-14 文章 Rui Li
还可以在hive那边验证一下数据是否commit了,比如从hive CLI端执行一下show partitions,或者读一点数据

On Tue, Jul 14, 2020 at 5:20 PM Leonard Xu  wrote:

>
> Hi, wanglei
>
> 这个参数 'sink.partition-commit.delay'='1 h’会在cp 完成后 + 你设置的1h delay后才会提交 hive
> 的分区已完成信息(通过metastore或success文件).
>
> 你看下夏帅贴的邮件,检查下 checkpoint 和  partition-commit的设置
>
> 祝好,
> Leonard Xu
>
>
> > 在 2020年7月14日,16:59,夏帅  写道:
> >
> > 你好,
> > 可以参考下这个问题的解决
> >
> http://apache-flink.147419.n8.nabble.com/Table-options-do-not-contain-an-option-key-connector-for-discovering-a-connector-td4767.html
> >
> >
> > --
> > 发件人:wangl...@geekplus.com.cn 
> > 发送时间:2020年7月14日(星期二) 16:50
> > 收件人:user-zh ; 夏帅 ;
> Leonard Xu 
> > 主 题:Re: 回复: 不能实时读取实时写入到 Hive 的数据
> >
> >
> > 应该是我没有理解 partitiion-commit 的意思,我看这里有文档:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit
> >
> >
> > CREATE TABLE kafka_ods_wms_pick_order (
> > order_no STRING,
> > status INT,
> > dispatch_time TIMESTAMP(3)
> > ) WITH (
> > 'connector' = 'kafka',
> > 'topic' = 'ods_wms_pick_order',
> > 'properties.bootstrap.servers' = ':9092',
> > 'properties.group.id' = 'testGroup',
> > 'format' = 'json',
> > 'scan.startup.mode' = 'latest-offset'
> > )
> >
> >
> > CREATE TABLE hive_ods_wms_pick_order (
> >  order_no STRING,
> >  status INT,
> >  dispatch_time TIMESTAMP
> > ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
> >  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
> >  'sink.partition-commit.trigger'='partition-time',
> >  'sink.partition-commit.delay'='1 h',
> >  'sink.partition-commit.policy.kind'='metastore,success-file'
> > );
> >
> > INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status,
> dispatch_time, DATE_FORMAT(dispatch_time, '-MM-dd'),
> DATE_FORMAT(dispatch_time, 'HH') FROM kafka_ods_wms_pick_order;
> > SELECT * FROM hive_ods_wms_pick_order /*+
> OPTIONS('streaming-source.enable'='true',
> 'streaming-source.consume-start-offset'='2020-07-24') */;
> >
> >
> >
> >
> > wangl...@geekplus.com.cn
> >
> >
> > Sender: 夏帅
> > Send Time: 2020-07-14 16:43
> > Receiver: user-zh; xbjtdcq
> > Subject: 回复:Re: 不能实时读取实时写入到 Hive 的数据
> > 你好,
> > 这说明写入的hive文件没有进行rollup,可以贴下SQL么
> > --
> > 发件人:wangl...@geekplus.com.cn 
> > 发送时间:2020年7月14日(星期二) 16:40
> > 收件人:user-zh ; xbjtdcq 
> > 主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据
> > 我加上了这个 tablehint 。
> > 任务提交上去了,但客户端还是没有任何返回显示。
> > 我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。
> > 谢谢,
> > 王磊
> > wangl...@geekplus.com.cn
> > 发件人: Leonard Xu
> > 发送时间: 2020-07-14 16:17
> > 收件人: user-zh
> > 主题: Re: 不能实时读取实时写入到 Hive 的数据
> > HI, wanglei
> > 你开启了 streaming-source.enable
> 吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints
> 方便地指定参数。
> > SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true',
> 'streaming-source.consume-start-offset'='2020-05-20') */;
> > 就在你看得这个页面应该有对应的文档说明如何读取hive数据。
> > 祝好,
> > Leonard Xu
> >> 在 2020年7月14日,15:47,wangl...@geekplus.com.cn 写道:
> >>
> >>
> >> 试验了一下 Flink-1.11  hive streaming 的功能
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html
> >>
> >> 创建 kafka 表,通过 SQL 实时写入 Hive.
> >>
> >> 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink
> webUI 页面观察 这个 select * from hive_table 的 job 已经结束了。
> >>
> >> 谢谢,
> >> 王磊
> >>
> >>
> >>
> >> wangl...@geekplus.com.cn
> >>
>
>

-- 
Best regards!
Rui Li


Re: 不能实时读取实时写入到 Hive 的数据

2020-07-14 文章 Leonard Xu

Hi, wanglei

这个参数 'sink.partition-commit.delay'='1 h’会在cp 完成后 + 你设置的1h delay后才会提交 hive 
的分区已完成信息(通过metastore或success文件).

你看下夏帅贴的邮件,检查下 checkpoint 和  partition-commit的设置

祝好,
Leonard Xu


> 在 2020年7月14日,16:59,夏帅  写道:
> 
> 你好,
> 可以参考下这个问题的解决
> http://apache-flink.147419.n8.nabble.com/Table-options-do-not-contain-an-option-key-connector-for-discovering-a-connector-td4767.html
> 
> 
> --
> 发件人:wangl...@geekplus.com.cn 
> 发送时间:2020年7月14日(星期二) 16:50
> 收件人:user-zh ; 夏帅 ; Leonard 
> Xu 
> 主 题:Re: 回复: 不能实时读取实时写入到 Hive 的数据
> 
> 
> 应该是我没有理解 partitiion-commit 
> 的意思,我看这里有文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit
> 
> 
> CREATE TABLE kafka_ods_wms_pick_order (
> order_no STRING,
> status INT,
> dispatch_time TIMESTAMP(3)
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'ods_wms_pick_order',
> 'properties.bootstrap.servers' = ':9092',
> 'properties.group.id' = 'testGroup',
> 'format' = 'json',
> 'scan.startup.mode' = 'latest-offset'
> )
> 
> 
> CREATE TABLE hive_ods_wms_pick_order (
>  order_no STRING,
>  status INT,
>  dispatch_time TIMESTAMP
> ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
>  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
>  'sink.partition-commit.trigger'='partition-time',
>  'sink.partition-commit.delay'='1 h',
>  'sink.partition-commit.policy.kind'='metastore,success-file'
> );
> 
> INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status, 
> dispatch_time, DATE_FORMAT(dispatch_time, '-MM-dd'), 
> DATE_FORMAT(dispatch_time, 'HH') FROM kafka_ods_wms_pick_order;
> SELECT * FROM hive_ods_wms_pick_order /*+ 
> OPTIONS('streaming-source.enable'='true', 
> 'streaming-source.consume-start-offset'='2020-07-24') */;
> 
> 
> 
> 
> wangl...@geekplus.com.cn
> 
> 
> Sender: 夏帅
> Send Time: 2020-07-14 16:43
> Receiver: user-zh; xbjtdcq
> Subject: 回复:Re: 不能实时读取实时写入到 Hive 的数据
> 你好,
> 这说明写入的hive文件没有进行rollup,可以贴下SQL么
> --
> 发件人:wangl...@geekplus.com.cn 
> 发送时间:2020年7月14日(星期二) 16:40
> 收件人:user-zh ; xbjtdcq 
> 主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据
> 我加上了这个 tablehint 。
> 任务提交上去了,但客户端还是没有任何返回显示。 
> 我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。 
> 谢谢,
> 王磊
> wangl...@geekplus.com.cn 
> 发件人: Leonard Xu
> 发送时间: 2020-07-14 16:17
> 收件人: user-zh
> 主题: Re: 不能实时读取实时写入到 Hive 的数据
> HI, wanglei
> 你开启了 streaming-source.enable 
> 吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints 
> 方便地指定参数。
> SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 
> 'streaming-source.consume-start-offset'='2020-05-20') */;
> 就在你看得这个页面应该有对应的文档说明如何读取hive数据。
> 祝好,
> Leonard Xu
>> 在 2020年7月14日,15:47,wangl...@geekplus.com.cn 写道:
>> 
>> 
>> 试验了一下 Flink-1.11  hive streaming 的功能
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html
>> 
>> 创建 kafka 表,通过 SQL 实时写入 Hive.
>> 
>> 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 
>> 页面观察 这个 select * from hive_table 的 job 已经结束了。
>> 
>> 谢谢,
>> 王磊
>> 
>> 
>> 
>> wangl...@geekplus.com.cn 
>> 



?????? Re: Re: flink on yarn????????

2020-07-14 文章 Cayden chen





----
??: 
   "user-zh"

https://logging.apache.org/log4j/2.x/manual/appenders.html

Best,
Yangze Guo

On Tue, Jul 14, 2020 at 4:46 PM nicygan https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/task_failure_recovery.html
   
Best,
Yangze Guo
   
   
On Mon, Jul 13, 2020 at 2:40 PM  
<13162790...@163.com wrote:

  
 1 
 2  per-job
 3 cdh flink.1.10














 ?? 2020-07-13 11:18:46??"Yangze Guo" 
https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#log-files
 [2]
   
  
https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/#per-job-mode
 [3]
   
  
https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn
 
 
 Best,
 Yangze Guo
 
 On Mon, Jul 13, 2020 at 10:49 AM  
<13162790...@163.com wrote:
 
  
  1 flink on yarn?? taskmanager 
??  
??es 
??taskmanager 
  2 flink on yarn 
??taskmanager ??jobmanager 
?? ?? task 
 taskmanager??jobmanager??
 
   
 

Re: 退订

2020-07-14 文章 Leonard Xu
你好,

退订来自user-zh 邮件组的邮件, 请发任意消息到 这个邮箱:user-zh-unsubscr...@flink.apache.org 
 即可退订

退订其他邮件可以参考[1]

祝好

[1] https://flink.apache.org/community.html#mailing-lists 


> 在 2020年7月14日,16:55,李国鹏  写道:
> 
> 退订



回复:回复: 不能实时读取实时写入到 Hive 的数据

2020-07-14 文章 夏帅
你好,
可以参考下这个问题的解决
http://apache-flink.147419.n8.nabble.com/Table-options-do-not-contain-an-option-key-connector-for-discovering-a-connector-td4767.html


--
发件人:wangl...@geekplus.com.cn 
发送时间:2020年7月14日(星期二) 16:50
收件人:user-zh ; 夏帅 ; Leonard Xu 

主 题:Re: 回复: 不能实时读取实时写入到 Hive 的数据


应该是我没有理解 partitiion-commit 
的意思,我看这里有文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit


CREATE TABLE kafka_ods_wms_pick_order (
 order_no STRING,
 status INT,
 dispatch_time TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'ods_wms_pick_order',
 'properties.bootstrap.servers' = ':9092',
 'properties.group.id' = 'testGroup',
 'format' = 'json',
 'scan.startup.mode' = 'latest-offset'
)


CREATE TABLE hive_ods_wms_pick_order (
  order_no STRING,
  status INT,
  dispatch_time TIMESTAMP
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status, 
dispatch_time, DATE_FORMAT(dispatch_time, '-MM-dd'), 
DATE_FORMAT(dispatch_time, 'HH') FROM kafka_ods_wms_pick_order;
SELECT * FROM hive_ods_wms_pick_order /*+ 
OPTIONS('streaming-source.enable'='true', 
'streaming-source.consume-start-offset'='2020-07-24') */;




wangl...@geekplus.com.cn


Sender: 夏帅
Send Time: 2020-07-14 16:43
Receiver: user-zh; xbjtdcq
Subject: 回复:Re: 不能实时读取实时写入到 Hive 的数据
你好,
这说明写入的hive文件没有进行rollup,可以贴下SQL么
--
发件人:wangl...@geekplus.com.cn 
发送时间:2020年7月14日(星期二) 16:40
收件人:user-zh ; xbjtdcq 
主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据
我加上了这个 tablehint 。
任务提交上去了,但客户端还是没有任何返回显示。 
我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。 
谢谢,
王磊
wangl...@geekplus.com.cn 
发件人: Leonard Xu
发送时间: 2020-07-14 16:17
收件人: user-zh
主题: Re: 不能实时读取实时写入到 Hive 的数据
HI, wanglei
你开启了 streaming-source.enable 
吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints 方便地指定参数。
SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 
'streaming-source.consume-start-offset'='2020-05-20') */;
就在你看得这个页面应该有对应的文档说明如何读取hive数据。
祝好,
Leonard Xu
> 在 2020年7月14日,15:47,wangl...@geekplus.com.cn 写道:
> 
> 
> 试验了一下 Flink-1.11  hive streaming 的功能
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html
> 
> 创建 kafka 表,通过 SQL 实时写入 Hive.
> 
> 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 
> 页面观察 这个 select * from hive_table 的 job 已经结束了。
> 
> 谢谢,
> 王磊
> 
> 
> 
> wangl...@geekplus.com.cn 
> 


退订

2020-07-14 文章 李国鹏
退订

Re: 回复: 不能实时读取实时写入到 Hive 的数据

2020-07-14 文章 wangl...@geekplus.com.cn
应该是我没有理解 partitiion-commit 
的意思,我看这里有文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit


CREATE TABLE kafka_ods_wms_pick_order (
 order_no STRING,
 status INT,
 dispatch_time TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'ods_wms_pick_order',
 'properties.bootstrap.servers' = ':9092',
 'properties.group.id' = 'testGroup',
 'format' = 'json',
 'scan.startup.mode' = 'latest-offset'
)


CREATE TABLE hive_ods_wms_pick_order (
  order_no STRING,
  status INT,
  dispatch_time TIMESTAMP
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status, 
dispatch_time, DATE_FORMAT(dispatch_time, '-MM-dd'), 
DATE_FORMAT(dispatch_time, 'HH') FROM kafka_ods_wms_pick_order;
SELECT * FROM hive_ods_wms_pick_order /*+ 
OPTIONS('streaming-source.enable'='true', 
'streaming-source.consume-start-offset'='2020-07-24') */;




wangl...@geekplus.com.cn 

 
Sender: 夏帅
Send Time: 2020-07-14 16:43
Receiver: user-zh; xbjtdcq
Subject: 回复:Re: 不能实时读取实时写入到 Hive 的数据
你好,
这说明写入的hive文件没有进行rollup,可以贴下SQL么
 
 
--
发件人:wangl...@geekplus.com.cn 
发送时间:2020年7月14日(星期二) 16:40
收件人:user-zh ; xbjtdcq 
主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据
 
 
我加上了这个 tablehint 。
任务提交上去了,但客户端还是没有任何返回显示。 
我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。 
 
谢谢,
王磊
 
 
 
wangl...@geekplus.com.cn 
 
发件人: Leonard Xu
发送时间: 2020-07-14 16:17
收件人: user-zh
主题: Re: 不能实时读取实时写入到 Hive 的数据
HI, wanglei
 
你开启了 streaming-source.enable 
吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints 方便地指定参数。
 
SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 
'streaming-source.consume-start-offset'='2020-05-20') */;
 
就在你看得这个页面应该有对应的文档说明如何读取hive数据。
 
祝好,
Leonard Xu
 
> 在 2020年7月14日,15:47,wangl...@geekplus.com.cn 写道:
> 
> 
> 试验了一下 Flink-1.11  hive streaming 的功能
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html
> 
> 创建 kafka 表,通过 SQL 实时写入 Hive.
> 
> 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 
> 页面观察 这个 select * from hive_table 的 job 已经结束了。
> 
> 谢谢,
> 王磊
> 
> 
> 
> wangl...@geekplus.com.cn 
> 
 


Re:Re: Re: flink on yarn日志问题

2020-07-14 文章 nicygan
是有这个毛病,看TM日志不方便。

而且本地日志过几小时就会被清理,时间一久就看不到了,只剩JM日志。






在 2020-07-14 12:35:06,"zhisheng"  写道:
>知道 YARN 的 applicationId,应该也可以去 HDFS 找对应的 taskmanager 的日志(可以拼出路径),然后复制到本地去查看
>
>Yangze Guo  于2020年7月14日周二 上午11:58写道:
>
>> Hi, 王松
>>
>> 我理解拼接url就可以了,不用实际去登陆机器然后进到对应目录。
>>
>> Best,
>> Yangze Guo
>>
>> On Tue, Jul 14, 2020 at 8:26 AM 王松  wrote:
>> >
>> > 我们也有问题 1,和 Yangze Guo 说的一样,每次都要去对应的tm目录中去找日志,很麻烦,不知道有没有更简单的办法。
>> >
>> > Yangze Guo  于2020年7月13日周一 下午5:03写道:
>> >
>> > > 1.
>> > >
>> 我验证了一下,如果开启了日志收集,那tm的日志是会保存的,但是你整个application结束前可能看不到,有一个trick的方法,首先在jm日志中找到tm分配到了哪个NodeManager上,通过拼接url的方式来获取container的日志
>> > > 2. 你是否需要调整一下重启策略[1]? 如果开启了ck,默认情况下就会一直尝试重启job
>> > >
>> > > [1]
>> > >
>> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/task_failure_recovery.html
>> > >
>> > > Best,
>> > > Yangze Guo
>> > >
>> > >
>> > > On Mon, Jul 13, 2020 at 2:40 PM 程龙 <13162790...@163.com> wrote:
>> > > >
>> > > > 不好意思  怪我灭有描述清楚
>> > > > 1 目前开启日志收集功能
>> > > > 2 目前已是 per-job模式
>> > > > 3 集群使用cdh flink.1.10
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > > 在 2020-07-13 11:18:46,"Yangze Guo"  写道:
>> > > > >Hi,
>> > > > >
>> > > > >第一个问题,您可以尝试开启Yarn的日志收集功能[1]
>> > > > >
>> > > > >第二个问题,您可以尝试一下per-job mode [2][3]
>> > > > >
>> > > > >[1]
>> > >
>> https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#log-files
>> > > > >[2]
>> > >
>> https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/#per-job-mode
>> > > > >[3]
>> > >
>> https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn
>> > > > >
>> > > > >
>> > > > >Best,
>> > > > >Yangze Guo
>> > > > >
>> > > > >On Mon, Jul 13, 2020 at 10:49 AM 程龙 <13162790...@163.com> wrote:
>> > > > >>
>> > > > >> 请问一下两个问题
>> > > > >> 1 flink on yarn的时候 taskmanager 挂掉的时候 上面的日志会被删除掉 无法查看
>> > > ,除了使用es收集日志的这种方案, 还有没有可以使taskmanager 挂掉,相关日志仍然可以保留。
>> > > > >> 2 flink on yarn模式 当由于错误导致taskmanager 挂掉,但是jobmanager 却一直存在,
>> > > 有没有好的方式或者策略 ,   可以是当task失败 达到重试次数之后 taskmanager挂掉,jobmanager也挂掉
>> > > > >>
>> > >
>>


Re:Re: Re: flink on yarn日志问题

2020-07-14 文章 程龙
运行的日志会越来越多 导致查看日志比较慢 大多采用elk这种方式  除了这个有没有比较好的方案推荐一下














在 2020-07-14 12:35:06,"zhisheng"  写道:
>知道 YARN 的 applicationId,应该也可以去 HDFS 找对应的 taskmanager 的日志(可以拼出路径),然后复制到本地去查看
>
>Yangze Guo  于2020年7月14日周二 上午11:58写道:
>
>> Hi, 王松
>>
>> 我理解拼接url就可以了,不用实际去登陆机器然后进到对应目录。
>>
>> Best,
>> Yangze Guo
>>
>> On Tue, Jul 14, 2020 at 8:26 AM 王松  wrote:
>> >
>> > 我们也有问题 1,和 Yangze Guo 说的一样,每次都要去对应的tm目录中去找日志,很麻烦,不知道有没有更简单的办法。
>> >
>> > Yangze Guo  于2020年7月13日周一 下午5:03写道:
>> >
>> > > 1.
>> > >
>> 我验证了一下,如果开启了日志收集,那tm的日志是会保存的,但是你整个application结束前可能看不到,有一个trick的方法,首先在jm日志中找到tm分配到了哪个NodeManager上,通过拼接url的方式来获取container的日志
>> > > 2. 你是否需要调整一下重启策略[1]? 如果开启了ck,默认情况下就会一直尝试重启job
>> > >
>> > > [1]
>> > >
>> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/task_failure_recovery.html
>> > >
>> > > Best,
>> > > Yangze Guo
>> > >
>> > >
>> > > On Mon, Jul 13, 2020 at 2:40 PM 程龙 <13162790...@163.com> wrote:
>> > > >
>> > > > 不好意思  怪我灭有描述清楚
>> > > > 1 目前开启日志收集功能
>> > > > 2 目前已是 per-job模式
>> > > > 3 集群使用cdh flink.1.10
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > > 在 2020-07-13 11:18:46,"Yangze Guo"  写道:
>> > > > >Hi,
>> > > > >
>> > > > >第一个问题,您可以尝试开启Yarn的日志收集功能[1]
>> > > > >
>> > > > >第二个问题,您可以尝试一下per-job mode [2][3]
>> > > > >
>> > > > >[1]
>> > >
>> https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#log-files
>> > > > >[2]
>> > >
>> https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/#per-job-mode
>> > > > >[3]
>> > >
>> https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn
>> > > > >
>> > > > >
>> > > > >Best,
>> > > > >Yangze Guo
>> > > > >
>> > > > >On Mon, Jul 13, 2020 at 10:49 AM 程龙 <13162790...@163.com> wrote:
>> > > > >>
>> > > > >> 请问一下两个问题
>> > > > >> 1 flink on yarn的时候 taskmanager 挂掉的时候 上面的日志会被删除掉 无法查看
>> > > ,除了使用es收集日志的这种方案, 还有没有可以使taskmanager 挂掉,相关日志仍然可以保留。
>> > > > >> 2 flink on yarn模式 当由于错误导致taskmanager 挂掉,但是jobmanager 却一直存在,
>> > > 有没有好的方式或者策略 ,   可以是当task失败 达到重试次数之后 taskmanager挂掉,jobmanager也挂掉
>> > > > >>
>> > >
>>


回复:Re: 不能实时读取实时写入到 Hive 的数据

2020-07-14 文章 夏帅
你好,
这说明写入的hive文件没有进行rollup,可以贴下SQL么


--
发件人:wangl...@geekplus.com.cn 
发送时间:2020年7月14日(星期二) 16:40
收件人:user-zh ; xbjtdcq 
主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据


我加上了这个 tablehint 。
任务提交上去了,但客户端还是没有任何返回显示。 
我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。 

谢谢,
王磊



wangl...@geekplus.com.cn 

发件人: Leonard Xu
发送时间: 2020-07-14 16:17
收件人: user-zh
主题: Re: 不能实时读取实时写入到 Hive 的数据
HI, wanglei

你开启了 streaming-source.enable 
吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints 方便地指定参数。

SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 
'streaming-source.consume-start-offset'='2020-05-20') */;

就在你看得这个页面应该有对应的文档说明如何读取hive数据。

祝好,
Leonard Xu

> 在 2020年7月14日,15:47,wangl...@geekplus.com.cn 写道:
> 
> 
> 试验了一下 Flink-1.11  hive streaming 的功能
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html
> 
> 创建 kafka 表,通过 SQL 实时写入 Hive.
> 
> 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 
> 页面观察 这个 select * from hive_table 的 job 已经结束了。
> 
> 谢谢,
> 王磊
> 
> 
> 
> wangl...@geekplus.com.cn 
> 



回复: Re: 不能实时读取实时写入到 Hive 的数据

2020-07-14 文章 wangl...@geekplus.com.cn

我加上了这个 tablehint 。
任务提交上去了,但客户端还是没有任何返回显示。 
我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。 

谢谢,
王磊



wangl...@geekplus.com.cn 

发件人: Leonard Xu
发送时间: 2020-07-14 16:17
收件人: user-zh
主题: Re: 不能实时读取实时写入到 Hive 的数据
HI, wanglei
 
你开启了 streaming-source.enable 
吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints 方便地指定参数。
 
SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 
'streaming-source.consume-start-offset'='2020-05-20') */;
 
就在你看得这个页面应该有对应的文档说明如何读取hive数据。
 
祝好,
Leonard Xu
 
> 在 2020年7月14日,15:47,wangl...@geekplus.com.cn 写道:
> 
> 
> 试验了一下 Flink-1.11  hive streaming 的功能
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html
> 
> 创建 kafka 表,通过 SQL 实时写入 Hive.
> 
> 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 
> 页面观察 这个 select * from hive_table 的 job 已经结束了。
> 
> 谢谢,
> 王磊
> 
> 
> 
> wangl...@geekplus.com.cn 
> 
 


Re: 不能实时读取实时写入到 Hive 的数据

2020-07-14 文章 Leonard Xu
HI, wanglei

你开启了 streaming-source.enable 
吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints 方便地指定参数。

SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 
'streaming-source.consume-start-offset'='2020-05-20') */;

就在你看得这个页面应该有对应的文档说明如何读取hive数据。

祝好,
Leonard Xu

> 在 2020年7月14日,15:47,wangl...@geekplus.com.cn 写道:
> 
> 
> 试验了一下 Flink-1.11  hive streaming 的功能
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html
> 
> 创建 kafka 表,通过 SQL 实时写入 Hive.
> 
> 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 
> 页面观察 这个 select * from hive_table 的 job 已经结束了。
> 
> 谢谢,
> 王磊
> 
> 
> 
> wangl...@geekplus.com.cn 
> 



答复: flink state

2020-07-14 文章 zhao liang
我这边有个类似的实现,需要根据维表数据改变stream的处理,自定义了一个source(从MySQL中定时刷维表数据),kafka的stream 
union这个维表数据流,
额外增加一个数据类型(维表类型或者事实数据)进行数据的处理,后续算子将这个维表进行不同的处理并存到对应算子的state中。

发件人: Congxian Qiu 
日期: 星期二, 2020年7月14日 14:03
收件人: user-zh 
主题: Re: flink state
Hi Robert

Boardcast state[1] 是否满足你的需求呢?另外也可以看下这篇文章[2]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/stream/state/broadcast_state.html
[2] https://cloud.tencent.com/developer/article/1509789
Best,
Congxian


Robert.Zhang <173603...@qq.com> 于2020年7月13日周一 下午9:50写道:

> Hello,all
> 目前stream中遇到一个问题,
> 想使用一个全局的state 在所有的keyed stream中使用,或者global
> parameter,主要的需求在于是这个state是可变的,需要对其进行修改并且对所有stream
> operator可见,大家有遇到过类似场景或者可以提供相关思路么,感激不尽
>
>
> Best regards


不能实时读取实时写入到 Hive 的数据

2020-07-14 文章 wangl...@geekplus.com.cn

试验了一下 Flink-1.11  hive streaming 的功能
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html

创建 kafka 表,通过 SQL 实时写入 Hive.

但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 
页面观察 这个 select * from hive_table 的 job 已经结束了。

谢谢,
王磊



wangl...@geekplus.com.cn 



Re: [flink-sql] 如何在sql运行时动态修改kafka的scan.startup.mode

2020-07-14 文章 Benchao Li
可以尝试下1.11中引入的LIKE[1]或者是Table Hint[2]

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/hints.html

Harold.Miao  于2020年7月14日周二 下午3:32写道:

> hi  all
>
> 现在有个需求,就是一段用sql-client提交的sql任务需要动态修改kafka的scan.startup.mode,以支持不同的消费需求。请问有什么好的办法吗?
> 谢谢
>
>
> --
>
> Best Regards,
> Harold Miao
>


-- 

Best,
Benchao Li


回复:滑动窗口数据存储多份问题

2020-07-14 文章 Jimmy Zhang
Hi!
好的,非常感谢!
很期待你接下来的回复。


|
Best,
Jimmy
|

Signature is customized by Netease Mail Master

在2020年07月13日 19:58,Congxian Qiu 写道:
Hi

从 HeapListState#add 这里看是的,我跟了一个 WindowOperator 到最终 HeapListState
的逻辑,这里确实是只有一份数据,没有拷贝。这个东西的实现可能是因为性能好,我尝试确认下这个原因,多谢你的提问。

Best,
Congxian


Jimmy Zhang <13669299...@163.com> 于2020年7月12日周日 上午8:13写道:

> Hi,all!
>
> 从WindowOperator.java的processElement方法跟进去,使用windowState.add(element.getValue());添加数据,这里面找到add方法的HeapListState类的实现,
>
>
> @Override
>  public void add(V value) {
>   Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
>   final N namespace = currentNamespace;
>   final StateTable> map = stateTable;
>   List list = map.get(namespace);
>   if (list == null) {
>list = new ArrayList<>();
>map.put(namespace, list);
>   }
>   list.add(value);
>  }
> 就是这个方法,让我产生了 “此value只真实存在一份”的困惑!
> |
> Best,
> Jimmy
> |
> 签名由网易邮箱大师定制
> 在2020年7月11日 21:02,Congxian Qiu 写道:
> Hi
> 你说的 HeapListState 的困惑具体是什么呢?
>
> Best,
> Congxian
>
>
> Jimmy Zhang <13669299...@163.com> 于2020年7月11日周六 下午4:50写道:
>
> 嗯嗯,之前没有选择回复全部,不好意思。
>
>
> 我看源码关于RocksDB这块确实是需要序列化的,所以肯定是多份保存,如果状态后端是heap呢,也是一样的吗?从我测试内存来看,感觉也是多份,只是heapliststate那个类给了我一些困惑
>
>
> 在2020年07月11日 16:23,Congxian Qiu 写道:
> Hi
>
>
> 每个窗口都是一个单独的 state,至于你认为的不同 state 仅保持引用是不对的。这个你可以使用 RocksDBStateBackend
> 来考虑,RocksDBStateBackend 中会把 state 序列化成 bytes,然后写到 RocksDB 中,就是每个 State
> 中都会有一份。
>
>
> PS:回复邮件的时候可以选择「全部回复」这样就能够加上 "user-zh@flink.apache.org"),这样我们的邮件所有人都能看到了
>
>
> Best,
> Congxian
>
>
>
>
> 张浩  于2020年7月7日周二 上午10:34写道:
>
>
>
> Hi,我通过看源码发现每条数据到达时,是分配给了所有的窗口,但是我理解这单条数据是不是只是传递给了每个窗口,其实在内存中只有一份,窗口状态保持对它的引用,触发一次窗口就删掉对这些数据的引用?
> 很高兴与您探讨!
>
>
>
>
> | |
> 张浩
> |
> |
> 邮箱:zhanghao_w...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年07月06日 20:56,Congxian Qiu 写道:
> Hi
>
>
> 我理解,如果只存取一份的话,state 的管理会变得麻烦一些(所有需要这份数据的窗口都需要去某个地方取, state
> 什么时候清理逻辑也会变得麻烦一些)
>
>
> Best,
> Congxian
>
>
>
>
> 张浩  于2020年7月6日周一 下午1:57写道:
>
> 你好,我的思考是便于在状态信息中清除或者提取每一个窗口的数据信息。
> 不知道,我这样理解的对吗?
> 另外,为什么我们不能只存储一份数据呢?
> 非常感谢与您交流!
>
>
>
>
> | |
> 张浩
> |
> |
> 邮箱:zhanghao_w...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年07月06日 13:46,Congxian Qiu 写道:
> Hi
> 现在的实现是这样的,每条数据会在每个窗口中存一份
>
> Best,
> Congxian
>
>
> 张浩 <13669299...@163.com> 于2020年7月6日周一 下午12:49写道:
>
> Hi,all!
> 由于第一次咨询,我不确定上一份邮件大家是否收到。
> 想咨询下大家,为什么使用 datastream api 的话,滑动窗口对于每条数据都会在 state 中存 size / slide
> 份?
>
>
> | |
> 张浩
> |
> |
> 13669299...@163.com
> |
> 签名由网易邮箱大师定制
>
>


Flink1.10 flinksql 多表join状态ttl不成功的问题

2020-07-14 文章 Robin Zhang
 
我用flink sql 设置了 空闲状态的清理时间,但是 状态还是一直增加,里面有 多次 group by  和多次 流表的关联 。
代码如下:
   tEnv.getConfig()
 .setIdleStateRetentionTime(Time.hours(minIdleStateRetentionTime), 

Time.hours(maxIdleStateRetentionTime));

程序运行一周之后状态现在2.2G. 最近几天越来越大,表现在ttl没有成功,请教一下各位大佬



--
Sent from: http://apache-flink.147419.n8.nabble.com/


[flink-sql] 如何在sql运行时动态修改kafka的scan.startup.mode

2020-07-14 文章 Harold.Miao
hi  all
现在有个需求,就是一段用sql-client提交的sql任务需要动态修改kafka的scan.startup.mode,以支持不同的消费需求。请问有什么好的办法吗?
谢谢


-- 

Best Regards,
Harold Miao


如果解决写hdfs的文件描述符问题?

2020-07-14 文章 李宇彬
hi, everyone



环境信息:flink-1.10.0、hadoop 2.6.0

我从kafka 
topic消费数据,通过BucketSink写到hdfs,在0点的时候会遇到这样的问题,要同时消费当天和昨天的数据,这样在写hdfs时会产生两倍的文件描述符,对hdfs造成很大的压力,请问在不增加文件描述符上限的情况下,如何解决?

Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-14 文章 Peihui He
Hi Congxian,

这个错误是从1.9 升级到1.10 遇到的问题。用简单的wordcount 测试,自己根据特定word
抛出runtimeException,就能够重现。flink on yarn 和 flink on k8s 都出现这个问题。1.10
都不能从上次的checkpoint状态中恢复。不知道是不是1.10需要其他配置呢?

Best wishes.

Congxian Qiu  于2020年7月14日周二 下午1:54写道:

> Hi
>
> 这个出错是从 1.9 升级到 1.10 遇到的问题,还是说 1.10 能正常跑了,然后跑着跑着 failover 了再次恢复的时候出错了呢?
> 另外你可以看下 tm log 看看有没有其他异常
>
> Best,
> Congxian
>
>
> Yun Tang  于2020年7月14日周二 上午11:57写道:
>
> > Hi Peihui
> >
> >
> 你的异常应该是从增量Checkpoint恢复时,文件已经下载到本地了,做硬链时[1],发现源文件不见了,有很大的可能是当时发生了异常,导致restore流程退出了,所以这个问题应该不是root
> > cause。
> >
> > [1]
> >
> https://github.com/apache/flink/blob/2a3b642b1efb957f3d4f20502c40398786ab1469/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java#L473
> >
> >
> > 祝好
> > 唐云
> > 
> > From: Peihui He 
> > Sent: Tuesday, July 14, 2020 10:42
> > To: user-zh@flink.apache.org 
> > Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
> >
> > hello,
> >
> > 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示
> >
> >
> > Caused by: java.nio.file.NoSuchFileException:
> >
> >
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
> > ->
> >
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
> >
> > 配置和1.9.2 一样:
> > state.backend: rocksdb
> > state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
> > state.savepoints.dir: hdfs:///flink/savepoints/wc/
> > state.backend.incremental: true
> >
> > 代码上都有
> >
> > env.enableCheckpointing(1);
> >
> >
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> > env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
> > org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));
> >
> >
> >   是1.10.0 需要做什么特别配置么?
> >
>


Re: flink-1.11使用executeSql()执行DDL语句问题

2020-07-14 文章 Benchao Li
看起来是format找不到实现,你可以添加一下flink-json的依赖看一下。

amen...@163.com  于2020年7月14日周二 下午2:38写道:

> hi, everyone
>
> 环境信息:flink-1.11.0, blink-planner, 本地ide开发测试(IDEA)
>
> 问题描述:使用executeSql()方法执行DDL语句,控制台打印如下异常信息。(flink-connector-kafka_2.11依赖已添加)
>
> 我不确定是否还有某个必要的依赖没有添加,还是有其他的地方没有考虑完整,请大佬赐教。
>
>
> --分割线-
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Unable to create a source for reading table
> 'default_catalog.default_database.kafka_out'.
>
> Table options are:
>
> 'connector'='kafka'
> 'format'='json'
> 'properties.bootstrap.servers'='localhost:9092'
> 'properties.group.id'='flink-1.11'
> 'scan.startup.mode'='group-offsets'
> 'topic'='flink-kafka'
> at
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
> at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
> at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2178)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
> at example.Example.main(Example.java:77)
> Caused by: org.apache.flink.table.api.ValidationException: Could not find
> any factories that implement
> 'org.apache.flink.table.factories.DeserializationFormatFactory' in the
> classpath.
> at
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:229)
> at
> org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalFormatFactory(FactoryUtil.java:538)
> at
> org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalDecodingFormat(FactoryUtil.java:426)
> at
> org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverDecodingFormat(FactoryUtil.java:413)
> at
> org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactoryBase.createDynamicTableSource(KafkaDynamicTableFactoryBase.java:73)
> at
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
> ... 25 more
>
>
> --分割线-
>
> 祝好!
> amenhub
>


-- 

Best,
Benchao Li


Re: flink-1.11使用executeSql()执行DDL语句问题

2020-07-14 文章 Jingsong Li
还要添加flink-json

Best,
Jingsong

On Tue, Jul 14, 2020 at 2:38 PM amen...@163.com  wrote:

> hi, everyone
>
> 环境信息:flink-1.11.0, blink-planner, 本地ide开发测试(IDEA)
>
> 问题描述:使用executeSql()方法执行DDL语句,控制台打印如下异常信息。(flink-connector-kafka_2.11依赖已添加)
>
> 我不确定是否还有某个必要的依赖没有添加,还是有其他的地方没有考虑完整,请大佬赐教。
>
>
> --分割线-
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Unable to create a source for reading table
> 'default_catalog.default_database.kafka_out'.
>
> Table options are:
>
> 'connector'='kafka'
> 'format'='json'
> 'properties.bootstrap.servers'='localhost:9092'
> 'properties.group.id'='flink-1.11'
> 'scan.startup.mode'='group-offsets'
> 'topic'='flink-kafka'
> at
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
> at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
> at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2178)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
> at example.Example.main(Example.java:77)
> Caused by: org.apache.flink.table.api.ValidationException: Could not find
> any factories that implement
> 'org.apache.flink.table.factories.DeserializationFormatFactory' in the
> classpath.
> at
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:229)
> at
> org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalFormatFactory(FactoryUtil.java:538)
> at
> org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalDecodingFormat(FactoryUtil.java:426)
> at
> org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverDecodingFormat(FactoryUtil.java:413)
> at
> org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactoryBase.createDynamicTableSource(KafkaDynamicTableFactoryBase.java:73)
> at
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
> ... 25 more
>
>
> --分割线-
>
> 祝好!
> amenhub
>


-- 
Best, Jingsong Lee


Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-14 文章 Peihui He
Hi Yun,

我这边用一个word count 例子,socket -> flatmap -> keyBy -> reduce ->
print. 在flatmap 中当出现特定word的时候就抛出一个runtimeException。在1.9.2
里面是可以从checkpoint中自动恢复上次做checkpoint的时候的状态,但是用1.10.0 就不能。环境是flink on
yarn。

Best wishes.

Yun Tang  于2020年7月14日周二 上午11:57写道:

> Hi Peihui
>
> 你的异常应该是从增量Checkpoint恢复时,文件已经下载到本地了,做硬链时[1],发现源文件不见了,有很大的可能是当时发生了异常,导致restore流程退出了,所以这个问题应该不是root
> cause。
>
> [1]
> https://github.com/apache/flink/blob/2a3b642b1efb957f3d4f20502c40398786ab1469/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java#L473
>
>
> 祝好
> 唐云
> 
> From: Peihui He 
> Sent: Tuesday, July 14, 2020 10:42
> To: user-zh@flink.apache.org 
> Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>
> hello,
>
> 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示
>
>
> Caused by: java.nio.file.NoSuchFileException:
>
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
> ->
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
>
> 配置和1.9.2 一样:
> state.backend: rocksdb
> state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
> state.savepoints.dir: hdfs:///flink/savepoints/wc/
> state.backend.incremental: true
>
> 代码上都有
>
> env.enableCheckpointing(1);
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
> org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));
>
>
>   是1.10.0 需要做什么特别配置么?
>


flink-1.11使用executeSql()执行DDL语句问题

2020-07-14 文章 amen...@163.com
hi, everyone

环境信息:flink-1.11.0, blink-planner, 本地ide开发测试(IDEA)

问题描述:使用executeSql()方法执行DDL语句,控制台打印如下异常信息。(flink-connector-kafka_2.11依赖已添加)

我不确定是否还有某个必要的依赖没有添加,还是有其他的地方没有考虑完整,请大佬赐教。

--分割线-
Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Unable to create a source for reading table 
'default_catalog.default_database.kafka_out'.

Table options are:

'connector'='kafka'
'format'='json'
'properties.bootstrap.servers'='localhost:9092'
'properties.group.id'='flink-1.11'
'scan.startup.mode'='group-offsets'
'topic'='flink-kafka'
at 
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2178)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
at example.Example.main(Example.java:77)
Caused by: org.apache.flink.table.api.ValidationException: Could not find any 
factories that implement 
'org.apache.flink.table.factories.DeserializationFormatFactory' in the 
classpath.
at 
org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:229)
at 
org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalFormatFactory(FactoryUtil.java:538)
at 
org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalDecodingFormat(FactoryUtil.java:426)
at 
org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverDecodingFormat(FactoryUtil.java:413)
at 
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactoryBase.createDynamicTableSource(KafkaDynamicTableFactoryBase.java:73)
at 
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
... 25 more

--分割线-

祝好!
amenhub


Re: Flink1.10 flinksql 多表join状态ttl不成功的问题

2020-07-14 文章 Robin Zhang
没有窗口,就简单的表join,有kafka流表 ,kudu维表,使用了group by

> Jul 14, 2020; 12:36pm — by zhisheng zhisheng
> 有没有窗口啊?

Robin Zhang <[hidden email]> 于2020年7月14日周二 上午11:48写道:

> 
> 我用flink sql 设置了 空闲状态的清理时间,但是 状态还是一直增加,里面有 多次 group by  和多次 流表的关联 。
> 代码如下:
>tEnv.getConfig()
>  .setIdleStateRetentionTime(Time.hours(minIdleStateRetentionTime),
>
> Time.hours(maxIdleStateRetentionTime));
>
> 程序运行一周之后状态现在2.2G. 最近几天越来越大,表现在ttl没有成功,请教一下各位大佬
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink state

2020-07-14 文章 Congxian Qiu
Hi Robert

Boardcast state[1] 是否满足你的需求呢?另外也可以看下这篇文章[2]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/stream/state/broadcast_state.html
[2] https://cloud.tencent.com/developer/article/1509789
Best,
Congxian


Robert.Zhang <173603...@qq.com> 于2020年7月13日周一 下午9:50写道:

> Hello,all
> 目前stream中遇到一个问题,
> 想使用一个全局的state 在所有的keyed stream中使用,或者global
> parameter,主要的需求在于是这个state是可变的,需要对其进行修改并且对所有stream
> operator可见,大家有遇到过类似场景或者可以提供相关思路么,感激不尽
>
>
> Best regards